hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jia...@apache.org
Subject [2/5] hadoop git commit: YARN-1651. CapacityScheduler side changes to support container resize. Contributed by Wangda Tan
Date Tue, 15 Sep 2015 02:30:17 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7b1d3d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
index 85d2515..8fa1ad2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationMasterService.java
@@ -18,44 +18,51 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import static java.lang.Thread.sleep;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
-
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
-import org.apache.hadoop.yarn.api.records.*;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
+import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.*;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
-
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.Assert;
-
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static java.lang.Thread.sleep;
 
 public class TestApplicationMasterService {
   private static final Log LOG = LogFactory
@@ -343,6 +350,92 @@ public class TestApplicationMasterService {
     alloc1Response = am1.schedule();
     Assert.assertEquals(0, alloc1Response.getAllocatedContainers().size());
   }
+  
+  /** Every invalid container resize request must be rejected on its own. */
+  @Test(timeout=60000)
+  public void testInvalidIncreaseDecreaseRequest() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    try {
+      rm.start();
+
+      // Register node1
+      MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
+
+      // Submit an application
+      RMApp app1 = rm.submitApp(1024);
+
+      // kick the scheduling
+      nm1.nodeHeartbeat(true);
+      RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
+      MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
+      RegisterApplicationMasterResponse registerResponse =
+          am1.registerAppAttempt();
+
+      sentRMContainerLaunched(rm,
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), 1));
+
+      // Asking for a normal increase should be successful
+      am1.sendContainerResizingRequest(Arrays.asList(
+              ContainerResourceChangeRequest.newInstance(
+                  ContainerId.newContainerId(attempt1.getAppAttemptId(), 1),
+                  Resources.createResource(2048))), null);
+
+      // Target resource is negative, should fail
+      boolean exceptionCaught = false;
+      try {
+        am1.sendContainerResizingRequest(Arrays.asList(
+                ContainerResourceChangeRequest.newInstance(
+                    ContainerId.newContainerId(attempt1.getAppAttemptId(), 1),
+                    Resources.createResource(-1))), null);
+      } catch (InvalidResourceRequestException e) {
+        // This is expected
+        exceptionCaught = true;
+      }
+      Assert.assertTrue(exceptionCaught);
+
+      // Target resource is more than maxAllocation, should fail
+      exceptionCaught = false; // reset so the previous case cannot mask a miss
+      try {
+        am1.sendContainerResizingRequest(Arrays.asList(
+                ContainerResourceChangeRequest.newInstance(
+                    ContainerId.newContainerId(attempt1.getAppAttemptId(), 1),
+                    Resources
+                        .add(registerResponse.getMaximumResourceCapability(),
+                            Resources.createResource(1)))), null);
+      } catch (InvalidResourceRequestException e) {
+        // This is expected
+        exceptionCaught = true;
+      }
+      Assert.assertTrue(exceptionCaught);
+
+      // Contains multiple increase/decrease requests for same containerId.
+      // NOTE(review): both targets below also exceed maxAllocation, so the
+      // rejection may not be caused by the duplicate id - confirm.
+      exceptionCaught = false; // reset so the previous case cannot mask a miss
+      try {
+        am1.sendContainerResizingRequest(Arrays.asList(
+                ContainerResourceChangeRequest.newInstance(
+                    ContainerId.newContainerId(attempt1.getAppAttemptId(), 1),
+                    Resources
+                        .add(registerResponse.getMaximumResourceCapability(),
+                            Resources.createResource(1)))), Arrays.asList(
+                ContainerResourceChangeRequest.newInstance(
+                    ContainerId.newContainerId(attempt1.getAppAttemptId(), 1),
+                    Resources
+                        .add(registerResponse.getMaximumResourceCapability(),
+                            Resources.createResource(1)))));
+      } catch (InvalidResourceRequestException e) {
+        // This is expected
+        exceptionCaught = true;
+      }
+      Assert.assertTrue(exceptionCaught);
+    } finally {
+      rm.close();
+    }
+  }
 
   private static class MyResourceManager extends MockRM {
 
@@ -354,4 +447,15 @@ public class TestApplicationMasterService {
       return new DrainDispatcher();
     }
   }
+  
+  private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
+    // Deliver LAUNCHED to the scheduler's RMContainer; fail if it is unknown.
+    RMContainer launched = ((CapacityScheduler) rm.getResourceScheduler())
+        .getRMContainer(containerId);
+    if (launched == null) {
+      Assert.fail("Cannot find RMContainer");
+    }
+    launched.handle(
+        new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7b1d3d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
index dc843b9..168280a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ControlledClock;
 import org.apache.hadoop.yarn.util.Records;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -331,11 +332,15 @@ public class TestAMRestart {
     MockAM am2 = MockRM.launchAM(app1, rm1, nm1);
     RegisterApplicationMasterResponse registerResponse =
         am2.registerAppAttempt();
-    rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+    rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
 
     // check am2 get the nm token from am1.
-    Assert.assertEquals(expectedNMTokens,
-      registerResponse.getNMTokensFromPreviousAttempts());
+    Assert.assertEquals(expectedNMTokens.size(),
+        registerResponse.getNMTokensFromPreviousAttempts().size());
+    for (int i = 0; i < expectedNMTokens.size(); i++) {
+      Assert.assertTrue(expectedNMTokens.get(i)
+          .equals(registerResponse.getNMTokensFromPreviousAttempts().get(i)));
+    }
 
     // am2 allocate 1 container on nm2
     containers = new ArrayList<Container>();
@@ -365,7 +370,7 @@ public class TestAMRestart {
     // restart am
     MockAM am3 = MockRM.launchAM(app1, rm1, nm1);
     registerResponse = am3.registerAppAttempt();
-    rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+    rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
 
     // check am3 get the NM token from both am1 and am2;
     List<NMToken> transferredTokens = registerResponse.getNMTokensFromPreviousAttempts();
@@ -430,7 +435,7 @@ public class TestAMRestart {
 
     ContainerStatus containerStatus =
         BuilderUtils.newContainerStatus(amContainer, ContainerState.COMPLETE,
-            "", ContainerExitStatus.DISKS_FAILED);
+            "", ContainerExitStatus.DISKS_FAILED, Resources.createResource(200));
     currentNode.containerStatus(containerStatus);
     am1.waitForState(RMAppAttemptState.FAILED);
     rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7b1d3d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
index 484a1b6..1f307aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/logaggregationstatus/TestRMAppLogAggregationStatus.java
@@ -165,7 +165,7 @@ public class TestRMAppLogAggregationStatus {
     node1ReportForApp.add(report1);
     node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
       .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
-      null, node1ReportForApp));
+      null, node1ReportForApp, null));
 
     List<LogAggregationReport> node2ReportForApp =
         new ArrayList<LogAggregationReport>();
@@ -177,7 +177,7 @@ public class TestRMAppLogAggregationStatus {
     node2ReportForApp.add(report2);
     node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
       .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
-      null, node2ReportForApp));
+      null, node2ReportForApp, null));
     // node1 and node2 has updated its log aggregation status
     // verify that the log aggregation status for node1, node2
     // has been changed
@@ -215,7 +215,7 @@ public class TestRMAppLogAggregationStatus {
     node1ReportForApp2.add(report1_2);
     node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
       .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
-      null, node1ReportForApp2));
+      null, node1ReportForApp2, null));
 
     // verify that the log aggregation status for node1
     // has been changed
@@ -284,7 +284,7 @@ public class TestRMAppLogAggregationStatus {
     // 10 diagnostic messages/failure messages
     node1.handle(new RMNodeStatusEvent(node1.getNodeID(), NodeHealthStatus
       .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
-      null, node1ReportForApp3));
+      null, node1ReportForApp3, null));
 
     logAggregationStatus = rmApp.getLogAggregationReportsForApp();
     Assert.assertEquals(2, logAggregationStatus.size());
@@ -329,7 +329,7 @@ public class TestRMAppLogAggregationStatus {
     node2ReportForApp2.add(report2_3);
     node2.handle(new RMNodeStatusEvent(node2.getNodeID(), NodeHealthStatus
       .newInstance(true, null, 0), new ArrayList<ContainerStatus>(), null,
-      null, node2ReportForApp2));
+      null, node2ReportForApp2, null));
     Assert.assertEquals(LogAggregationStatus.FAILED,
       rmApp.getLogAggregationStatusForAppReport());
     logAggregationStatus = rmApp.getLogAggregationReportsForApp();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7b1d3d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index 10ec453..828e149 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -465,10 +465,9 @@ public class TestRMAppAttemptTransitions {
       expectedAllocateCount = 1;
     }
 
-    assertEquals(expectedState, 
-        applicationAttempt.getAppAttemptState());
-    verify(scheduler, times(expectedAllocateCount)).
-    allocate(any(ApplicationAttemptId.class), 
+    assertEquals(expectedState, applicationAttempt.getAppAttemptState());
+    verify(scheduler, times(expectedAllocateCount)).allocate(
+        any(ApplicationAttemptId.class), any(List.class), any(List.class),
         any(List.class), any(List.class), any(List.class), any(List.class));
 
     assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
@@ -488,11 +487,9 @@ public class TestRMAppAttemptTransitions {
     assertEquals(amContainer, applicationAttempt.getMasterContainer());
     // Check events
     verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class));
-    verify(scheduler, times(2)).
-        allocate(
-            any(
-                ApplicationAttemptId.class), any(List.class), any(List.class), 
-                any(List.class), any(List.class));
+    verify(scheduler, times(2)).allocate(any(ApplicationAttemptId.class),
+        any(List.class), any(List.class), any(List.class), any(List.class),
+        any(List.class), any(List.class));
     verify(nmTokenManager).clearNodeSetForAttempt(
       applicationAttempt.getAppAttemptId());
   }
@@ -641,13 +638,9 @@ public class TestRMAppAttemptTransitions {
     Allocation allocation = mock(Allocation.class);
     when(allocation.getContainers()).
         thenReturn(Collections.singletonList(container));
-    when(
-        scheduler.allocate(
-            any(ApplicationAttemptId.class), 
-            any(List.class), 
-            any(List.class), 
-            any(List.class), 
-            any(List.class))).
+    when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
+        any(List.class), any(List.class), any(List.class), any(List.class),
+        any(List.class))).
     thenReturn(allocation);
     RMContainer rmContainer = mock(RMContainerImpl.class);
     when(scheduler.getRMContainer(container.getId())).
@@ -1511,10 +1504,9 @@ public class TestRMAppAttemptTransitions {
   @Test
   public void testScheduleTransitionReplaceAMContainerRequestWithDefaults() {
     YarnScheduler mockScheduler = mock(YarnScheduler.class);
-    when(
-        mockScheduler.allocate(any(ApplicationAttemptId.class),
-            any(List.class), any(List.class), any(List.class), any(List.class)))
-        .thenAnswer(new Answer<Allocation>() {
+    when(mockScheduler.allocate(any(ApplicationAttemptId.class),
+        any(List.class), any(List.class), any(List.class), any(List.class),
+        any(List.class), any(List.class))).thenAnswer(new Answer<Allocation>() {
 
           @SuppressWarnings("rawtypes")
           @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7b1d3d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
index e4e2049..415e891 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmcontainer;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
@@ -191,6 +190,10 @@ public class TestRMContainerImpl {
 
     Container container = BuilderUtils.newContainer(containerId, nodeId,
         "host:3465", resource, priority, null);
+    
+    ConcurrentMap<ApplicationId, RMApp> appMap = new ConcurrentHashMap<>();
+    RMApp rmApp = mock(RMApp.class);
+    appMap.putIfAbsent(appId, rmApp);
 
     RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
     SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
@@ -200,6 +203,7 @@ public class TestRMContainerImpl {
     when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
     when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
     when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
+    when(rmContext.getRMApps()).thenReturn(appMap);
     RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
         nodeId, "user", rmContext);
 
@@ -235,11 +239,118 @@ public class TestRMContainerImpl {
     rmContainer.handle(new RMContainerFinishedEvent(containerId,
         containerStatus, RMContainerEventType.EXPIRE));
     drainDispatcher.await();
+    assertEquals(RMContainerState.EXPIRED, rmContainer.getState());
+    verify(writer, times(1)).containerFinished(any(RMContainer.class));
+    verify(publisher, times(1)).containerFinished(any(RMContainer.class),
+        anyLong());
+  }
+  
+  private void testExpireAfterIncreased(boolean acquired) {
+    /*
+     * Similar to previous test: a running container is increased and, only
+     * when acquired is true, the increase is acquired by the AM. Either way,
+     * once the container is expired it should be finished.
+     */
+    DrainDispatcher drainDispatcher = new DrainDispatcher();
+    EventHandler<RMAppAttemptEvent> appAttemptEventHandler =
+        mock(EventHandler.class);
+    EventHandler generic = mock(EventHandler.class);
+    drainDispatcher.register(RMAppAttemptEventType.class,
+        appAttemptEventHandler);
+    drainDispatcher.register(RMNodeEventType.class, generic);
+    drainDispatcher.init(new YarnConfiguration());
+    drainDispatcher.start();
+    NodeId nodeId = BuilderUtils.newNodeId("host", 3425);
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 1);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+    ContainerAllocationExpirer expirer = mock(ContainerAllocationExpirer.class);
+
+    Resource resource = BuilderUtils.newResource(512, 1);
+    Priority priority = BuilderUtils.newPriority(5);
+
+    Container container = BuilderUtils.newContainer(containerId, nodeId,
+        "host:3465", resource, priority, null);
+
+    RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
+    SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class);
+    RMContext rmContext = mock(RMContext.class);
+    when(rmContext.getDispatcher()).thenReturn(drainDispatcher);
+    when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer);
+    when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer);
+    when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher);
+    when(rmContext.getYarnConfiguration()).thenReturn(new YarnConfiguration());
+    ConcurrentMap<ApplicationId, RMApp> apps =
+        new ConcurrentHashMap<ApplicationId, RMApp>();
+    apps.put(appId, mock(RMApp.class));
+    when(rmContext.getRMApps()).thenReturn(apps);
+    RMContainer rmContainer = new RMContainerImpl(container, appAttemptId,
+        nodeId, "user", rmContext);
+
+    assertEquals(RMContainerState.NEW, rmContainer.getState());
+    assertEquals(resource, rmContainer.getAllocatedResource());
+    assertEquals(nodeId, rmContainer.getAllocatedNode());
+    assertEquals(priority, rmContainer.getAllocatedPriority());
+    verify(writer).containerStarted(any(RMContainer.class));
+    verify(publisher).containerCreated(any(RMContainer.class), anyLong());
+
+    rmContainer.handle(new RMContainerEvent(containerId,
+        RMContainerEventType.START));
+    drainDispatcher.await();
+    assertEquals(RMContainerState.ALLOCATED, rmContainer.getState());
+
+    rmContainer.handle(new RMContainerEvent(containerId,
+        RMContainerEventType.ACQUIRED));
+    drainDispatcher.await();
+    assertEquals(RMContainerState.ACQUIRED, rmContainer.getState());
+
+    rmContainer.handle(new RMContainerEvent(containerId,
+        RMContainerEventType.LAUNCHED));
+    drainDispatcher.await();
     assertEquals(RMContainerState.RUNNING, rmContainer.getState());
-    verify(writer, never()).containerFinished(any(RMContainer.class));
-    verify(publisher, never()).containerFinished(any(RMContainer.class),
+    assertEquals(
+        "http://host:3465/node/containerlogs/container_1_0001_01_000001/user",
+        rmContainer.getLogURL());
+    
+    // newResource is more than the old resource
+    Resource newResource = BuilderUtils.newResource(1024, 2);
+    rmContainer.handle(new RMContainerChangeResourceEvent(containerId,
+        newResource, true));
+
+    if (acquired) {
+      rmContainer
+          .handle(new RMContainerUpdatesAcquiredEvent(containerId, true));
+      drainDispatcher.await();
+      // state is still RUNNING since this is an increased container acquired
+      // by AM
+      assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+    }
+
+    // In RUNNING state. Verify EXPIRE and associated actions.
+    reset(appAttemptEventHandler);
+    ContainerStatus containerStatus = SchedulerUtils
+        .createAbnormalContainerStatus(containerId,
+            SchedulerUtils.EXPIRED_CONTAINER);
+    rmContainer.handle(new RMContainerFinishedEvent(containerId,
+        containerStatus, RMContainerEventType.EXPIRE));
+    drainDispatcher.await();
+    assertEquals(RMContainerState.EXPIRED, rmContainer.getState());
+    
+    // Whether or not the AM acquired the increase, the expired container is
+    // reported as finished exactly once to both the writer and the publisher.
+    verify(writer, times(1)).containerFinished(any(RMContainer.class));
+    verify(publisher, times(1)).containerFinished(any(RMContainer.class),
         anyLong());
   }
+
+  @Test
+  public void testExpireAfterContainerResourceIncreased() throws Exception {
+    // First expire after the AM acquired the increase, then without that.
+    for (boolean acquiredByAM : new boolean[] {true, false}) {
+      testExpireAfterIncreased(acquiredByAM);
+    }
+  }
   
   @Test
   public void testExistenceOfResourceRequestInRMContainer() throws Exception {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7b1d3d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 8bbcf4d..d2fa2c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -31,7 +31,6 @@ import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -59,6 +58,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -103,6 +103,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptI
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
@@ -138,7 +140,6 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -677,11 +678,11 @@ public class TestCapacityScheduler {
     // Verify the blacklist can be updated independent of requesting containers
     cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
         Collections.<ContainerId>emptyList(),
-        Collections.singletonList(host), null);
+        Collections.singletonList(host), null, null, null);
     Assert.assertTrue(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
     cs.allocate(appAttemptId, Collections.<ResourceRequest>emptyList(),
         Collections.<ContainerId>emptyList(), null,
-        Collections.singletonList(host));
+        Collections.singletonList(host), null, null);
     Assert.assertFalse(cs.getApplicationAttempt(appAttemptId).isBlacklisted(host));
     rm.stop();
   }
@@ -776,7 +777,7 @@ public class TestCapacityScheduler {
     cs.allocate(appAttemptId1,
         Collections.<ResourceRequest>singletonList(r1),
         Collections.<ContainerId>emptyList(),
-        null, null);
+        null, null, null, null);
 
     //And this will result in container assignment for app1
     CapacityScheduler.schedule(cs);
@@ -793,7 +794,7 @@ public class TestCapacityScheduler {
     cs.allocate(appAttemptId2,
         Collections.<ResourceRequest>singletonList(r2),
         Collections.<ContainerId>emptyList(),
-        null, null);
+        null, null, null, null);
 
     //In this case we do not perform container assignment because we want to
     //verify re-ordering based on the allocation alone
@@ -2909,7 +2910,7 @@ public class TestCapacityScheduler {
 
     Allocation allocate =
         cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
-            Collections.<ContainerId> emptyList(), null, null);
+            Collections.<ContainerId> emptyList(), null, null, null, null);
 
     Assert.assertNotNull(attempt);
 
@@ -2925,7 +2926,7 @@ public class TestCapacityScheduler {
 
     allocate =
         cs.allocate(appAttemptId, Collections.<ResourceRequest> emptyList(),
-            Collections.<ContainerId> emptyList(), null, null);
+            Collections.<ContainerId> emptyList(), null, null, null, null);
 
     // All resources should be sent as headroom
     Assert.assertEquals(newResource, allocate.getResourceLimit());
@@ -3086,7 +3087,107 @@ public class TestCapacityScheduler {
     config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
         DominantResourceCalculator.class.getName());
     verifyAMLimitForLeafQueue(config);
+  }
+  
+  private FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm,
+      ApplicationId appId) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt();
+  }
 
+  @Test
+  public void testPendingResourceUpdatedAccordingToIncreaseRequestChanges()
+      throws Exception {
+    Configuration conf =
+        TestUtils.getConfigurationWithQueueLabels(new Configuration(false));
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    MockRM rm = new MockRM(conf, memStore) {
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    
+    rm.start();
+    
+    MockNM nm1 = // label = ""
+        new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+    nm1.registerNode();
+    
+    // Launch app1 in queue=a1
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+    
+    // Allocate two more containers
+    am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+            "*", Resources.createResource(2 * GB), 2)),
+        null);
+    ContainerId containerId1 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    ContainerId containerId3 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    Assert.assertTrue(rm.waitForState(nm1, containerId3,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    // Acquire them
+    am1.allocate(null, null);
+    sentRMContainerLaunched(rm,
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1L));
+    sentRMContainerLaunched(rm,
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L));
+    sentRMContainerLaunched(rm,
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3L));
+
+    // am1 asks to change its AM container from 1GB to 3GB
+    am1.sendContainerResizingRequest(Arrays.asList(
+            ContainerResourceChangeRequest
+                .newInstance(containerId1, Resources.createResource(3 * GB))),
+        null);
+    
+    FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId());
+    
+    Assert.assertEquals(2 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    checkPendingResource(rm, "a1", 2 * GB, null);
+    checkPendingResource(rm, "a", 2 * GB, null);
+    checkPendingResource(rm, "root", 2 * GB, null);
+    
+    // am1 asks to change containerId2 (2G -> 3G) and containerId3 (2G -> 5G)
+    am1.sendContainerResizingRequest(Arrays.asList(
+            ContainerResourceChangeRequest
+                .newInstance(containerId2, Resources.createResource(3 * GB)),
+            ContainerResourceChangeRequest
+                .newInstance(containerId3, Resources.createResource(5 * GB))),
+        null);
+    
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    checkPendingResource(rm, "a1", 6 * GB, null);
+    checkPendingResource(rm, "a", 6 * GB, null);
+    checkPendingResource(rm, "root", 6 * GB, null);
+    
+    // am1 asks to change containerId1 (1G->3G), containerId2 (2G -> 4G) and
+    // containerId3 (2G -> 2G)
+    am1.sendContainerResizingRequest(Arrays.asList(
+            ContainerResourceChangeRequest
+                .newInstance(containerId1, Resources.createResource(3 * GB)),
+            ContainerResourceChangeRequest
+                .newInstance(containerId2, Resources.createResource(4 * GB)),
+            ContainerResourceChangeRequest
+                .newInstance(containerId3, Resources.createResource(2 * GB))),
+        null);
+    Assert.assertEquals(4 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    checkPendingResource(rm, "a1", 4 * GB, null);
+    checkPendingResource(rm, "a", 4 * GB, null);
+    checkPendingResource(rm, "root", 4 * GB, null);
   }
 
   private void verifyAMLimitForLeafQueue(CapacitySchedulerConfiguration config)
@@ -3148,4 +3249,15 @@ public class TestCapacityScheduler {
         + CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
     conf.setInt(propName, maxAllocVcores);
   }
+  
+  private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    RMContainer rmContainer = cs.getRMContainer(containerId);
+    if (rmContainer != null) {
+      rmContainer.handle(
+          new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
+    } else {
+      Assert.fail("Cannot find RMContainer");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7b1d3d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 9dcab2e..88c7c13 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -132,11 +132,11 @@ public class TestChildQueueOrder {
         final Resource allocatedResource = Resources.createResource(allocation);
         if (queue instanceof ParentQueue) {
           ((ParentQueue)queue).allocateResource(clusterResource, 
-              allocatedResource, RMNodeLabelsManager.NO_LABEL);
+              allocatedResource, RMNodeLabelsManager.NO_LABEL, false);
         } else {
           FiCaSchedulerApp app1 = getMockApplication(0, "");
           ((LeafQueue)queue).allocateResource(clusterResource, app1, 
-              allocatedResource, null, null);
+              allocatedResource, null, null, false);
         }
 
         // Next call - nothing

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7b1d3d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 769041b..b5b2222 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -60,9 +59,6 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
 
 public class TestContainerAllocation {
 
@@ -199,13 +195,16 @@ public class TestContainerAllocation {
 
     // acquire the container.
     SecurityUtilTestHelper.setTokenServiceUseIp(true);
-    List<Container> containers =
-        am1.allocate(new ArrayList<ResourceRequest>(),
-          new ArrayList<ContainerId>()).getAllocatedContainers();
-    // not able to fetch the container;
-    Assert.assertEquals(0, containers.size());
-
-    SecurityUtilTestHelper.setTokenServiceUseIp(false);
+    List<Container> containers;
+    try {
+      containers =
+          am1.allocate(new ArrayList<ResourceRequest>(),
+              new ArrayList<ContainerId>()).getAllocatedContainers();
+      // not able to fetch the container;
+      Assert.assertEquals(0, containers.size());
+    } finally {
+      SecurityUtilTestHelper.setTokenServiceUseIp(false);
+    }
     containers =
         am1.allocate(new ArrayList<ResourceRequest>(),
           new ArrayList<ContainerId>()).getAllocatedContainers();
@@ -315,21 +314,24 @@ public class TestContainerAllocation {
     rm1.start();
 
     MockNM nm1 = rm1.registerNode("unknownhost:1234", 8000);
-    SecurityUtilTestHelper.setTokenServiceUseIp(true);
-    RMApp app1 = rm1.submitApp(200);
-    RMAppAttempt attempt = app1.getCurrentAppAttempt();
-    nm1.nodeHeartbeat(true);
-
-    // fetching am container will fail, keep retrying 5 times.
-    while (numRetries <= 5) {
+    RMApp app1;
+    try {
+      SecurityUtilTestHelper.setTokenServiceUseIp(true);
+      app1 = rm1.submitApp(200);
+      RMAppAttempt attempt = app1.getCurrentAppAttempt();
       nm1.nodeHeartbeat(true);
-      Thread.sleep(1000);
-      Assert.assertEquals(RMAppAttemptState.SCHEDULED,
-        attempt.getAppAttemptState());
-      System.out.println("Waiting for am container to be allocated.");
-    }
 
-    SecurityUtilTestHelper.setTokenServiceUseIp(false);
+      // fetching am container will fail, keep retrying 5 times.
+      while (numRetries <= 5) {
+        nm1.nodeHeartbeat(true);
+        Thread.sleep(1000);
+        Assert.assertEquals(RMAppAttemptState.SCHEDULED,
+            attempt.getAppAttemptState());
+        System.out.println("Waiting for am container to be allocated.");
+      }
+    } finally {
+      SecurityUtilTestHelper.setTokenServiceUseIp(false);
+    }
     MockRM.launchAndRegisterAM(app1, rm1, nm1);
   }
   

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7b1d3d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
new file mode 100644
index 0000000..23283f6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java
@@ -0,0 +1,963 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestContainerResizing {
+  private final int GB = 1024;
+
+  private YarnConfiguration conf;
+
+  RMNodeLabelsManager mgr;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+  }
+
+  @Test
+  public void testSimpleIncreaseContainer() throws Exception {
+    /**
+     * Application has a container running, and the node has enough available
+     * resource. Add an increase request to see if the container is increased
+     */
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+
+    // app1 -> default queue
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    ContainerId containerId1 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    sentRMContainerLaunched(rm1, containerId1);
+    // am1 asks to change its AM container from 1GB to 3GB
+    am1.sendContainerResizingRequest(Arrays.asList(
+            ContainerResourceChangeRequest
+                .newInstance(containerId1, Resources.createResource(3 * GB))),
+        null);
+
+    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+
+    checkPendingResource(rm1, "default", 2 * GB, null);
+    Assert.assertEquals(2 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+
+    // NM1 does 1 heartbeat
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // Pending resource should be deducted
+    checkPendingResource(rm1, "default", 0 * GB, null);
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+
+    verifyContainerIncreased(am1.allocate(null, null), containerId1, 3 * GB);
+    verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 17 * GB);
+
+    rm1.close();
+  }
+
+  @Test
+  public void testSimpleDecreaseContainer() throws Exception {
+    /**
+     * Application has a container running, try to decrease the container and
+     * check queue's usage and container resource will be updated.
+     */
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+
+    // app1 -> default queue
+    RMApp app1 = rm1.submitApp(3 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+
+    checkUsedResource(rm1, "default", 3 * GB, null);
+    Assert.assertEquals(3 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+    ContainerId containerId1 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    sentRMContainerLaunched(rm1, containerId1);
+
+    // am1 asks to change its AM container from 3GB to 1GB
+    AllocateResponse response = am1.sendContainerResizingRequest(null, Arrays
+        .asList(ContainerResourceChangeRequest
+            .newInstance(containerId1, Resources.createResource(1 * GB))));
+
+    verifyContainerDecreased(response, containerId1, 1 * GB);
+    checkUsedResource(rm1, "default", 1 * GB, null);
+    Assert.assertEquals(1 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+    // Check if decreased containers added to RMNode
+    RMNodeImpl rmNode =
+        (RMNodeImpl) rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    Collection<Container> decreasedContainers =
+        rmNode.getToBeDecreasedContainers();
+    boolean rmNodeReceivedDecreaseContainer = false;
+    for (Container c : decreasedContainers) {
+      if (c.getId().equals(containerId1)
+          && c.getResource().equals(Resources.createResource(1 * GB))) {
+        rmNodeReceivedDecreaseContainer = true;
+      }
+    }
+    Assert.assertTrue(rmNodeReceivedDecreaseContainer);
+
+    rm1.close();
+  }
+
+  @Test
+  public void testSimpleIncreaseRequestReservation() throws Exception {
+    /**
+     * Application has two containers running, try to increase one of them, node
+     * doesn't have enough resource, so the increase request will be reserved.
+     * Check resource usage after container reserved, finish a container, the
+     * reserved container should be allocated.
+     */
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+
+    // app1 -> default queue
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+
+    // Allocate one more container
+    am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(2 * GB), 1)),
+        null);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertTrue(rm1.waitForState(nm1, containerId2,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    // Acquire them, and NM report RUNNING
+    am1.allocate(null, null);
+    sentRMContainerLaunched(rm1, containerId2);
+
+    ContainerId containerId1 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    sentRMContainerLaunched(rm1, containerId1);
+
+
+    // am1 asks to change its AM container from 1GB to 7GB
+    am1.sendContainerResizingRequest(Arrays.asList(
+            ContainerResourceChangeRequest
+                .newInstance(containerId1, Resources.createResource(7 * GB))),
+        null);
+
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+
+    // NM1 does 1 heartbeat
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1);
+    
+    /* Check reservation statuses */
+    // Increase request should be reserved
+    Assert.assertTrue(rmContainer1.hasIncreaseReservation());
+    Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemory());
+    Assert.assertFalse(app.getReservedContainers().isEmpty());
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Pending resource will not be changed since it's not satisfied
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // Queue/user/application's usage will be updated
+    checkUsedResource(rm1, "default", 9 * GB, null);
+    Assert.assertEquals(9 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(3 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+
+    // Complete one container and do another allocation
+    am1.allocate(null, Arrays.asList(containerId2));
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // Now container should be increased
+    verifyContainerIncreased(am1.allocate(null, null), containerId1, 7 * GB);
+    
+    /* Check statuses after reservation satisfied */
+    // Increase request should be unreserved
+    Assert.assertFalse(rmContainer1.hasIncreaseReservation());
+    Assert.assertTrue(app.getReservedContainers().isEmpty());
+    Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Pending resource will be changed since it's satisfied
+    checkPendingResource(rm1, "default", 0 * GB, null);
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // Queue/user/application's usage will be updated
+    checkUsedResource(rm1, "default", 7 * GB, null);
+    Assert.assertEquals(7 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+    Assert.assertEquals(7 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    verifyAvailableResourceOfSchedulerNode(rm1, nm1.getNodeId(), 1 * GB);
+
+    rm1.close();
+  }
+
+  @Test
+  public void testExcessiveReservationWhenCancelIncreaseRequest()
+      throws Exception {
+    /**
+     * Application has two containers running, try to increase one of them, node
+     * doesn't have enough resource, so the increase request will be reserved.
+     * Check resource usage after container reserved, finish a container &
+     * cancel the increase request, reservation should be cancelled
+     */
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+
+    // app1 -> default queue
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+
+    // Allocate one more container
+    am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(2 * GB), 1)),
+        null);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertTrue(rm1.waitForState(nm1, containerId2,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    // Acquire them, and NM report RUNNING
+    am1.allocate(null, null);
+    sentRMContainerLaunched(rm1, containerId2);
+
+    ContainerId containerId1 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    sentRMContainerLaunched(rm1, containerId1);
+
+    // am1 asks to change its AM container from 1GB to 7GB
+    am1.sendContainerResizingRequest(Arrays.asList(
+            ContainerResourceChangeRequest
+                .newInstance(containerId1, Resources.createResource(7 * GB))),
+        null);
+
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+
+    // NM1 does 1 heartbeat
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1);
+    
+    /* Check reservation statuses */
+    // Increase request should be reserved
+    Assert.assertTrue(rmContainer1.hasIncreaseReservation());
+    Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemory());
+    Assert.assertFalse(app.getReservedContainers().isEmpty());
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Pending resource will not be changed since it's not satisfied
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // Queue/user/application's usage will be updated
+    checkUsedResource(rm1, "default", 9 * GB, null);
+    Assert.assertEquals(9 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(3 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+
+    // Complete one container and cancel the increase request (by sending an
+    // increase request with target_capacity=existing_capacity)
+    am1.allocate(null, Arrays.asList(containerId2));
+    // am1 asks to change its AM container from 1G to 1G (cancel the increase
+    // request actually)
+    am1.sendContainerResizingRequest(Arrays.asList(
+            ContainerResourceChangeRequest
+                .newInstance(containerId1, Resources.createResource(1 * GB))),
+        null);
+    // Trigger a node heartbeat..
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    
+    /* Check statuses after reservation cancelled */
+    // Increase request should be unreserved
+    Assert.assertTrue(app.getReservedContainers().isEmpty());
+    Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertFalse(rmContainer1.hasIncreaseReservation());
+    // Pending resource will be cleared since the request is cancelled
+    checkPendingResource(rm1, "default", 0 * GB, null);
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // Queue/user/application's usage will be updated
+    checkUsedResource(rm1, "default", 1 * GB, null);
+    Assert.assertEquals(1 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+    Assert.assertEquals(1 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+    rm1.close();
+  }
+
+  @Test
+  public void testExcessiveReservationWhenDecreaseSameContainer()
+      throws Exception {
+    /**
+     * Very similar to testExcessiveReservationWhenCancelIncreaseRequest, after
+     * the increase request reserved, it decreases the reserved container,
+     * container should be decreased and reservation will be cancelled
+     */
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+
+    // app1 -> default queue
+    RMApp app1 = rm1.submitApp(2 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+
+    // Allocate one more container
+    am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(2 * GB), 1)),
+        null);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertTrue(rm1.waitForState(nm1, containerId2,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    // Acquire them, and NM report RUNNING
+    am1.allocate(null, null);
+    sentRMContainerLaunched(rm1, containerId2);
+
+    ContainerId containerId1 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    sentRMContainerLaunched(rm1, containerId1);
+
+
+    // am1 asks to change its AM container from 2GB to 8GB
+    am1.sendContainerResizingRequest(Arrays.asList(
+            ContainerResourceChangeRequest
+                .newInstance(containerId1, Resources.createResource(8 * GB))),
+        null);
+
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+
+    // NM1 do 1 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    RMContainer rmContainer1 = app.getLiveContainersMap().get(containerId1);
+    
+    /* Check reservation statuses */
+    // Increase request should be reserved
+    Assert.assertTrue(rmContainer1.hasIncreaseReservation());
+    Assert.assertEquals(6 * GB, rmContainer1.getReservedResource().getMemory());
+    Assert.assertFalse(app.getReservedContainers().isEmpty());
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Pending resource will not be changed since it's not satisfied
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // Queue/user/application's usage will be updated
+    checkUsedResource(rm1, "default", 10 * GB, null);
+    Assert.assertEquals(10 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(4 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+
+    // Complete one container and decrease the container whose increase is
+    // reserved (target capacity 1G is below its current 2G allocation)
+    am1.allocate(null, Arrays.asList(containerId2));
+    // am1 asks to change its AM container from 2G to 1G (decrease)
+    am1.sendContainerResizingRequest(null, Arrays.asList(
+            ContainerResourceChangeRequest
+                .newInstance(containerId1, Resources.createResource(1 * GB))));
+    // Trigger a node heartbeat..
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    
+    /* Check statuses after the decrease request is processed */
+    // Increase request should be unreserved
+    Assert.assertTrue(app.getReservedContainers().isEmpty());
+    Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertFalse(rmContainer1.hasIncreaseReservation());
+    // Pending resource drops to 0 since the increase request is cancelled
+    checkPendingResource(rm1, "default", 0 * GB, null);
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // Queue/user/application's usage will be updated
+    checkUsedResource(rm1, "default", 1 * GB, null);
+    Assert.assertEquals(1 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+    Assert.assertEquals(1 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+    rm1.close();
+  }
+
+  @Test
+  public void testIncreaseContainerUnreservedWhenContainerCompleted()
+      throws Exception {
+    /**
+     * App has two containers on the same node (node.resource = 8G), container1
+     * = 1G, container2 = 2G. App asks to increase container2 to 8G.
+     *
+     * So the increase request will be reserved. When the app releases
+     * container2, the reserved part should be released as well.
+     */
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+
+    // app1 -> a1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+
+    // Allocate one more container
+    am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(2 * GB), 1)),
+        null);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertTrue(rm1.waitForState(nm1, containerId2,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    // Acquire them, and NM report RUNNING
+    am1.allocate(null, null);
+    sentRMContainerLaunched(rm1, containerId2);
+    rm1.waitForContainerState(containerId2, RMContainerState.RUNNING);
+
+    // am1 asks to change container2 from 2GB to 8GB
+    am1.sendContainerResizingRequest(Arrays.asList(
+            ContainerResourceChangeRequest
+                .newInstance(containerId2, Resources.createResource(8 * GB))),
+        null);
+
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+
+    // NM1 do 1 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    RMContainer rmContainer2 = app.getLiveContainersMap().get(containerId2);
+    
+    /* Check reservation statuses */
+    // Increase request should be reserved
+    Assert.assertTrue(rmContainer2.hasIncreaseReservation());
+    Assert.assertEquals(6 * GB, rmContainer2.getReservedResource().getMemory());
+    Assert.assertFalse(app.getReservedContainers().isEmpty());
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Pending resource will not be changed since it's not satisfied
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // Queue/user/application's usage will be updated
+    checkUsedResource(rm1, "default", 9 * GB, null);
+    Assert.assertEquals(9 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(3 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+
+    // Complete container2, container will be unreserved and completed
+    am1.allocate(null, Arrays.asList(containerId2));
+    
+    /* Check statuses after container2 is released */
+    // Increase request should be unreserved
+    Assert.assertTrue(app.getReservedContainers().isEmpty());
+    Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertFalse(rmContainer2.hasIncreaseReservation());
+    // Pending resource drops to 0 since the increase request is cancelled
+    checkPendingResource(rm1, "default", 0 * GB, null);
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // Queue/user/application's usage will be updated
+    checkUsedResource(rm1, "default", 1 * GB, null);
+    Assert.assertEquals(1 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+    Assert.assertEquals(1 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+    rm1.close();
+  }
+
+  @Test
+  public void testIncreaseContainerUnreservedWhenApplicationCompleted()
+      throws Exception {
+    /**
+     * Similar to testIncreaseContainerUnreservedWhenContainerCompleted, when
+     * application finishes, reserved increase container should be cancelled
+     */
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+
+    // app1 -> a1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+
+    // Allocate one more container
+    am1.allocate(
+        Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*",
+            Resources.createResource(2 * GB), 1)),
+        null);
+    ContainerId containerId2 =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertTrue(
+        rm1.waitForState(nm1, containerId2, RMContainerState.ALLOCATED,
+            10 * 1000));
+    // Acquire them, and NM report RUNNING
+    am1.allocate(null, null);
+    sentRMContainerLaunched(rm1, containerId2);
+
+    // am1 asks to change container2 from 2GB to 8GB
+    am1.sendContainerResizingRequest(Arrays.asList(
+            ContainerResourceChangeRequest
+                .newInstance(containerId2, Resources.createResource(8 * GB))),
+        null);
+
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+
+    // NM1 do 1 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    RMContainer rmContainer2 = app.getLiveContainersMap().get(containerId2);
+    
+    /* Check reservation statuses */
+    // Increase request should be reserved
+    Assert.assertTrue(rmContainer2.hasIncreaseReservation());
+    Assert.assertEquals(6 * GB, rmContainer2.getReservedResource().getMemory());
+    Assert.assertFalse(app.getReservedContainers().isEmpty());
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Pending resource will not be changed since it's not satisfied
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // Queue/user/application's usage will be updated
+    checkUsedResource(rm1, "default", 9 * GB, null);
+    Assert.assertEquals(9 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(3 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+
+    // Kill the application
+    cs.handle(new AppAttemptRemovedSchedulerEvent(am1.getApplicationAttemptId(),
+        RMAppAttemptState.KILLED, false));
+
+    /* Check statuses after the application attempt is removed */
+    // Increase request should be unreserved
+    Assert.assertTrue(app.getReservedContainers().isEmpty());
+    Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    Assert.assertFalse(rmContainer2.hasIncreaseReservation());
+    // Pending resource drops to 0 since the increase request is cancelled
+    checkPendingResource(rm1, "default", 0 * GB, null);
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // Queue/user/application's usage will be updated
+    checkUsedResource(rm1, "default", 0 * GB, null);
+    Assert.assertEquals(0 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+    rm1.close();
+  }
+
+  private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
+      int nContainer, int mem, int priority, int startContainerId)
+          throws Exception {
+    am.allocate(Arrays
+        .asList(ResourceRequest.newInstance(Priority.newInstance(priority), "*",
+            Resources.createResource(mem), nContainer)),
+        null);
+    ContainerId lastContainerId = ContainerId.newContainerId(
+        am.getApplicationAttemptId(), startContainerId + nContainer - 1);
+    Assert.assertTrue(rm.waitForState(nm, lastContainerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    // Acquire them, and NM report RUNNING
+    am.allocate(null, null);
+
+    for (int cId = startContainerId; cId < startContainerId
+        + nContainer; cId++) {
+      sentRMContainerLaunched(rm,
+          ContainerId.newContainerId(am.getApplicationAttemptId(), cId));
+      rm.waitForContainerState(
+          ContainerId.newContainerId(am.getApplicationAttemptId(), cId),
+          RMContainerState.RUNNING);
+    }
+  }
+
+  @Test
+  public void testOrderOfIncreaseContainerRequestAllocation()
+      throws Exception {
+    /**
+     * Multiple containers need to be increased. Check that containers are
+     * increased in priority order; when priorities are equal, the container
+     * with the smaller containerId is preferred.
+     */
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
+
+    // app1 -> a1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+    ApplicationAttemptId attemptId = am1.getApplicationAttemptId();
+
+    // Container 2, 3 (priority=3)
+    allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 3, 2);
+
+    // Container 4, 5 (priority=2)
+    allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 2, 4);
+
+    // Container 6, 7 (priority=4)
+    allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 4, 6);
+
+    // am1 asks to change its container[2-7] from 1G to 2G
+    List<ContainerResourceChangeRequest> increaseRequests = new ArrayList<>();
+    for (int cId = 2; cId <= 7; cId++) {
+      ContainerId containerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), cId);
+      increaseRequests.add(ContainerResourceChangeRequest
+          .newInstance(containerId, Resources.createResource(2 * GB)));
+    }
+    am1.sendContainerResizingRequest(increaseRequests, null);
+
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+
+    // Get rmNode1
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // assignContainer, container-4/5/2 increased (which has highest priority OR
+    // earlier allocated)
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    AllocateResponse allocateResponse = am1.allocate(null, null);
+    Assert.assertEquals(3, allocateResponse.getIncreasedContainers().size());
+    verifyContainerIncreased(allocateResponse,
+        ContainerId.newContainerId(attemptId, 4), 2 * GB);
+    verifyContainerIncreased(allocateResponse,
+        ContainerId.newContainerId(attemptId, 5), 2 * GB);
+    verifyContainerIncreased(allocateResponse,
+        ContainerId.newContainerId(attemptId, 2), 2 * GB);
+
+    /* Check statuses after allocation */
+    // There're still 3 pending increase requests
+    checkPendingResource(rm1, "default", 3 * GB, null);
+    Assert.assertEquals(3 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // Queue/user/application's usage will be updated
+    checkUsedResource(rm1, "default", 10 * GB, null);
+    Assert.assertEquals(10 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+    Assert.assertEquals(10 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+    rm1.close();
+  }
+
+  @Test
+  public void testIncreaseContainerRequestGetPreferrence()
+      throws Exception {
+    /**
+     * Multiple containers need to be increased while there are also new
+     * container allocation requests; the scheduler should try to increase
+     * containers before allocating new ones.
+     */
+    MockRM rm1 = new MockRM() {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
+
+    // app1 -> a1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    FiCaSchedulerApp app = getFiCaSchedulerApp(rm1, app1.getApplicationId());
+    ApplicationAttemptId attemptId = am1.getApplicationAttemptId();
+
+    // Container 2, 3 (priority=3)
+    allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 3, 2);
+
+    // Container 4, 5 (priority=2)
+    allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 2, 4);
+
+    // Container 6, 7 (priority=4)
+    allocateAndLaunchContainers(am1, nm1, rm1, 2, 1 * GB, 4, 6);
+
+    // am1 asks to change its container[2-7] from 1G to 2G
+    List<ContainerResourceChangeRequest> increaseRequests = new ArrayList<>();
+    for (int cId = 2; cId <= 7; cId++) {
+      ContainerId containerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), cId);
+      increaseRequests.add(ContainerResourceChangeRequest
+          .newInstance(containerId, Resources.createResource(2 * GB)));
+    }
+    am1.sendContainerResizingRequest(increaseRequests, null);
+
+    checkPendingResource(rm1, "default", 6 * GB, null);
+    Assert.assertEquals(6 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+
+    // Get rmNode1
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    // assignContainer, container-4/5/2 increased (which has highest priority OR
+    // earlier allocated)
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    AllocateResponse allocateResponse = am1.allocate(null, null);
+    Assert.assertEquals(3, allocateResponse.getIncreasedContainers().size());
+    verifyContainerIncreased(allocateResponse,
+        ContainerId.newContainerId(attemptId, 4), 2 * GB);
+    verifyContainerIncreased(allocateResponse,
+        ContainerId.newContainerId(attemptId, 5), 2 * GB);
+    verifyContainerIncreased(allocateResponse,
+        ContainerId.newContainerId(attemptId, 2), 2 * GB);
+
+    /* Check statuses after allocation */
+    // There're still 3 pending increase requests
+    checkPendingResource(rm1, "default", 3 * GB, null);
+    Assert.assertEquals(3 * GB,
+        app.getAppAttemptResourceUsage().getPending().getMemory());
+    // Queue/user/application's usage will be updated
+    checkUsedResource(rm1, "default", 10 * GB, null);
+    Assert.assertEquals(10 * GB, ((LeafQueue) cs.getQueue("default"))
+        .getUser("user").getUsed().getMemory());
+    Assert.assertEquals(0 * GB,
+        app.getAppAttemptResourceUsage().getReserved().getMemory());
+    Assert.assertEquals(10 * GB,
+        app.getAppAttemptResourceUsage().getUsed().getMemory());
+
+    rm1.close();
+  }
+
+  private void checkPendingResource(MockRM rm, String queueName, int memory,
+      String label) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue queue = cs.getQueue(queueName);
+    Assert.assertEquals(memory,
+        queue.getQueueResourceUsage()
+            .getPending(label == null ? RMNodeLabelsManager.NO_LABEL : label)
+            .getMemory());
+  }
+
+  private void checkUsedResource(MockRM rm, String queueName, int memory,
+      String label) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    CSQueue queue = cs.getQueue(queueName);
+    Assert.assertEquals(memory,
+        queue.getQueueResourceUsage()
+            .getUsed(label == null ? RMNodeLabelsManager.NO_LABEL : label)
+            .getMemory());
+  }
+
+  private void verifyContainerIncreased(AllocateResponse response,
+      ContainerId containerId, int mem) {
+    List<Container> increasedContainers = response.getIncreasedContainers();
+    boolean found = false;
+    for (Container c : increasedContainers) {
+      if (c.getId().equals(containerId)) {
+        found = true;
+        Assert.assertEquals(mem, c.getResource().getMemory());
+      }
+    }
+    if (!found) {
+      Assert.fail("Container not increased: containerId=" + containerId);
+    }
+  }
+
+  private void verifyContainerDecreased(AllocateResponse response,
+      ContainerId containerId, int mem) {
+    List<Container> decreasedContainers = response.getDecreasedContainers();
+    boolean found = false;
+    for (Container c : decreasedContainers) {
+      if (c.getId().equals(containerId)) {
+        found = true;
+        Assert.assertEquals(mem, c.getResource().getMemory());
+      }
+    }
+    if (!found) {
+      Assert.fail("Container not decreased: containerId=" + containerId);
+    }
+  }
+
+  private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    RMContainer rmContainer = cs.getRMContainer(containerId);
+    if (rmContainer != null) {
+      rmContainer.handle(
+          new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
+    } else {
+      Assert.fail("Cannot find RMContainer");
+    }
+  }
+
+  private void verifyAvailableResourceOfSchedulerNode(MockRM rm, NodeId nodeId,
+      int expectedMemory) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    SchedulerNode node = cs.getNode(nodeId);
+    Assert
+        .assertEquals(expectedMemory, node.getAvailableResource().getMemory());
+  }
+
+  private FiCaSchedulerApp getFiCaSchedulerApp(MockRM rm,
+      ApplicationId appId) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    return cs.getSchedulerApplications().get(appId).getCurrentAppAttempt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4a7b1d3d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index fe8be06..b85c697 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -770,9 +770,9 @@ public class TestLeafQueue {
     qb.finishApplication(app_0.getApplicationId(), user_0);
     qb.finishApplication(app_2.getApplicationId(), user_1);
     qb.releaseResource(clusterResource, app_0, app_0.getResource(u0Priority),
-        null, null);
+        null, null, false);
     qb.releaseResource(clusterResource, app_2, app_2.getResource(u1Priority),
-        null, null);
+        null, null, false);
 
     qb.setUserLimit(50);
     qb.setUserLimitFactor(1);


Mime
View raw message