Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id CED5E200BC9 for ; Fri, 11 Nov 2016 19:57:28 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id CD7C5160B15; Fri, 11 Nov 2016 18:57:28 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id DDDAF160AEE for ; Fri, 11 Nov 2016 19:57:26 +0100 (CET) Received: (qmail 3529 invoked by uid 500); 11 Nov 2016 18:57:24 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 2992 invoked by uid 99); 11 Nov 2016 18:57:24 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Nov 2016 18:57:24 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 279A5F1592; Fri, 11 Nov 2016 18:57:24 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aengineer@apache.org To: common-commits@hadoop.apache.org Date: Fri, 11 Nov 2016 18:57:29 -0000 Message-Id: In-Reply-To: <52f720d75a2f444ca97fd872b173e652@git.apache.org> References: <52f720d75a2f444ca97fd872b173e652@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [06/50] [abbrv] hadoop git commit: YARN-5716. Add global scheduler interface definition and update CapacityScheduler to use it. Contributed by Wangda Tan archived-at: Fri, 11 Nov 2016 18:57:29 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 42a8872..d875969 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -49,8 +49,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -123,6 +127,27 @@ public class TestParentQueue { return application; } + private void applyAllocationToQueue(Resource clusterResource, + int allocatedMem, + CSQueue queue) { + // Call accept & apply for queue + ResourceCommitRequest request = mock(ResourceCommitRequest.class); + when(request.anythingAllocatedOrReserved()).thenReturn(true); + ContainerAllocationProposal allocation = mock( + ContainerAllocationProposal.class); + when(request.getTotalReleasedResource()).thenReturn(Resources.none()); + when(request.getFirstAllocatedOrReservedContainer()).thenReturn(allocation); + SchedulerContainer scontainer = mock(SchedulerContainer.class); + when(allocation.getAllocatedOrReservedContainer()).thenReturn(scontainer); + when(allocation.getAllocatedOrReservedResource()).thenReturn( + Resources.createResource(allocatedMem)); + when(scontainer.getNodePartition()).thenReturn(""); + + if (queue.accept(clusterResource, request)) { + queue.apply(clusterResource, request); + } + } + private void stubQueueAllocation(final CSQueue queue, final Resource clusterResource, final FiCaSchedulerNode node, final int allocation) { @@ -157,7 +182,7 @@ public class TestParentQueue { // Next call - nothing if (allocation > 0) { doReturn(new CSAssignment(Resources.none(), type)).when(queue) - .assignContainers(eq(clusterResource), eq(node), + .assignContainers(eq(clusterResource), any(PlacementSet.class), any(ResourceLimits.class), any(SchedulingMode.class)); // Mock the node's resource availability @@ -168,7 +193,7 @@ public class TestParentQueue { return new CSAssignment(allocatedResource, type); } - }).when(queue).assignContainers(eq(clusterResource), eq(node), + }).when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class), any(ResourceLimits.class), any(SchedulingMode.class)); } @@ -205,8 +230,8 @@ public class TestParentQueue { setupSingleLevelQueues(csConf); Map queues = new HashMap(); - CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, + CSQueue root = + CapacityScheduler.parseQueue(csContext, csConf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); @@ -245,13 +270,18 @@ public class TestParentQueue { // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G stubQueueAllocation(a, clusterResource, node_1, 2*GB); stubQueueAllocation(b, clusterResource, node_1, 1*GB); - root.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + root.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(a, b); - allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(PlacementSet.class), anyResourceLimits(), + any(SchedulingMode.class)); + root.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); @@ -261,11 +291,13 @@ public class TestParentQueue { stubQueueAllocation(b, clusterResource, node_0, 2*GB); root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + root.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); @@ -277,9 +309,9 @@ public class TestParentQueue { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b, a); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); @@ -287,13 +319,17 @@ public class TestParentQueue { // since A has 3/6G while B has 8/14G stubQueueAllocation(a, clusterResource, node_1, 1*GB); stubQueueAllocation(b, clusterResource, node_1, 1*GB); - root.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + root.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + root.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(a, b); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(a, 4*GB, clusterResource); verifyQueueMetrics(b, 9*GB, clusterResource); } @@ -430,9 +466,9 @@ public class TestParentQueue { setupMultiLevelQueues(csConf); Map queues = new HashMap(); - CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, - CapacitySchedulerConfiguration.ROOT, queues, queues, + CSQueue root = + CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); // Setup some nodes @@ -496,6 +532,8 @@ public class TestParentQueue { stubQueueAllocation(c, clusterResource, node_1, 0*GB); root.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + applyAllocationToQueue(clusterResource, 4*GB, + b); verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); verifyQueueMetrics(c, 1*GB, clusterResource); @@ -506,15 +544,27 @@ public class TestParentQueue { stubQueueAllocation(a1, clusterResource, node_0, 1*GB); stubQueueAllocation(b3, clusterResource, node_0, 2*GB); stubQueueAllocation(c, clusterResource, node_0, 2*GB); + root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(a, c, b); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + applyAllocationToQueue(clusterResource, 1*GB, a); + + root.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + applyAllocationToQueue(clusterResource, 2*GB, root); + + root.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + applyAllocationToQueue(clusterResource, 2*GB, b); verifyQueueMetrics(a, 1*GB, clusterResource); verifyQueueMetrics(b, 6*GB, clusterResource); verifyQueueMetrics(c, 3*GB, clusterResource); @@ -533,17 +583,28 @@ public class TestParentQueue { stubQueueAllocation(b3, clusterResource, node_2, 1*GB); stubQueueAllocation(b1, clusterResource, node_2, 1*GB); stubQueueAllocation(c, clusterResource, node_2, 1*GB); - root.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + root.assignContainers(clusterResource, node_2, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(a, a2, a1, b, c); allocationOrder.verify(a).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(a2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + applyAllocationToQueue(clusterResource, 2*GB, a); + + root.assignContainers(clusterResource, node_2, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder.verify(b).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + applyAllocationToQueue(clusterResource, 2*GB, b); + + root.assignContainers(clusterResource, node_2, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder.verify(c).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(a, 3*GB, clusterResource); verifyQueueMetrics(b, 8*GB, clusterResource); verifyQueueMetrics(c, 4*GB, clusterResource); @@ -614,27 +675,25 @@ public class TestParentQueue { public void testOffSwitchScheduling() throws Exception { // Setup queue configs setupSingleLevelQueues(csConf); - csConf.setOffSwitchPerHeartbeatLimit(2); - Map queues = new HashMap(); - CSQueue root = - CapacityScheduler.parseQueue(csContext, csConf, null, - CapacitySchedulerConfiguration.ROOT, queues, queues, + CSQueue root = + CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook); // Setup some nodes final int memoryPerNode = 10; final int coresPerNode = 16; final int numNodes = 2; - - FiCaSchedulerNode node_0 = + + FiCaSchedulerNode node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB); - FiCaSchedulerNode node_1 = + FiCaSchedulerNode node_1 = TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB); - - final Resource clusterResource = - Resources.createResource(numNodes * (memoryPerNode*GB), + + final Resource clusterResource = + Resources.createResource(numNodes * (memoryPerNode*GB), numNodes * coresPerNode); when(csContext.getNumClusterNodes()).thenReturn(numNodes); @@ -644,50 +703,46 @@ public class TestParentQueue { a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB)); queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage() - .incPending(Resources.createResource(1 * GB)); - - // Simulate returning 2 containers on node_0 before offswitch limit - stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); - stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); + .incPending(Resources.createResource(1 * GB)); - root.assignContainers(clusterResource, node_0, + // Simulate B returning a container on node_0 + stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH); + stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH); + root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - InOrder allocationOrder = inOrder(a, b); - allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); - verifyQueueMetrics(a, 1*GB, clusterResource); + verifyQueueMetrics(a, 0*GB, clusterResource); verifyQueueMetrics(b, 1*GB, clusterResource); - + // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G // also, B gets a scheduling opportunity since A allocates RACK_LOCAL stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL); stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH); - root.assignContainers(clusterResource, node_1, + root.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); - allocationOrder = inOrder(a, b); - allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); - verifyQueueMetrics(a, 3*GB, clusterResource); + InOrder allocationOrder = inOrder(a); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + root.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + allocationOrder = inOrder(b); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 2*GB, clusterResource); - - // Now, B should get the scheduling opportunity - // since A has 2/6G while B has 2/14G, - // A also gets an opportunity because offswitchlimit not reached + + // Now, B should get the scheduling opportunity + // since A has 2/6G while B has 2/14G, + // However, since B returns off-switch, A won't get an opportunity stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL); stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH); - - root.assignContainers(clusterResource, node_0, + root.assignContainers(clusterResource, node_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b, a); - allocationOrder.verify(b, times(1)).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); - allocationOrder.verify(a, times(1)).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); - verifyQueueMetrics(a, 4*GB, clusterResource); + allocationOrder.verify(b).assignContainers(eq(clusterResource), + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + allocationOrder.verify(a).assignContainers(eq(clusterResource), + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); + verifyQueueMetrics(a, 2*GB, clusterResource); verifyQueueMetrics(b, 4*GB, clusterResource); } @@ -743,11 +798,13 @@ public class TestParentQueue { stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH); root.assignContainers(clusterResource, node_1, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + root.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); InOrder allocationOrder = inOrder(b2, b3); allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 2*GB, clusterResource); @@ -760,9 +817,9 @@ public class TestParentQueue { new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); allocationOrder = inOrder(b3, b2); allocationOrder.verify(b3).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); allocationOrder.verify(b2).assignContainers(eq(clusterResource), - any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class)); + any(PlacementSet.class), anyResourceLimits(), any(SchedulingMode.class)); verifyQueueMetrics(b2, 1*GB, clusterResource); verifyQueueMetrics(b3, 3*GB, clusterResource); http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 8fe85c9..f6caa50 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -30,6 +32,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import com.google.common.collect.ImmutableMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -59,6 +62,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; @@ -190,6 +194,15 @@ public class TestReservations { } static LeafQueue stubLeafQueue(LeafQueue queue) { + ParentQueue parent = (ParentQueue) queue.getParent(); + + if (parent != null) { + // Stub out parent queue's accept and apply. + doReturn(true).when(parent).accept(any(Resource.class), + any(ResourceCommitRequest.class)); + doNothing().when(parent).apply(any(Resource.class), + any(ResourceCommitRequest.class)); + } return queue; } @@ -239,6 +252,12 @@ public class TestReservations { FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, 8 * GB); + Map apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2); + when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); @@ -268,8 +287,10 @@ public class TestReservations { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(2 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -280,8 +301,10 @@ public class TestReservations { assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(5 * GB, a.getUsedResources().getMemorySize()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -292,8 +315,10 @@ public class TestReservations { assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(8 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -308,8 +333,10 @@ public class TestReservations { toSchedulerKey(priorityReduce))); // try to assign reducer (5G on node 0 and should reserve) - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(13 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -325,8 +352,10 @@ public class TestReservations { toSchedulerKey(priorityReduce))); // assign reducer to node 2 - a.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_2, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(18 * GB, a.getUsedResources().getMemorySize()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -343,8 +372,10 @@ public class TestReservations { // node_1 heartbeat and unreserves from node_0 in order to allocate // on node_1 - a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(18 * GB, a.getUsedResources().getMemorySize()); assertEquals(18 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -412,6 +443,12 @@ public class TestReservations { when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); + Map apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2); + cs.getNodeTracker().addNode(node_0); cs.getNodeTracker().addNode(node_1); cs.getNodeTracker().addNode(node_2); @@ -434,8 +471,10 @@ public class TestReservations { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(2 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, app_1.getCurrentConsumption().getMemorySize()); @@ -446,8 +485,10 @@ public class TestReservations { assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize()); assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize()); - a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(4 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(2 * GB, app_1.getCurrentConsumption().getMemorySize()); @@ -467,8 +508,10 @@ public class TestReservations { priorityMap, recordFactory))); // add a reservation for app_0 - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(12 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(2 * GB, app_1.getCurrentConsumption().getMemorySize()); @@ -481,8 +524,10 @@ public class TestReservations { // next assignment is beyond user limit for user_0 but it should assign to // app_1 for user_1 - a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(14 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(4 * GB, app_1.getCurrentConsumption().getMemorySize()); @@ -544,6 +589,12 @@ public class TestReservations { FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, 8 * GB); + Map apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2); + when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); @@ -569,8 +620,10 @@ public class TestReservations { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(2 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -581,8 +634,10 @@ public class TestReservations { assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(5 * GB, a.getUsedResources().getMemorySize()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -593,8 +648,10 @@ public class TestReservations { assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(8 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -609,8 +666,10 @@ public class TestReservations { toSchedulerKey(priorityReduce))); // try to assign reducer (5G on node 0 and should reserve) - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(13 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -626,8 +685,10 @@ public class TestReservations { toSchedulerKey(priorityReduce))); // assign reducer to node 2 - a.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_2, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(18 * GB, a.getUsedResources().getMemorySize()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -644,8 +705,10 @@ public class TestReservations { // node_1 heartbeat and won't unreserve from node_0, potentially stuck // if AM doesn't handle - a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(18 * GB, a.getUsedResources().getMemorySize()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -706,6 +769,12 @@ public class TestReservations { FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8 * GB); + Map apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1); + cs.getNodeTracker().addNode(node_0); cs.getNodeTracker().addNode(node_1); @@ -733,8 +802,10 @@ public class TestReservations { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(2 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -744,8 +815,10 @@ public class TestReservations { assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(5 * GB, a.getUsedResources().getMemorySize()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -755,8 +828,10 @@ public class TestReservations { assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(8 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -770,8 +845,10 @@ public class TestReservations { toSchedulerKey(priorityReduce))); // try to assign reducer (5G on node 0 and should reserve) - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(13 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -786,8 +863,10 @@ public class TestReservations { toSchedulerKey(priorityReduce))); // could allocate but told need to unreserve first - a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(13 * GB, a.getUsedResources().getMemorySize()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -983,6 +1062,12 @@ public class TestReservations { when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); + Map apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2); + final int numNodes = 2; Resource clusterResource = Resources.createResource(numNodes * (8 * GB)); when(csContext.getNumClusterNodes()).thenReturn(numNodes); @@ -1004,8 +1089,10 @@ public class TestReservations { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(2 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1015,8 +1102,10 @@ public class TestReservations { assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(5 * GB, a.getUsedResources().getMemorySize()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1026,8 +1115,10 @@ public class TestReservations { assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(8 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1040,8 +1131,10 @@ public class TestReservations { // now add in reservations and make sure it continues if config set // allocate to queue so that the potential new capacity is greater then // absoluteMaxCapacity - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(13 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(5 * GB, a.getMetrics().getReservedMB()); @@ -1153,6 +1246,12 @@ public class TestReservations { FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, 8 * GB); + Map apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2); + when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); @@ -1178,8 +1277,10 @@ public class TestReservations { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(2 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1189,8 +1290,10 @@ public class TestReservations { assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(5 * GB, a.getUsedResources().getMemorySize()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1200,8 +1303,10 @@ public class TestReservations { assertEquals(0 * GB, node_1.getAllocatedResource().getMemorySize()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(8 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1214,8 +1319,10 @@ public class TestReservations { // now add in reservations and make sure it continues if config set // allocate to queue so that the potential new capacity is greater then // absoluteMaxCapacity - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(13 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(5 * GB, app_0.getCurrentReservation().getMemorySize()); @@ -1301,6 +1408,12 @@ public class TestReservations { FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, 8 * GB); + Map apps = ImmutableMap.of( + app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(), + app_1); + Map nodes = ImmutableMap.of(node_0.getNodeID(), + node_0, node_1.getNodeID(), node_1, node_2.getNodeID(), node_2); + when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0); when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1); when(csContext.getNode(node_2.getNodeID())).thenReturn(node_2); @@ -1330,8 +1443,10 @@ public class TestReservations { // Start testing... // Only AM - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(2 * GB, a.getUsedResources().getMemorySize()); assertEquals(2 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1342,8 +1457,10 @@ public class TestReservations { assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize()); // Only 1 map - simulating reduce - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(5 * GB, a.getUsedResources().getMemorySize()); assertEquals(5 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1354,8 +1471,10 @@ public class TestReservations { assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize()); // Only 1 map to other node - simulating reduce - a.assignContainers(clusterResource, node_1, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(8 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1370,8 +1489,10 @@ public class TestReservations { // used (8G) + required (5G). It will not reserved since it has to unreserve // some resource. Even with continous reservation looking, we don't allow // unreserve resource to reserve container. - a.assignContainers(clusterResource, node_0, - new ResourceLimits(Resources.createResource(10 * GB)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(Resources.createResource(10 * GB)), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(8 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1386,8 +1507,10 @@ public class TestReservations { // try to assign reducer (5G on node 0), but tell it's resource limits < // used (8G) + required (5G). It will not reserved since it has to unreserve // some resource. Unfortunately, there's nothing to unreserve. - a.assignContainers(clusterResource, node_2, - new ResourceLimits(Resources.createResource(10 * GB)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_2, + new ResourceLimits(Resources.createResource(10 * GB)), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(8 * GB, a.getUsedResources().getMemorySize()); assertEquals(8 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1400,8 +1523,10 @@ public class TestReservations { assertEquals(0 * GB, node_2.getAllocatedResource().getMemorySize()); // let it assign 5G to node_2 - a.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_2, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(13 * GB, a.getUsedResources().getMemorySize()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(0 * GB, a.getMetrics().getReservedMB()); @@ -1413,8 +1538,10 @@ public class TestReservations { assertEquals(5 * GB, node_2.getAllocatedResource().getMemorySize()); // reserve 8G node_0 - a.assignContainers(clusterResource, node_0, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(21 * GB, a.getUsedResources().getMemorySize()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(8 * GB, a.getMetrics().getReservedMB()); @@ -1428,8 +1555,10 @@ public class TestReservations { // try to assign (8G on node 2). No room to allocate, // continued to try due to having reservation above, // but hits queue limits so can't reserve anymore. - a.assignContainers(clusterResource, node_2, - new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); + TestUtils.applyResourceCommitRequest(clusterResource, + a.assignContainers(clusterResource, node_2, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodes, apps); assertEquals(21 * GB, a.getUsedResources().getMemorySize()); assertEquals(13 * GB, app_0.getCurrentConsumption().getMemorySize()); assertEquals(8 * GB, a.getMetrics().getReservedMB()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java index a60b7ed..e34ee34 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java @@ -24,6 +24,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import java.io.IOException; +import java.util.Map; import java.util.Set; import org.apache.commons.logging.Log; @@ -430,4 +432,28 @@ public class TestUtils { req.setAllocationRequestId(allocationRequestId); return SchedulerRequestKey.create(req); } + + public static void applyResourceCommitRequest(Resource clusterResource, + CSAssignment csAssignment, + final Map nodes, + final Map apps) + throws IOException { + CapacityScheduler cs = new CapacityScheduler() { + @Override + public FiCaSchedulerNode getNode(NodeId nodeId) { + return nodes.get(nodeId); + } + + @Override + public FiCaSchedulerApp getApplicationAttempt( + ApplicationAttemptId applicationAttemptId) { + return apps.get(applicationAttemptId); + } + }; + + cs.setResourceCalculator(new DefaultResourceCalculator()); + + cs.submitResourceCommitRequest(clusterResource, + csAssignment); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/de3b4aac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java index a0bd951..1e61186 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java @@ -103,10 +103,6 @@ public class TestRMWebServicesSchedulerActivities verifyStateOfAllocations(allocations.getJSONObject(i), "finalAllocationState", "ALLOCATED"); verifyQueueOrder(allocations.getJSONObject(i), "root-a-b-b2-b3-b1"); - } else { - verifyStateOfAllocations(allocations.getJSONObject(i), - "finalAllocationState", "SKIPPED"); - verifyQueueOrder(allocations.getJSONObject(i), "root-a-b"); } } } @@ -409,9 +405,9 @@ public class TestRMWebServicesSchedulerActivities verifyStateOfAllocations(allocations, "finalAllocationState", "ALLOCATED"); - verifyNumberOfNodes(allocations, 6); + verifyNumberOfNodes(allocations, 5); - verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b1"); + verifyQueueOrder(json.getJSONObject("allocations"), "root-b-b1"); } finally { rm.stop(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org