Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8E1671753C for ; Mon, 29 Sep 2014 21:02:11 +0000 (UTC) Received: (qmail 38800 invoked by uid 500); 29 Sep 2014 21:02:11 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 38705 invoked by uid 500); 29 Sep 2014 21:02:11 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 38688 invoked by uid 99); 29 Sep 2014 21:02:11 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Sep 2014 21:02:11 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id E3887A02205; Mon, 29 Sep 2014 21:02:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: arp@apache.org To: common-commits@hadoop.apache.org Date: Mon, 29 Sep 2014 21:02:10 -0000 Message-Id: <0815810fbbe64059a00930ad2e242eb9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/15] YARN-1769. CapacityScheduler: Improve reservations. Contributed by Thomas Graves Repository: hadoop Updated Branches: refs/heads/HDFS-6581 d45e7c7e8 -> 1e6b4b9b2 http://git-wip-us.apache.org/repos/asf/hadoop/blob/9c220651/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java new file mode 100644 index 0000000..0f8290e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -0,0 +1,1184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.when; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; +import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +public class TestReservations { + + private static final Log LOG = LogFactory.getLog(TestReservations.class); + + private final RecordFactory recordFactory = RecordFactoryProvider + .getRecordFactory(null); + + RMContext rmContext; + CapacityScheduler cs; + // CapacitySchedulerConfiguration csConf; + CapacitySchedulerContext csContext; + + private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); + + CSQueue root; + Map queues = new HashMap(); + Map oldQueues = new HashMap(); + + final static int GB = 1024; + final static String DEFAULT_RACK = "/default"; + + @Before + public void setUp() throws Exception { + CapacityScheduler spyCs = new CapacityScheduler(); + cs = spy(spyCs); + rmContext = TestUtils.getMockRMContext(); + + } + + private void setup(CapacitySchedulerConfiguration csConf) throws Exception { + + csConf.setBoolean("yarn.scheduler.capacity.user-metrics.enable", true); + final String newRoot = "root" + System.currentTimeMillis(); + // final String newRoot = "root"; + + setupQueueConfiguration(csConf, newRoot); + YarnConfiguration conf = new YarnConfiguration(); + cs.setConf(conf); + + csContext = mock(CapacitySchedulerContext.class); + when(csContext.getConfiguration()).thenReturn(csConf); + when(csContext.getConf()).thenReturn(conf); + when(csContext.getMinimumResourceCapability()).thenReturn( + Resources.createResource(GB, 1)); + when(csContext.getMaximumResourceCapability()).thenReturn( + Resources.createResource(16 * GB, 12)); + when(csContext.getClusterResource()).thenReturn( + Resources.createResource(100 * 16 * GB, 100 * 12)); + when(csContext.getApplicationComparator()).thenReturn( + CapacityScheduler.applicationComparator); + when(csContext.getQueueComparator()).thenReturn( + CapacityScheduler.queueComparator); + when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); + RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager( + conf); + containerTokenSecretManager.rollMasterKey(); + when(csContext.getContainerTokenSecretManager()).thenReturn( + containerTokenSecretManager); + + root = CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); + + cs.setRMContext(rmContext); + cs.init(csConf); + cs.start(); + } + + private static final String A = "a"; + + private void setupQueueConfiguration(CapacitySchedulerConfiguration conf, + final String newRoot) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { newRoot }); + conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100); + conf.setAcl(CapacitySchedulerConfiguration.ROOT, + QueueACL.SUBMIT_APPLICATIONS, " "); + + final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + + newRoot; + conf.setQueues(Q_newRoot, new String[] { A }); + conf.setCapacity(Q_newRoot, 100); + conf.setMaximumCapacity(Q_newRoot, 100); + conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " "); + + final String Q_A = Q_newRoot + "." + A; + conf.setCapacity(Q_A, 100f); + conf.setMaximumCapacity(Q_A, 100); + conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*"); + + } + + static LeafQueue stubLeafQueue(LeafQueue queue) { + + // Mock some methods for ease in these unit tests + + // 1. LeafQueue.createContainer to return dummy containers + doAnswer(new Answer() { + @Override + public Container answer(InvocationOnMock invocation) throws Throwable { + final FiCaSchedulerApp application = (FiCaSchedulerApp) (invocation + .getArguments()[0]); + final ContainerId containerId = TestUtils + .getMockContainerId(application); + + Container container = TestUtils.getMockContainer(containerId, + ((FiCaSchedulerNode) (invocation.getArguments()[1])).getNodeID(), + (Resource) (invocation.getArguments()[2]), + ((Priority) invocation.getArguments()[3])); + return container; + } + }).when(queue).createContainer(any(FiCaSchedulerApp.class), + any(FiCaSchedulerNode.class), any(Resource.class), any(Priority.class)); + + // 2. Stub out LeafQueue.parent.completedContainer + CSQueue parent = queue.getParent(); + doNothing().when(parent).completedContainer(any(Resource.class), + any(FiCaSchedulerApp.class), any(FiCaSchedulerNode.class), + any(RMContainer.class), any(ContainerStatus.class), + any(RMContainerEventType.class), any(CSQueue.class), anyBoolean()); + + return queue; + } + + @Test + public void testReservation() throws Exception { + // Test that we now unreserve and use a node that has space + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + setup(csConf); + + // Manipulate queue 'a' + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); + + // Users + final String user_0 = "user_0"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = TestUtils + .getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext); + + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = TestUtils + .getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), rmContext); + a.submitApplicationAttempt(app_1, user_0); + + // Setup some nodes + String host_0 = "host_0"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, + 8 * GB); + String host_1 = "host_1"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, + 8 * GB); + String host_2 = "host_2"; + FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, + 8 * GB); + + when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); + when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); + when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); + + final int numNodes = 3; + Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + Priority priorityAM = TestUtils.createMockPriority(1); + Priority priorityMap = TestUtils.createMockPriority(5); + Priority priorityReduce = TestUtils.createMockPriority(10); + + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, + priorityAM, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true, + priorityReduce, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true, + priorityMap, recordFactory))); + + // Start testing... + // Only AM + a.assignContainers(clusterResource, node_0, false); + assertEquals(2 * GB, a.getUsedResources().getMemory()); + assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(2 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(22 * GB, a.getMetrics().getAvailableMB()); + assertEquals(2 * GB, node_0.getUsedResource().getMemory()); + assertEquals(0 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + + // Only 1 map - simulating reduce + a.assignContainers(clusterResource, node_0, false); + assertEquals(5 * GB, a.getUsedResources().getMemory()); + assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(5 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(19 * GB, a.getMetrics().getAvailableMB()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(0 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + + // Only 1 map to other node - simulating reduce + a.assignContainers(clusterResource, node_1, false); + assertEquals(8 * GB, a.getUsedResources().getMemory()); + assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(16 * GB, a.getMetrics().getAvailableMB()); + assertEquals(16 * GB, app_0.getHeadroom().getMemory()); + assertEquals(null, node_0.getReservedContainer()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); + + // try to assign reducer (5G on node 0 and should reserve) + a.assignContainers(clusterResource, node_0, false); + assertEquals(13 * GB, a.getUsedResources().getMemory()); + assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(5 * GB, a.getMetrics().getReservedMB()); + assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(11 * GB, a.getMetrics().getAvailableMB()); + assertEquals(11 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource() + .getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); + + // assign reducer to node 2 + a.assignContainers(clusterResource, node_2, false); + assertEquals(18 * GB, a.getUsedResources().getMemory()); + assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(5 * GB, a.getMetrics().getReservedMB()); + assertEquals(13 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(6 * GB, a.getMetrics().getAvailableMB()); + assertEquals(6 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource() + .getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(5 * GB, node_2.getUsedResource().getMemory()); + assertEquals(1, app_0.getTotalRequiredResources(priorityReduce)); + + // node_1 heartbeat and unreserves from node_0 in order to allocate + // on node_1 + a.assignContainers(clusterResource, node_1, false); + assertEquals(18 * GB, a.getUsedResources().getMemory()); + assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(18 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(6 * GB, a.getMetrics().getAvailableMB()); + assertEquals(6 * GB, app_0.getHeadroom().getMemory()); + assertEquals(null, node_0.getReservedContainer()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(8 * GB, node_1.getUsedResource().getMemory()); + assertEquals(5 * GB, node_2.getUsedResource().getMemory()); + assertEquals(0, app_0.getTotalRequiredResources(priorityReduce)); + } + + @Test + public void testReservationNoContinueLook() throws Exception { + // Test that with reservations-continue-look-all-nodes feature off + // we don't unreserve and show we could get stuck + + queues = new HashMap(); + // test that the deadlock occurs when turned off + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.setBoolean( + "yarn.scheduler.capacity.reservations-continue-look-all-nodes", false); + setup(csConf); + + // Manipulate queue 'a' + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); + + // Users + final String user_0 = "user_0"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = TestUtils + .getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext); + + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = TestUtils + .getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), rmContext); + a.submitApplicationAttempt(app_1, user_0); + + // Setup some nodes + String host_0 = "host_0"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, + 8 * GB); + String host_1 = "host_1"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, + 8 * GB); + String host_2 = "host_2"; + FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, + 8 * GB); + + when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); + when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); + when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); + + final int numNodes = 3; + Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + Priority priorityAM = TestUtils.createMockPriority(1); + Priority priorityMap = TestUtils.createMockPriority(5); + Priority priorityReduce = TestUtils.createMockPriority(10); + + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, + priorityAM, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true, + priorityReduce, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true, + priorityMap, recordFactory))); + + // Start testing... + // Only AM + a.assignContainers(clusterResource, node_0, false); + assertEquals(2 * GB, a.getUsedResources().getMemory()); + assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(2 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(22 * GB, a.getMetrics().getAvailableMB()); + assertEquals(2 * GB, node_0.getUsedResource().getMemory()); + assertEquals(0 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + + // Only 1 map - simulating reduce + a.assignContainers(clusterResource, node_0, false); + assertEquals(5 * GB, a.getUsedResources().getMemory()); + assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(5 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(19 * GB, a.getMetrics().getAvailableMB()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(0 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + + // Only 1 map to other node - simulating reduce + a.assignContainers(clusterResource, node_1, false); + assertEquals(8 * GB, a.getUsedResources().getMemory()); + assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(16 * GB, a.getMetrics().getAvailableMB()); + assertEquals(16 * GB, app_0.getHeadroom().getMemory()); + assertEquals(null, node_0.getReservedContainer()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); + + // try to assign reducer (5G on node 0 and should reserve) + a.assignContainers(clusterResource, node_0, false); + assertEquals(13 * GB, a.getUsedResources().getMemory()); + assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(5 * GB, a.getMetrics().getReservedMB()); + assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(11 * GB, a.getMetrics().getAvailableMB()); + assertEquals(11 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource() + .getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); + + // assign reducer to node 2 + a.assignContainers(clusterResource, node_2, false); + assertEquals(18 * GB, a.getUsedResources().getMemory()); + assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(5 * GB, a.getMetrics().getReservedMB()); + assertEquals(13 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(6 * GB, a.getMetrics().getAvailableMB()); + assertEquals(6 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource() + .getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(5 * GB, node_2.getUsedResource().getMemory()); + assertEquals(1, app_0.getTotalRequiredResources(priorityReduce)); + + // node_1 heartbeat and won't unreserve from node_0, potentially stuck + // if AM doesn't handle + a.assignContainers(clusterResource, node_1, false); + assertEquals(18 * GB, a.getUsedResources().getMemory()); + assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(5 * GB, a.getMetrics().getReservedMB()); + assertEquals(13 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(6 * GB, a.getMetrics().getAvailableMB()); + assertEquals(6 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource() + .getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(5 * GB, node_2.getUsedResource().getMemory()); + assertEquals(1, app_0.getTotalRequiredResources(priorityReduce)); + } + + @Test + public void testAssignContainersNeedToUnreserve() throws Exception { + // Test that we now unreserve and use a node that has space + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + setup(csConf); + + // Manipulate queue 'a' + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); + + // Users + final String user_0 = "user_0"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = TestUtils + .getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext); + + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = TestUtils + .getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), rmContext); + a.submitApplicationAttempt(app_1, user_0); + + // Setup some nodes + String host_0 = "host_0"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, + 8 * GB); + String host_1 = "host_1"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, + 8 * GB); + + when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); + when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); + + final int numNodes = 2; + Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + Priority priorityAM = TestUtils.createMockPriority(1); + Priority priorityMap = TestUtils.createMockPriority(5); + Priority priorityReduce = TestUtils.createMockPriority(10); + + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, + priorityAM, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true, + priorityReduce, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true, + priorityMap, recordFactory))); + + // Start testing... + // Only AM + a.assignContainers(clusterResource, node_0, false); + assertEquals(2 * GB, a.getUsedResources().getMemory()); + assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(2 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(14 * GB, a.getMetrics().getAvailableMB()); + assertEquals(2 * GB, node_0.getUsedResource().getMemory()); + assertEquals(0 * GB, node_1.getUsedResource().getMemory()); + + // Only 1 map - simulating reduce + a.assignContainers(clusterResource, node_0, false); + assertEquals(5 * GB, a.getUsedResources().getMemory()); + assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(5 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(11 * GB, a.getMetrics().getAvailableMB()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(0 * GB, node_1.getUsedResource().getMemory()); + + // Only 1 map to other node - simulating reduce + a.assignContainers(clusterResource, node_1, false); + assertEquals(8 * GB, a.getUsedResources().getMemory()); + assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(8 * GB, a.getMetrics().getAvailableMB()); + assertEquals(8 * GB, app_0.getHeadroom().getMemory()); + assertEquals(null, node_0.getReservedContainer()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); + + // try to assign reducer (5G on node 0 and should reserve) + a.assignContainers(clusterResource, node_0, false); + assertEquals(13 * GB, a.getUsedResources().getMemory()); + assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(5 * GB, a.getMetrics().getReservedMB()); + assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(3 * GB, a.getMetrics().getAvailableMB()); + assertEquals(3 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getReservedContainer().getReservedResource() + .getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(2, app_0.getTotalRequiredResources(priorityReduce)); + + // could allocate but told need to unreserve first + a.assignContainers(clusterResource, node_1, true); + assertEquals(13 * GB, a.getUsedResources().getMemory()); + assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(13 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(3 * GB, a.getMetrics().getAvailableMB()); + assertEquals(3 * GB, app_0.getHeadroom().getMemory()); + assertEquals(null, node_0.getReservedContainer()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(8 * GB, node_1.getUsedResource().getMemory()); + assertEquals(1, app_0.getTotalRequiredResources(priorityReduce)); + } + + @Test + public void testGetAppToUnreserve() throws Exception { + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + setup(csConf); + final String user_0 = "user_0"; + final ApplicationAttemptId appAttemptId_0 = TestUtils + .getMockApplicationAttemptId(0, 0); + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext); + + String host_0 = "host_0"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, + 8 * GB); + String host_1 = "host_1"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, + 8 * GB); + + // Setup resource-requests + Priority priorityMap = TestUtils.createMockPriority(5); + Resource capability = Resources.createResource(2*GB, 0); + + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); + RMContext rmContext = mock(RMContext.class); + ContainerAllocationExpirer expirer = + mock(ContainerAllocationExpirer.class); + DrainDispatcher drainDispatcher = new DrainDispatcher(); + when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); + when(rmContext.getDispatcher()).thenReturn(drainDispatcher); + when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); + when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + app_0.getApplicationId(), 1); + ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); + Container container = TestUtils.getMockContainer(containerId, + node_1.getNodeID(), Resources.createResource(2*GB), priorityMap); + RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, + node_1.getNodeID(), "user", rmContext); + + Container container_1 = TestUtils.getMockContainer(containerId, + node_0.getNodeID(), Resources.createResource(1*GB), priorityMap); + RMContainer rmContainer_1 = new RMContainerImpl(container_1, appAttemptId, + node_0.getNodeID(), "user", rmContext); + + // no reserved containers + NodeId unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability); + assertEquals(null, unreserveId); + + // no reserved containers - reserve then unreserve + app_0.reserve(node_0, priorityMap, rmContainer_1, container_1); + app_0.unreserve(node_0, priorityMap); + unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability); + assertEquals(null, unreserveId); + + // no container large enough is reserved + app_0.reserve(node_0, priorityMap, rmContainer_1, container_1); + unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability); + assertEquals(null, unreserveId); + + // reserve one that is now large enough + app_0.reserve(node_1, priorityMap, rmContainer, container); + unreserveId = app_0.getNodeIdToUnreserve(priorityMap, capability); + assertEquals(node_1.getNodeID(), unreserveId); + } + + @Test + public void testFindNodeToUnreserve() throws Exception { + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + setup(csConf); + final String user_0 = "user_0"; + final ApplicationAttemptId appAttemptId_0 = TestUtils + .getMockApplicationAttemptId(0, 0); + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext); + + String host_1 = "host_1"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, + 8 * GB); + + // Setup resource-requests + Priority priorityMap = TestUtils.createMockPriority(5); + Resource capability = Resources.createResource(2 * GB, 0); + + RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); + SystemMetricsPublisher publisher = mock(SystemMetricsPublisher.class); + RMContext rmContext = mock(RMContext.class); + ContainerAllocationExpirer expirer = + mock(ContainerAllocationExpirer.class); + DrainDispatcher drainDispatcher = new DrainDispatcher(); + when(rmContext.getContainerAllocationExpirer()).thenReturn(expirer); + when(rmContext.getDispatcher()).thenReturn(drainDispatcher); + when(rmContext.getRMApplicationHistoryWriter()).thenReturn(writer); + when(rmContext.getSystemMetricsPublisher()).thenReturn(publisher); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + app_0.getApplicationId(), 1); + ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1); + Container container = TestUtils.getMockContainer(containerId, + node_1.getNodeID(), Resources.createResource(2*GB), priorityMap); + RMContainer rmContainer = new RMContainerImpl(container, appAttemptId, + node_1.getNodeID(), "user", rmContext); + + // nothing reserved + boolean res = a.findNodeToUnreserve(csContext.getClusterResource(), + node_1, app_0, priorityMap, capability); + assertFalse(res); + + // reserved but scheduler doesn't know about that node. + app_0.reserve(node_1, priorityMap, rmContainer, container); + node_1.reserveResource(app_0, priorityMap, rmContainer); + res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0, + priorityMap, capability); + assertFalse(res); + } + + @Test + public void testAssignToQueue() throws Exception { + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + setup(csConf); + + // Manipulate queue 'a' + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); + + // Users + final String user_0 = "user_0"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = TestUtils + .getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext); + + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = TestUtils + .getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), rmContext); + a.submitApplicationAttempt(app_1, user_0); + + // Setup some nodes + String host_0 = "host_0"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, + 8 * GB); + String host_1 = "host_1"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, + 8 * GB); + String host_2 = "host_2"; + FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, + 8 * GB); + + when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); + when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); + when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); + + final int numNodes = 2; + Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + Priority priorityAM = TestUtils.createMockPriority(1); + Priority priorityMap = TestUtils.createMockPriority(5); + Priority priorityReduce = TestUtils.createMockPriority(10); + + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, + priorityAM, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true, + priorityReduce, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true, + priorityMap, recordFactory))); + + // Start testing... + // Only AM + a.assignContainers(clusterResource, node_0, false); + assertEquals(2 * GB, a.getUsedResources().getMemory()); + assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(2 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(14 * GB, a.getMetrics().getAvailableMB()); + assertEquals(2 * GB, node_0.getUsedResource().getMemory()); + assertEquals(0 * GB, node_1.getUsedResource().getMemory()); + + // Only 1 map - simulating reduce + a.assignContainers(clusterResource, node_0, false); + assertEquals(5 * GB, a.getUsedResources().getMemory()); + assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(5 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(11 * GB, a.getMetrics().getAvailableMB()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(0 * GB, node_1.getUsedResource().getMemory()); + + // Only 1 map to other node - simulating reduce + a.assignContainers(clusterResource, node_1, false); + assertEquals(8 * GB, a.getUsedResources().getMemory()); + assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(8 * GB, a.getMetrics().getAvailableMB()); + assertEquals(null, node_0.getReservedContainer()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + + // allocate to queue so that the potential new capacity is greater then + // absoluteMaxCapacity + Resource capability = Resources.createResource(32 * GB, 0); + boolean res = a.assignToQueue(clusterResource, capability, app_0, true); + assertFalse(res); + + // now add in reservations and make sure it continues if config set + // allocate to queue so that the potential new capacity is greater then + // absoluteMaxCapacity + a.assignContainers(clusterResource, node_0, false); + assertEquals(13 * GB, a.getUsedResources().getMemory()); + assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(5 * GB, a.getMetrics().getReservedMB()); + assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(3 * GB, a.getMetrics().getAvailableMB()); + assertEquals(3 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + + capability = Resources.createResource(5 * GB, 0); + res = a + .assignToQueue(clusterResource, capability, app_0, true); + assertTrue(res); + + // tell to not check reservations + res = a.assignToQueue(clusterResource, capability, app_0, false); + assertFalse(res); + + refreshQueuesTurnOffReservationsContLook(a, csConf); + + // should return false no matter what checkReservations is passed + // in since feature is off + res = a.assignToQueue(clusterResource, capability, app_0, false); + assertFalse(res); + + res = a + .assignToQueue(clusterResource, capability, app_0, true); + assertFalse(res); + } + + public void refreshQueuesTurnOffReservationsContLook(LeafQueue a, + CapacitySchedulerConfiguration csConf) throws Exception { + // before reinitialization + assertEquals(true, a.getReservationContinueLooking()); + assertEquals(true, + ((ParentQueue) a.getParent()).getReservationContinueLooking()); + + csConf.setBoolean( + CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES, false); + Map newQueues = new HashMap(); + CSQueue newRoot = CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, newQueues, queues, + TestUtils.spyHook); + queues = newQueues; + root.reinitialize(newRoot, cs.getClusterResource()); + + // after reinitialization + assertEquals(false, a.getReservationContinueLooking()); + assertEquals(false, + ((ParentQueue) a.getParent()).getReservationContinueLooking()); + } + + @Test + public void testContinueLookingReservationsAfterQueueRefresh() + throws Exception { + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + setup(csConf); + + // Manipulate queue 'e' + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); + + refreshQueuesTurnOffReservationsContLook(a, csConf); + } + + @Test + public void testAssignToUser() throws Exception { + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + setup(csConf); + + // Manipulate queue 'a' + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); + + // Users + final String user_0 = "user_0"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = TestUtils + .getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext); + + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = TestUtils + .getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), rmContext); + a.submitApplicationAttempt(app_1, user_0); + + // Setup some nodes + String host_0 = "host_0"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, + 8 * GB); + String host_1 = "host_1"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, + 8 * GB); + String host_2 = "host_2"; + FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, + 8 * GB); + + when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); + when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); + when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); + + final int numNodes = 2; + Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + Priority priorityAM = TestUtils.createMockPriority(1); + Priority priorityMap = TestUtils.createMockPriority(5); + Priority priorityReduce = TestUtils.createMockPriority(10); + + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, + priorityAM, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 5 * GB, 2, true, + priorityReduce, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true, + priorityMap, recordFactory))); + + // Start testing... + // Only AM + a.assignContainers(clusterResource, node_0, false); + assertEquals(2 * GB, a.getUsedResources().getMemory()); + assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(2 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(14 * GB, a.getMetrics().getAvailableMB()); + assertEquals(2 * GB, node_0.getUsedResource().getMemory()); + assertEquals(0 * GB, node_1.getUsedResource().getMemory()); + + // Only 1 map - simulating reduce + a.assignContainers(clusterResource, node_0, false); + assertEquals(5 * GB, a.getUsedResources().getMemory()); + assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(5 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(11 * GB, a.getMetrics().getAvailableMB()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(0 * GB, node_1.getUsedResource().getMemory()); + + // Only 1 map to other node - simulating reduce + a.assignContainers(clusterResource, node_1, false); + assertEquals(8 * GB, a.getUsedResources().getMemory()); + assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(8 * GB, a.getMetrics().getAvailableMB()); + assertEquals(null, node_0.getReservedContainer()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + + // now add in reservations and make sure it continues if config set + // allocate to queue so that the potential new capacity is greater then + // absoluteMaxCapacity + a.assignContainers(clusterResource, node_0, false); + assertEquals(13 * GB, a.getUsedResources().getMemory()); + assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(5 * GB, app_0.getCurrentReservation().getMemory()); + + assertEquals(5 * GB, a.getMetrics().getReservedMB()); + assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(3 * GB, a.getMetrics().getAvailableMB()); + assertEquals(3 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + + // set limit so subtrace reservations it can continue + Resource limit = Resources.createResource(12 * GB, 0); + boolean res = a.assignToUser(clusterResource, user_0, limit, app_0, + true); + assertTrue(res); + + // tell it not to check for reservations and should fail as already over + // limit + res = a.assignToUser(clusterResource, user_0, limit, app_0, false); + assertFalse(res); + + refreshQueuesTurnOffReservationsContLook(a, csConf); + + // should now return false since feature off + res = a.assignToUser(clusterResource, user_0, limit, app_0, true); + assertFalse(res); + } + + @Test + public void testReservationsNoneAvailable() throws Exception { + // Test that we now unreserve and use a node that has space + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + setup(csConf); + + // Manipulate queue 'a' + LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A)); + + // Users + final String user_0 = "user_0"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = TestUtils + .getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), rmContext); + + a.submitApplicationAttempt(app_0, user_0); + + final ApplicationAttemptId appAttemptId_1 = TestUtils + .getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), rmContext); + a.submitApplicationAttempt(app_1, user_0); + + // Setup some nodes + String host_0 = "host_0"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, + 8 * GB); + String host_1 = "host_1"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, + 8 * GB); + String host_2 = "host_2"; + FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, + 8 * GB); + + when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); + when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); + when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); + + final int numNodes = 3; + Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests + Priority priorityAM = TestUtils.createMockPriority(1); + Priority priorityMap = TestUtils.createMockPriority(5); + Priority priorityReduce = TestUtils.createMockPriority(10); + Priority priorityLast = TestUtils.createMockPriority(12); + + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 2 * GB, 1, true, + priorityAM, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 5 * GB, 1, true, + priorityReduce, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 3 * GB, 2, true, + priorityMap, recordFactory))); + app_0.updateResourceRequests(Collections.singletonList(TestUtils + .createResourceRequest(ResourceRequest.ANY, 8 * GB, 2, true, + priorityLast, recordFactory))); + + // Start testing... + // Only AM + a.assignContainers(clusterResource, node_0, false); + assertEquals(2 * GB, a.getUsedResources().getMemory()); + assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(2 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(22 * GB, a.getMetrics().getAvailableMB()); + assertEquals(2 * GB, node_0.getUsedResource().getMemory()); + assertEquals(0 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + + // Only 1 map - simulating reduce + a.assignContainers(clusterResource, node_0, false); + assertEquals(5 * GB, a.getUsedResources().getMemory()); + assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(5 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(19 * GB, a.getMetrics().getAvailableMB()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(0 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + + // Only 1 map to other node - simulating reduce + a.assignContainers(clusterResource, node_1, false); + assertEquals(8 * GB, a.getUsedResources().getMemory()); + assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(16 * GB, a.getMetrics().getAvailableMB()); + assertEquals(16 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + + // try to assign reducer (5G on node 0), but tell it + // it has to unreserve. No room to allocate and shouldn't reserve + // since nothing currently reserved. + a.assignContainers(clusterResource, node_0, true); + assertEquals(8 * GB, a.getUsedResources().getMemory()); + assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(16 * GB, a.getMetrics().getAvailableMB()); + assertEquals(16 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + + // try to assign reducer (5G on node 2), but tell it + // it has to unreserve. Has room but shouldn't reserve + // since nothing currently reserved. + a.assignContainers(clusterResource, node_2, true); + assertEquals(8 * GB, a.getUsedResources().getMemory()); + assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(8 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(16 * GB, a.getMetrics().getAvailableMB()); + assertEquals(16 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(0 * GB, node_2.getUsedResource().getMemory()); + + // let it assign 5G to node_2 + a.assignContainers(clusterResource, node_2, false); + assertEquals(13 * GB, a.getUsedResources().getMemory()); + assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(0 * GB, a.getMetrics().getReservedMB()); + assertEquals(13 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(11 * GB, a.getMetrics().getAvailableMB()); + assertEquals(11 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(5 * GB, node_2.getUsedResource().getMemory()); + + // reserve 8G node_0 + a.assignContainers(clusterResource, node_0, false); + assertEquals(21 * GB, a.getUsedResources().getMemory()); + assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(8 * GB, a.getMetrics().getReservedMB()); + assertEquals(13 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(3 * GB, a.getMetrics().getAvailableMB()); + assertEquals(3 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(5 * GB, node_2.getUsedResource().getMemory()); + + // try to assign (8G on node 2). No room to allocate, + // continued to try due to having reservation above, + // but hits queue limits so can't reserve anymore. + a.assignContainers(clusterResource, node_2, false); + assertEquals(21 * GB, a.getUsedResources().getMemory()); + assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory()); + assertEquals(8 * GB, a.getMetrics().getReservedMB()); + assertEquals(13 * GB, a.getMetrics().getAllocatedMB()); + assertEquals(3 * GB, a.getMetrics().getAvailableMB()); + assertEquals(3 * GB, app_0.getHeadroom().getMemory()); + assertEquals(5 * GB, node_0.getUsedResource().getMemory()); + assertEquals(3 * GB, node_1.getUsedResource().getMemory()); + assertEquals(5 * GB, node_2.getUsedResource().getMemory()); + } +}