Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5160E2009F3 for ; Thu, 5 May 2016 21:57:07 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 500441609F9; Thu, 5 May 2016 19:57:07 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 0BF2C160A06 for ; Thu, 5 May 2016 21:57:04 +0200 (CEST) Received: (qmail 20036 invoked by uid 500); 5 May 2016 19:57:03 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 19551 invoked by uid 99); 5 May 2016 19:57:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 05 May 2016 19:57:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 0DB0FDFF8E; Thu, 5 May 2016 19:57:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jianhe@apache.org To: common-commits@hadoop.apache.org Date: Thu, 05 May 2016 19:57:04 -0000 Message-Id: <1008c89ff46d463ca7c8c39022140bfe@git.apache.org> In-Reply-To: <738331efbbc84193a77cd2a6179a8b06@git.apache.org> References: <738331efbbc84193a77cd2a6179a8b06@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/4] hadoop git commit: YARN-4390. Do surgical preemption based on reserved container in CapacityScheduler. Contributed by Wangda Tan archived-at: Thu, 05 May 2016 19:57:07 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6b24c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java new file mode 100644 index 0000000..1019548 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java @@ -0,0 +1,689 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Assert; +import org.junit.Before; +import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.TreeSet; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ProportionalCapacityPreemptionPolicyMockFramework { + static final Log LOG = + LogFactory.getLog(TestProportionalCapacityPreemptionPolicyForNodePartitions.class); + final String ROOT = CapacitySchedulerConfiguration.ROOT; + + Map nameToCSQueues = null; + Map partitionToResource = null; + Map nodeIdToSchedulerNodes = null; + RMNodeLabelsManager nlm = null; + RMContext rmContext = null; + + ResourceCalculator rc = new DefaultResourceCalculator(); + Clock mClock = null; + CapacitySchedulerConfiguration conf = null; + CapacityScheduler cs = null; + EventHandler mDisp = null; + ProportionalCapacityPreemptionPolicy policy = null; + Resource clusterResource = null; + + @SuppressWarnings("unchecked") + @Before + public void setup() { + org.apache.log4j.Logger.getRootLogger().setLevel( + org.apache.log4j.Level.DEBUG); + + conf = new CapacitySchedulerConfiguration(new Configuration(false)); + conf.setLong( + CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000); + conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, + 3000); + // report "ideal" preempt + conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, + (float) 1.0); + conf.setFloat( + CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, + (float) 1.0); + + mClock = mock(Clock.class); + cs = mock(CapacityScheduler.class); + when(cs.getResourceCalculator()).thenReturn(rc); + when(cs.getPreemptionManager()).thenReturn(new PreemptionManager()); + when(cs.getConfiguration()).thenReturn(conf); + + nlm = mock(RMNodeLabelsManager.class); + mDisp = mock(EventHandler.class); + + rmContext = mock(RMContext.class); + when(rmContext.getNodeLabelManager()).thenReturn(nlm); + Dispatcher disp = mock(Dispatcher.class); + when(rmContext.getDispatcher()).thenReturn(disp); + when(disp.getEventHandler()).thenReturn(mDisp); + when(cs.getRMContext()).thenReturn(rmContext); + + partitionToResource = new HashMap<>(); + nodeIdToSchedulerNodes = new HashMap<>(); + nameToCSQueues = new HashMap<>(); + } + + public void buildEnv(String labelsConfig, String nodesConfig, + String queuesConfig, String appsConfig) throws IOException { + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, false); + } + + public void buildEnv(String labelsConfig, String nodesConfig, + String queuesConfig, String appsConfig, + boolean useDominantResourceCalculator) throws IOException { + if (useDominantResourceCalculator) { + when(cs.getResourceCalculator()).thenReturn( + new DominantResourceCalculator()); + } + mockNodeLabelsManager(labelsConfig); + mockSchedulerNodes(nodesConfig); + for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) { + when(cs.getSchedulerNode(nodeId)).thenReturn( + nodeIdToSchedulerNodes.get(nodeId)); + } + List allNodes = new ArrayList<>( + nodeIdToSchedulerNodes.values()); + when(cs.getAllNodes()).thenReturn(allNodes); + ParentQueue root = mockQueueHierarchy(queuesConfig); + when(cs.getRootQueue()).thenReturn(root); + when(cs.getClusterResource()).thenReturn(clusterResource); + mockApplications(appsConfig); + + policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, + mClock); + } + + private void mockContainers(String containersConfig, ApplicationAttemptId attemptId, + String queueName, List reservedContainers, + List liveContainers) { + int containerId = 1; + int start = containersConfig.indexOf("=") + 1; + int end = -1; + + while (start < containersConfig.length()) { + while (start < containersConfig.length() + && containersConfig.charAt(start) != '(') { + start++; + } + if (start >= containersConfig.length()) { + throw new IllegalArgumentException( + "Error containers specification, line=" + containersConfig); + } + end = start + 1; + while (end < containersConfig.length() + && containersConfig.charAt(end) != ')') { + end++; + } + if (end >= containersConfig.length()) { + throw new IllegalArgumentException( + "Error containers specification, line=" + containersConfig); + } + + // now we found start/end, get container values + String[] values = containersConfig.substring(start + 1, end).split(","); + if (values.length != 6) { + throw new IllegalArgumentException("Format to define container is:" + + "(priority,resource,host,expression,repeat,reserved)"); + } + Priority pri = Priority.newInstance(Integer.valueOf(values[0])); + Resource res = parseResourceFromString(values[1]); + NodeId host = NodeId.newInstance(values[2], 1); + String exp = values[3]; + int repeat = Integer.valueOf(values[4]); + boolean reserved = Boolean.valueOf(values[5]); + + for (int i = 0; i < repeat; i++) { + Container c = mock(Container.class); + when(c.getResource()).thenReturn(res); + when(c.getPriority()).thenReturn(pri); + RMContainerImpl rmc = mock(RMContainerImpl.class); + when(rmc.getAllocatedNode()).thenReturn(host); + when(rmc.getNodeLabelExpression()).thenReturn(exp); + when(rmc.getAllocatedResource()).thenReturn(res); + when(rmc.getContainer()).thenReturn(c); + when(rmc.getApplicationAttemptId()).thenReturn(attemptId); + when(rmc.getQueueName()).thenReturn(queueName); + final ContainerId cId = ContainerId.newContainerId(attemptId, containerId); + when(rmc.getContainerId()).thenReturn( + cId); + doAnswer(new Answer() { + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + return cId.compareTo(((RMContainer) invocation.getArguments()[0]) + .getContainerId()); + } + }).when(rmc).compareTo(any(RMContainer.class)); + + if (containerId == 1) { + when(rmc.isAMContainer()).thenReturn(true); + } + + if (reserved) { + reservedContainers.add(rmc); + when(rmc.getReservedResource()).thenReturn(res); + } else { + liveContainers.add(rmc); + } + + // Add container to scheduler-node + addContainerToSchedulerNode(host, rmc, reserved); + + // If this is a non-exclusive allocation + String partition = null; + if (exp.isEmpty() + && !(partition = nodeIdToSchedulerNodes.get(host).getPartition()) + .isEmpty()) { + LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName); + Map> ignoreExclusivityContainers = + queue.getIgnoreExclusivityRMContainers(); + if (!ignoreExclusivityContainers.containsKey(partition)) { + ignoreExclusivityContainers.put(partition, + new TreeSet()); + } + ignoreExclusivityContainers.get(partition).add(rmc); + } + LOG.debug("add container to app=" + attemptId + " res=" + res + + " node=" + host + " nodeLabelExpression=" + exp + " partition=" + + partition); + + containerId++; + } + + start = end + 1; + } + } + + /** + * Format is: + *
+   * queueName\t  // app1
+   * (priority,resource,host,expression,#repeat,reserved)
+   * (priority,resource,host,expression,#repeat,reserved);
+   * queueName\t  // app2
+   * 
+ */ + private void mockApplications(String appsConfig) { + int id = 1; + for (String a : appsConfig.split(";")) { + String[] strs = a.split("\t"); + String queueName = strs[0]; + + // get containers + List liveContainers = new ArrayList(); + List reservedContainers = new ArrayList(); + ApplicationId appId = ApplicationId.newInstance(0L, id); + ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); + + mockContainers(strs[1], appAttemptId, queueName, reservedContainers, + liveContainers); + + FiCaSchedulerApp app = mock(FiCaSchedulerApp.class); + when(app.getLiveContainers()).thenReturn(liveContainers); + when(app.getReservedContainers()).thenReturn(reservedContainers); + when(app.getApplicationAttemptId()).thenReturn(appAttemptId); + when(app.getApplicationId()).thenReturn(appId); + when(app.getPriority()).thenReturn(Priority.newInstance(0)); + + // add to LeafQueue + LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName); + queue.getApplications().add(app); + + id++; + } + } + + private void addContainerToSchedulerNode(NodeId nodeId, RMContainer container, + boolean isReserved) { + SchedulerNode node = nodeIdToSchedulerNodes.get(nodeId); + assert node != null; + + if (isReserved) { + when(node.getReservedContainer()).thenReturn(container); + } else { + node.getCopiedListOfRunningContainers().add(container); + Resources.subtractFrom(node.getUnallocatedResource(), + container.getAllocatedResource()); + } + } + + /** + * Format is: + * host1=partition[ res=resource]; + * host2=partition[ res=resource]; + */ + private void mockSchedulerNodes(String schedulerNodesConfigStr) + throws IOException { + String[] nodesConfigStrArray = schedulerNodesConfigStr.split(";"); + for (String p : nodesConfigStrArray) { + String[] arr = p.split(" "); + + NodeId nodeId = NodeId.newInstance(arr[0].substring(0, arr[0].indexOf("=")), 1); + String partition = arr[0].substring(arr[0].indexOf("=") + 1, arr[0].length()); + + FiCaSchedulerNode sn = mock(FiCaSchedulerNode.class); + when(sn.getNodeID()).thenReturn(nodeId); + when(sn.getPartition()).thenReturn(partition); + + Resource totalRes = Resources.createResource(0); + if (arr.length > 1) { + String res = arr[1]; + if (res.contains("res=")) { + String resSring = res.substring( + res.indexOf("res=") + "res=".length()); + totalRes = parseResourceFromString(resSring); + } + } + when(sn.getTotalResource()).thenReturn(totalRes); + when(sn.getUnallocatedResource()).thenReturn(Resources.clone(totalRes)); + + // TODO, add settings of killable resources when necessary + when(sn.getTotalKillableResources()).thenReturn(Resources.none()); + + List liveContainers = new ArrayList<>(); + when(sn.getCopiedListOfRunningContainers()).thenReturn(liveContainers); + + nodeIdToSchedulerNodes.put(nodeId, sn); + + LOG.debug("add scheduler node, id=" + nodeId + ", partition=" + partition); + } + } + + /** + * Format is: + *
+   * partition0=total_resource,exclusivity;
+   * partition1=total_resource,exclusivity;
+   * ...
+   * 
+ */ + private void mockNodeLabelsManager(String nodeLabelsConfigStr) throws IOException { + String[] partitionConfigArr = nodeLabelsConfigStr.split(";"); + clusterResource = Resources.createResource(0); + for (String p : partitionConfigArr) { + String partitionName = p.substring(0, p.indexOf("=")); + Resource res = parseResourceFromString(p.substring(p.indexOf("=") + 1, + p.indexOf(","))); + boolean exclusivity = + Boolean.valueOf(p.substring(p.indexOf(",") + 1, p.length())); + when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class))) + .thenReturn(res); + when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity); + + // add to partition to resource + partitionToResource.put(partitionName, res); + LOG.debug("add partition=" + partitionName + " totalRes=" + res + + " exclusivity=" + exclusivity); + Resources.addTo(clusterResource, res); + } + + when(nlm.getClusterNodeLabelNames()).thenReturn( + partitionToResource.keySet()); + } + + private Resource parseResourceFromString(String p) { + String[] resource = p.split(":"); + Resource res; + if (resource.length == 1) { + res = Resources.createResource(Integer.valueOf(resource[0])); + } else { + res = Resources.createResource(Integer.valueOf(resource[0]), + Integer.valueOf(resource[1])); + } + return res; + } + + /** + * Format is: + *
+   * root (=[guaranteed max used pending (reserved)],=..);
+   * -A(...);
+   * --A1(...);
+   * --A2(...);
+   * -B...
+   * 
+ * ";" splits queues, and there should no empty lines, no extra spaces + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + private ParentQueue mockQueueHierarchy(String queueExprs) { + String[] queueExprArray = queueExprs.split(";"); + ParentQueue rootQueue = null; + for (int idx = 0; idx < queueExprArray.length; idx++) { + String q = queueExprArray[idx]; + CSQueue queue; + + // Initialize queue + if (isParent(queueExprArray, idx)) { + ParentQueue parentQueue = mock(ParentQueue.class); + queue = parentQueue; + List children = new ArrayList(); + when(parentQueue.getChildQueues()).thenReturn(children); + } else { + LeafQueue leafQueue = mock(LeafQueue.class); + final TreeSet apps = new TreeSet<>( + new Comparator() { + @Override + public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { + return a1.getApplicationId().compareTo(a2.getApplicationId()); + } + }); + when(leafQueue.getApplications()).thenReturn(apps); + OrderingPolicy so = mock(OrderingPolicy.class); + when(so.getPreemptionIterator()).thenAnswer(new Answer() { + public Object answer(InvocationOnMock invocation) { + return apps.descendingIterator(); + } + }); + when(leafQueue.getOrderingPolicy()).thenReturn(so); + + Map> ignorePartitionContainers = + new HashMap<>(); + when(leafQueue.getIgnoreExclusivityRMContainers()).thenReturn( + ignorePartitionContainers); + queue = leafQueue; + } + + setupQueue(queue, q, queueExprArray, idx); + if (queue.getQueueName().equals(ROOT)) { + rootQueue = (ParentQueue) queue; + } + } + return rootQueue; + } + + private void setupQueue(CSQueue queue, String q, String[] queueExprArray, + int idx) { + LOG.debug("*** Setup queue, source=" + q); + String queuePath = null; + + int myLevel = getLevel(q); + if (0 == myLevel) { + // It's root + when(queue.getQueueName()).thenReturn(ROOT); + queuePath = ROOT; + } + + String queueName = getQueueName(q); + when(queue.getQueueName()).thenReturn(queueName); + + // Setup parent queue, and add myself to parentQueue.children-list + ParentQueue parentQueue = getParentQueue(queueExprArray, idx, myLevel); + if (null != parentQueue) { + when(queue.getParent()).thenReturn(parentQueue); + parentQueue.getChildQueues().add(queue); + + // Setup my path + queuePath = parentQueue.getQueuePath() + "." + queueName; + } + when(queue.getQueuePath()).thenReturn(queuePath); + + QueueCapacities qc = new QueueCapacities(0 == myLevel); + ResourceUsage ru = new ResourceUsage(); + + when(queue.getQueueCapacities()).thenReturn(qc); + when(queue.getQueueResourceUsage()).thenReturn(ru); + + LOG.debug("Setup queue, name=" + queue.getQueueName() + " path=" + + queue.getQueuePath()); + LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue + .getQueueName())); + + // Setup other fields like used resource, guaranteed resource, etc. + String capacitySettingStr = q.substring(q.indexOf("(") + 1, q.indexOf(")")); + for (String s : capacitySettingStr.split(",")) { + String partitionName = s.substring(0, s.indexOf("=")); + String[] values = s.substring(s.indexOf("[") + 1, s.indexOf("]")).split(" "); + // Add a small epsilon to capacities to avoid truncate when doing + // Resources.multiply + float epsilon = 1e-6f; + Resource totResoucePerPartition = partitionToResource.get(partitionName); + float absGuaranteed = Resources.divide(rc, totResoucePerPartition, + parseResourceFromString(values[0].trim()), totResoucePerPartition) + + epsilon; + float absMax = Resources.divide(rc, totResoucePerPartition, + parseResourceFromString(values[1].trim()), totResoucePerPartition) + + epsilon; + float absUsed = Resources.divide(rc, totResoucePerPartition, + parseResourceFromString(values[2].trim()), totResoucePerPartition) + + epsilon; + Resource pending = parseResourceFromString(values[3].trim()); + qc.setAbsoluteCapacity(partitionName, absGuaranteed); + qc.setAbsoluteMaximumCapacity(partitionName, absMax); + qc.setAbsoluteUsedCapacity(partitionName, absUsed); + ru.setPending(partitionName, pending); + if (!isParent(queueExprArray, idx)) { + LeafQueue lq = (LeafQueue) queue; + when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class), + isA(String.class))).thenReturn(pending); + } + ru.setUsed(partitionName, parseResourceFromString(values[2].trim())); + + // Setup reserved resource if it contained by input config + Resource reserved = Resources.none(); + if(values.length == 5) { + reserved = parseResourceFromString(values[4].trim()); + ru.setReserved(partitionName, reserved); + } + LOG.debug("Setup queue=" + queueName + " partition=" + partitionName + + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax + + ",abs_used" + absUsed + ",pending_resource=" + pending + + ", reserved_resource=" + reserved + "]"); + } + + // Setup preemption disabled + when(queue.getPreemptionDisabled()).thenReturn( + conf.getPreemptionDisabled(queuePath, false)); + + nameToCSQueues.put(queueName, queue); + when(cs.getQueue(eq(queueName))).thenReturn(queue); + } + + /** + * Level of a queue is how many "-" at beginning, root's level is 0 + */ + private int getLevel(String q) { + int level = 0; // level = how many "-" at beginning + while (level < q.length() && q.charAt(level) == '-') { + level++; + } + return level; + } + + private String getQueueName(String q) { + int idx = 0; + // find first != '-' char + while (idx < q.length() && q.charAt(idx) == '-') { + idx++; + } + if (idx == q.length()) { + throw new IllegalArgumentException("illegal input:" + q); + } + // name = after '-' and before '(' + String name = q.substring(idx, q.indexOf('(')); + if (name.isEmpty()) { + throw new IllegalArgumentException("queue name shouldn't be empty:" + q); + } + if (name.contains(".")) { + throw new IllegalArgumentException("queue name shouldn't contain '.':" + + name); + } + return name; + } + + private ParentQueue getParentQueue(String[] queueExprArray, int idx, int myLevel) { + idx--; + while (idx >= 0) { + int level = getLevel(queueExprArray[idx]); + if (level < myLevel) { + String parentQueuName = getQueueName(queueExprArray[idx]); + return (ParentQueue) nameToCSQueues.get(parentQueuName); + } + idx--; + } + + return null; + } + + /** + * Get if a queue is ParentQueue + */ + private boolean isParent(String[] queues, int idx) { + int myLevel = getLevel(queues[idx]); + idx++; + while (idx < queues.length && getLevel(queues[idx]) == myLevel) { + idx++; + } + if (idx >= queues.length || getLevel(queues[idx]) < myLevel) { + // It's a LeafQueue + return false; + } else { + return true; + } + } + + public ApplicationAttemptId getAppAttemptId(int id) { + ApplicationId appId = ApplicationId.newInstance(0L, id); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 1); + return appAttemptId; + } + + public void checkContainerNodesInApp(FiCaSchedulerApp app, + int expectedContainersNumber, String host) { + NodeId nodeId = NodeId.newInstance(host, 1); + int num = 0; + for (RMContainer c : app.getLiveContainers()) { + if (c.getAllocatedNode().equals(nodeId)) { + num++; + } + } + for (RMContainer c : app.getReservedContainers()) { + if (c.getAllocatedNode().equals(nodeId)) { + num++; + } + } + Assert.assertEquals(expectedContainersNumber, num); + } + + public FiCaSchedulerApp getApp(String queueName, int appId) { + for (FiCaSchedulerApp app : ((LeafQueue) cs.getQueue(queueName)) + .getApplications()) { + if (app.getApplicationId().getId() == appId) { + return app; + } + } + return null; + } + + public void checkAbsCapacities(CSQueue queue, String partition, + float guaranteed, float max, float used) { + QueueCapacities qc = queue.getQueueCapacities(); + Assert.assertEquals(guaranteed, qc.getAbsoluteCapacity(partition), 1e-3); + Assert.assertEquals(max, qc.getAbsoluteMaximumCapacity(partition), 1e-3); + Assert.assertEquals(used, qc.getAbsoluteUsedCapacity(partition), 1e-3); + } + + public void checkPendingResource(CSQueue queue, String partition, int pending) { + ResourceUsage ru = queue.getQueueResourceUsage(); + Assert.assertEquals(pending, ru.getPending(partition).getMemory()); + } + + public void checkReservedResource(CSQueue queue, String partition, int reserved) { + ResourceUsage ru = queue.getQueueResourceUsage(); + Assert.assertEquals(reserved, ru.getReserved(partition).getMemory()); + } + + static class IsPreemptionRequestForQueueAndNode + extends ArgumentMatcher { + private final ApplicationAttemptId appAttId; + private final String queueName; + private final NodeId nodeId; + + IsPreemptionRequestForQueueAndNode(ApplicationAttemptId appAttId, + String queueName, NodeId nodeId) { + this.appAttId = appAttId; + this.queueName = queueName; + this.nodeId = nodeId; + } + @Override + public boolean matches(Object o) { + ContainerPreemptEvent cpe = (ContainerPreemptEvent)o; + + return appAttId.equals(cpe.getAppId()) + && queueName.equals(cpe.getContainer().getQueueName()) + && nodeId.equals(cpe.getContainer().getAllocatedNode()); + } + @Override + public String toString() { + return appAttId.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6b24c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java index 499a3d0..5b7ac52 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java @@ -62,6 +62,7 @@ import org.mockito.ArgumentMatcher; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; import java.util.Deque; @@ -161,6 +162,11 @@ public class TestProportionalCapacityPreemptionPolicy { mCS = mock(CapacityScheduler.class); when(mCS.getResourceCalculator()).thenReturn(rc); lm = mock(RMNodeLabelsManager.class); + try { + when(lm.isExclusiveNodeLabel(anyString())).thenReturn(true); + } catch (IOException e) { + // do nothing + } when(mCS.getConfiguration()).thenReturn(conf); rmContext = mock(RMContext.class); when(mCS.getRMContext()).thenReturn(rmContext); @@ -650,6 +656,26 @@ public class TestProportionalCapacityPreemptionPolicy { } @Test + public void testHierarchicalWithReserved() { + int[][] qData = new int[][] { + // / A B C D E F + { 200, 100, 50, 50, 100, 10, 90 }, // abs + { 200, 200, 200, 200, 200, 200, 200 }, // maxCap + { 200, 110, 60, 50, 90, 90, 0 }, // used + { 10, 0, 0, 0, 10, 0, 10 }, // pending + { 40, 25, 15, 10, 15, 15, 0 }, // reserved + { 4, 2, 1, 1, 2, 1, 1 }, // apps + { -1, -1, 1, 1, -1, 1, 1 }, // req granularity + { 2, 2, 0, 0, 2, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + // verify capacity taken from A1, not B1 despite B1 being far over + // its absolute guaranteed capacity + verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA))); + } + + @Test public void testZeroGuar() { int[][] qData = new int[][] { // / A B C D E F @@ -934,7 +960,38 @@ public class TestProportionalCapacityPreemptionPolicy { //check the parent can prempt only the extra from > 2 level child TempQueuePerPartition tempQueueAPartition = policy.getQueuePartitions().get("queueA").get(""); assertEquals(0, tempQueueAPartition.untouchableExtra.getMemory()); - int extraForQueueA = tempQueueAPartition.current.getMemory()- tempQueueAPartition.guaranteed.getMemory(); + int extraForQueueA = tempQueueAPartition.getUsed().getMemory() + - tempQueueAPartition.getGuaranteed().getMemory(); + assertEquals(extraForQueueA,tempQueueAPartition.preemptableExtra.getMemory()); + } + + @Test + public void testHierarchicalLarge3LevelsWithReserved() { + int[][] qData = new int[][] { + // / A F I + // B C G H J K + // D E + { 400, 200, 60, 140, 100, 40, 100, 70, 30, 100, 10, 90 }, // abs + { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 }, // maxCap + { 400, 210, 60, 150, 100, 50, 100, 50, 50, 90, 10, 80 }, // used + { 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 }, // pending + { 50, 30, 20, 10, 5, 5, 0, 0, 0, 10, 10, 0 }, // reserved + // appA appB appC appD appE appF appG + { 7, 3, 1, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps + { -1, -1, 1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity + { 3, 2, 0, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues + }; + ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData); + policy.editSchedule(); + + verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC))); + assertEquals(10, policy.getQueuePartitions().get("queueE").get("").preemptableExtra.getMemory()); + //2nd level child(E) preempts 10, but parent A has only 9 extra + //check the parent can prempt only the extra from > 2 level child + TempQueuePerPartition tempQueueAPartition = policy.getQueuePartitions().get("queueA").get(""); + assertEquals(0, tempQueueAPartition.untouchableExtra.getMemory()); + int extraForQueueA = tempQueueAPartition.getUsed().getMemory() + - tempQueueAPartition.getGuaranteed().getMemory(); assertEquals(extraForQueueA,tempQueueAPartition.preemptableExtra.getMemory()); } @@ -1058,6 +1115,7 @@ public class TestProportionalCapacityPreemptionPolicy { ParentQueue root = mockParentQueue(null, queues[0], pqs); ResourceUsage resUsage = new ResourceUsage(); resUsage.setUsed(used[0]); + resUsage.setReserved(reserved[0]); when(root.getQueueName()).thenReturn(CapacitySchedulerConfiguration.ROOT); when(root.getAbsoluteUsedCapacity()).thenReturn( Resources.divide(rc, tot, used[0], tot)); @@ -1083,6 +1141,7 @@ public class TestProportionalCapacityPreemptionPolicy { q = mockParentQueue(p, queues[i], pqs); ResourceUsage resUsagePerQueue = new ResourceUsage(); resUsagePerQueue.setUsed(used[i]); + resUsagePerQueue.setReserved(reserved[i]); when(q.getQueueResourceUsage()).thenReturn(resUsagePerQueue); } else { q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran); @@ -1159,6 +1218,7 @@ public class TestProportionalCapacityPreemptionPolicy { ResourceUsage ru = new ResourceUsage(); ru.setPending(pending[i]); ru.setUsed(used[i]); + ru.setReserved(reserved[i]); when(lq.getQueueResourceUsage()).thenReturn(ru); // consider moving where CapacityScheduler::comparator accessible final NavigableSet qApps = new TreeSet( http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6b24c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java index 5ffae6e..e31a889 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java @@ -18,231 +18,23 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; -import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor; -import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; -import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.io.IOException; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.TreeSet; -import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.eq; -import static org.mockito.Matchers.isA; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -public class TestProportionalCapacityPreemptionPolicyForNodePartitions { - private static final Log LOG = - LogFactory.getLog(TestProportionalCapacityPreemptionPolicyForNodePartitions.class); - static final String ROOT = CapacitySchedulerConfiguration.ROOT; - private Map nameToCSQueues = null; - private Map partitionToResource = null; - private Map nodeIdToSchedulerNodes = null; - private RMNodeLabelsManager nlm = null; - private RMContext rmContext = null; - - private ResourceCalculator rc = new DefaultResourceCalculator(); - private Clock mClock = null; - private CapacitySchedulerConfiguration conf = null; - private CapacityScheduler cs = null; - private EventHandler mDisp = null; - private ProportionalCapacityPreemptionPolicy policy = null; - private Resource clusterResource = null; - - @SuppressWarnings("unchecked") +public class TestProportionalCapacityPreemptionPolicyForNodePartitions + extends ProportionalCapacityPreemptionPolicyMockFramework { @Before public void setup() { - org.apache.log4j.Logger.getRootLogger().setLevel( - org.apache.log4j.Level.DEBUG); - - conf = new CapacitySchedulerConfiguration(new Configuration(false)); - conf.setLong( - CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000); - conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, - 3000); - // report "ideal" preempt - conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, - (float) 1.0); - conf.setFloat( - CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, - (float) 1.0); - - mClock = mock(Clock.class); - cs = mock(CapacityScheduler.class); - when(cs.getResourceCalculator()).thenReturn(rc); - when(cs.getPreemptionManager()).thenReturn(new PreemptionManager()); - when(cs.getConfiguration()).thenReturn(conf); - - nlm = mock(RMNodeLabelsManager.class); - mDisp = mock(EventHandler.class); - - rmContext = mock(RMContext.class); - when(rmContext.getNodeLabelManager()).thenReturn(nlm); - Dispatcher disp = mock(Dispatcher.class); - when(rmContext.getDispatcher()).thenReturn(disp); - when(disp.getEventHandler()).thenReturn(mDisp); - when(cs.getRMContext()).thenReturn(rmContext); - + super.setup(); policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); - partitionToResource = new HashMap<>(); - nodeIdToSchedulerNodes = new HashMap<>(); - nameToCSQueues = new HashMap<>(); - } - - @Test - public void testBuilder() throws Exception { - /** - * Test of test, make sure we build expected mock schedulable objects - */ - String labelsConfig = - "=200,true;" + // default partition - "red=100,false;" + // partition=red - "blue=200,true"; // partition=blue - String nodesConfig = - "n1=red;" + // n1 has partition=red - "n2=blue;" + // n2 has partition=blue - "n3="; // n3 doesn't have partition - String queuesConfig = - // guaranteed,max,used,pending - "root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + //root - "-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a - "--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1 - "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2 - "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])"; - String appsConfig= - //queueName\t(priority,resource,host,expression,#repeat,reserved) - // app1 in a1, , 50 in n2 (reserved), 50 in n2 (allocated) - "a1\t" // app1 in a1 - + "(1,1,n3,red,50,false);" + // 50 * default in n3 - - "a1\t" // app2 in a1 - + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved), - // 50 * ignore-exclusivity (allocated) - + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved), - // 50 in n2 (allocated) - "a2\t" // app3 in a2 - + "(1,1,n3,red,50,false);" + // 50 * default in n3 - - "b\t" // app4 in b - + "(1,1,n1,red,100,false);"; - - buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); - - // Check queues: - // root - checkAbsCapacities(cs.getQueue("root"), "", 1f, 1f, 0.5f); - checkPendingResource(cs.getQueue("root"), "", 100); - checkAbsCapacities(cs.getQueue("root"), "red", 1f, 1f, 1f); - checkPendingResource(cs.getQueue("root"), "red", 100); - checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f); - checkPendingResource(cs.getQueue("root"), "blue", 200); - - // a - checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f); - checkPendingResource(cs.getQueue("a"), "", 100); - checkAbsCapacities(cs.getQueue("a"), "red", 0f, 0f, 0f); - checkPendingResource(cs.getQueue("a"), "red", 0); - checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f); - checkPendingResource(cs.getQueue("a"), "blue", 200); - - // a1 - checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f); - checkPendingResource(cs.getQueue("a1"), "", 100); - checkAbsCapacities(cs.getQueue("a1"), "red", 0f, 0f, 0f); - checkPendingResource(cs.getQueue("a1"), "red", 0); - checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f); - checkPendingResource(cs.getQueue("a1"), "blue", 0); - - // a2 - checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f); - checkPendingResource(cs.getQueue("a2"), "", 0); - checkAbsCapacities(cs.getQueue("a2"), "red", 0f, 0f, 0f); - checkPendingResource(cs.getQueue("a2"), "red", 0); - checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f); - checkPendingResource(cs.getQueue("a2"), "blue", 200); - - // b1 - checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f); - checkPendingResource(cs.getQueue("b"), "", 0); - checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f); - checkPendingResource(cs.getQueue("b"), "red", 100); - checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f); - checkPendingResource(cs.getQueue("b"), "blue", 0); - - // Check ignored partitioned containers in queue - Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1")) - .getIgnoreExclusivityRMContainers().get("blue").size()); - - // Check applications - Assert.assertEquals(2, ((LeafQueue)cs.getQueue("a1")).getApplications().size()); - Assert.assertEquals(1, ((LeafQueue)cs.getQueue("a2")).getApplications().size()); - Assert.assertEquals(1, ((LeafQueue)cs.getQueue("b")).getApplications().size()); - - // Check #containers - FiCaSchedulerApp app1 = getApp("a1", 1); - FiCaSchedulerApp app2 = getApp("a1", 2); - FiCaSchedulerApp app3 = getApp("a2", 3); - FiCaSchedulerApp app4 = getApp("b", 4); - - Assert.assertEquals(50, app1.getLiveContainers().size()); - checkContainerNodesInApp(app1, 50, "n3"); - - Assert.assertEquals(50, app2.getLiveContainers().size()); - Assert.assertEquals(150, app2.getReservedContainers().size()); - checkContainerNodesInApp(app2, 200, "n2"); - - Assert.assertEquals(50, app3.getLiveContainers().size()); - checkContainerNodesInApp(app3, 50, "n3"); - - Assert.assertEquals(100, app4.getLiveContainers().size()); - checkContainerNodesInApp(app4, 100, "n1"); } @Test @@ -822,477 +614,4 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions { verify(mDisp, never()).handle( argThat(new IsPreemptionRequestFor(getAppAttemptId(3)))); } - - private ApplicationAttemptId getAppAttemptId(int id) { - ApplicationId appId = ApplicationId.newInstance(0L, id); - ApplicationAttemptId appAttemptId = - ApplicationAttemptId.newInstance(appId, 1); - return appAttemptId; - } - - private void checkContainerNodesInApp(FiCaSchedulerApp app, - int expectedContainersNumber, String host) { - NodeId nodeId = NodeId.newInstance(host, 1); - int num = 0; - for (RMContainer c : app.getLiveContainers()) { - if (c.getAllocatedNode().equals(nodeId)) { - num++; - } - } - for (RMContainer c : app.getReservedContainers()) { - if (c.getAllocatedNode().equals(nodeId)) { - num++; - } - } - Assert.assertEquals(expectedContainersNumber, num); - } - - private FiCaSchedulerApp getApp(String queueName, int appId) { - for (FiCaSchedulerApp app : ((LeafQueue) cs.getQueue(queueName)) - .getApplications()) { - if (app.getApplicationId().getId() == appId) { - return app; - } - } - return null; - } - - private void checkAbsCapacities(CSQueue queue, String partition, - float guaranteed, float max, float used) { - QueueCapacities qc = queue.getQueueCapacities(); - Assert.assertEquals(guaranteed, qc.getAbsoluteCapacity(partition), 1e-3); - Assert.assertEquals(max, qc.getAbsoluteMaximumCapacity(partition), 1e-3); - Assert.assertEquals(used, qc.getAbsoluteUsedCapacity(partition), 1e-3); - } - - private void checkPendingResource(CSQueue queue, String partition, int pending) { - ResourceUsage ru = queue.getQueueResourceUsage(); - Assert.assertEquals(pending, ru.getPending(partition).getMemory()); - } - - private void buildEnv(String labelsConfig, String nodesConfig, - String queuesConfig, String appsConfig) throws IOException { - buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, false); - } - - private void buildEnv(String labelsConfig, String nodesConfig, - String queuesConfig, String appsConfig, - boolean useDominantResourceCalculator) throws IOException { - if (useDominantResourceCalculator) { - when(cs.getResourceCalculator()).thenReturn( - new DominantResourceCalculator()); - } - mockNodeLabelsManager(labelsConfig); - mockSchedulerNodes(nodesConfig); - for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) { - when(cs.getSchedulerNode(nodeId)).thenReturn( - nodeIdToSchedulerNodes.get(nodeId)); - } - ParentQueue root = mockQueueHierarchy(queuesConfig); - when(cs.getRootQueue()).thenReturn(root); - when(cs.getClusterResource()).thenReturn(clusterResource); - mockApplications(appsConfig); - - policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, - mClock); - } - - private void mockContainers(String containersConfig, ApplicationAttemptId attemptId, - String queueName, List reservedContainers, - List liveContainers) { - int containerId = 1; - int start = containersConfig.indexOf("=") + 1; - int end = -1; - - while (start < containersConfig.length()) { - while (start < containersConfig.length() - && containersConfig.charAt(start) != '(') { - start++; - } - if (start >= containersConfig.length()) { - throw new IllegalArgumentException( - "Error containers specification, line=" + containersConfig); - } - end = start + 1; - while (end < containersConfig.length() - && containersConfig.charAt(end) != ')') { - end++; - } - if (end >= containersConfig.length()) { - throw new IllegalArgumentException( - "Error containers specification, line=" + containersConfig); - } - - // now we found start/end, get container values - String[] values = containersConfig.substring(start + 1, end).split(","); - if (values.length != 6) { - throw new IllegalArgumentException("Format to define container is:" - + "(priority,resource,host,expression,repeat,reserved)"); - } - Priority pri = Priority.newInstance(Integer.parseInt(values[0])); - Resource res = parseResourceFromString(values[1]); - NodeId host = NodeId.newInstance(values[2], 1); - String exp = values[3]; - int repeat = Integer.parseInt(values[4]); - boolean reserved = Boolean.parseBoolean(values[5]); - - for (int i = 0; i < repeat; i++) { - Container c = mock(Container.class); - when(c.getResource()).thenReturn(res); - when(c.getPriority()).thenReturn(pri); - RMContainerImpl rmc = mock(RMContainerImpl.class); - when(rmc.getAllocatedNode()).thenReturn(host); - when(rmc.getNodeLabelExpression()).thenReturn(exp); - when(rmc.getAllocatedResource()).thenReturn(res); - when(rmc.getContainer()).thenReturn(c); - when(rmc.getApplicationAttemptId()).thenReturn(attemptId); - final ContainerId cId = ContainerId.newContainerId(attemptId, containerId); - when(rmc.getContainerId()).thenReturn( - cId); - doAnswer(new Answer() { - @Override - public Integer answer(InvocationOnMock invocation) throws Throwable { - return cId.compareTo(((RMContainer) invocation.getArguments()[0]) - .getContainerId()); - } - }).when(rmc).compareTo(any(RMContainer.class)); - - if (containerId == 1) { - when(rmc.isAMContainer()).thenReturn(true); - } - - if (reserved) { - reservedContainers.add(rmc); - } else { - liveContainers.add(rmc); - } - - // If this is a non-exclusive allocation - String partition = null; - if (exp.isEmpty() - && !(partition = nodeIdToSchedulerNodes.get(host).getPartition()) - .isEmpty()) { - LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName); - Map> ignoreExclusivityContainers = - queue.getIgnoreExclusivityRMContainers(); - if (!ignoreExclusivityContainers.containsKey(partition)) { - ignoreExclusivityContainers.put(partition, - new TreeSet()); - } - ignoreExclusivityContainers.get(partition).add(rmc); - } - LOG.debug("add container to app=" + attemptId + " res=" + res - + " node=" + host + " nodeLabelExpression=" + exp + " partition=" - + partition); - - containerId++; - } - - start = end + 1; - } - } - - /** - * Format is: - *
-   * queueName\t  // app1
-   * (priority,resource,host,expression,#repeat,reserved)
-   * (priority,resource,host,expression,#repeat,reserved);
-   * queueName\t  // app2
-   * 
- */ - private void mockApplications(String appsConfig) { - int id = 1; - for (String a : appsConfig.split(";")) { - String[] strs = a.split("\t"); - String queueName = strs[0]; - - // get containers - List liveContainers = new ArrayList(); - List reservedContainers = new ArrayList(); - ApplicationId appId = ApplicationId.newInstance(0L, id); - ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1); - - mockContainers(strs[1], appAttemptId, queueName, reservedContainers, - liveContainers); - - FiCaSchedulerApp app = mock(FiCaSchedulerApp.class); - when(app.getLiveContainers()).thenReturn(liveContainers); - when(app.getReservedContainers()).thenReturn(reservedContainers); - when(app.getApplicationAttemptId()).thenReturn(appAttemptId); - when(app.getApplicationId()).thenReturn(appId); - when(app.getPriority()).thenReturn(Priority.newInstance(0)); - - // add to LeafQueue - LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName); - queue.getApplications().add(app); - - id++; - } - } - - /** - * Format is: - * host1=partition; - * host2=partition; - */ - private void mockSchedulerNodes(String schedulerNodesConfigStr) - throws IOException { - String[] nodesConfigStrArray = schedulerNodesConfigStr.split(";"); - for (String p : nodesConfigStrArray) { - NodeId nodeId = NodeId.newInstance(p.substring(0, p.indexOf("=")), 1); - String partition = p.substring(p.indexOf("=") + 1, p.length()); - - SchedulerNode sn = mock(SchedulerNode.class); - when(sn.getNodeID()).thenReturn(nodeId); - when(sn.getPartition()).thenReturn(partition); - nodeIdToSchedulerNodes.put(nodeId, sn); - - LOG.debug("add scheduler node, id=" + nodeId + ", partition=" + partition); - } - } - - /** - * Format is: - *
-   * partition0=total_resource,exclusivity;
-   * partition1=total_resource,exclusivity;
-   * ...
-   * 
- */ - private void mockNodeLabelsManager(String nodeLabelsConfigStr) throws IOException { - String[] partitionConfigArr = nodeLabelsConfigStr.split(";"); - clusterResource = Resources.createResource(0); - for (String p : partitionConfigArr) { - String partitionName = p.substring(0, p.indexOf("=")); - Resource res = parseResourceFromString(p.substring(p.indexOf("=") + 1, - p.indexOf(","))); - boolean exclusivity = - Boolean.parseBoolean(p.substring(p.indexOf(",") + 1, p.length())); - when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class))) - .thenReturn(res); - when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity); - - // add to partition to resource - partitionToResource.put(partitionName, res); - LOG.debug("add partition=" + partitionName + " totalRes=" + res - + " exclusivity=" + exclusivity); - Resources.addTo(clusterResource, res); - } - - when(nlm.getClusterNodeLabelNames()).thenReturn( - partitionToResource.keySet()); - } - - private Resource parseResourceFromString(String p) { - String[] resource = p.split(":"); - Resource res = Resources.createResource(0); - if (resource.length == 1) { - res = Resources.createResource(Integer.parseInt(resource[0])); - } else { - res = Resources.createResource(Integer.parseInt(resource[0]), - Integer.parseInt(resource[1])); - } - return res; - } - - /** - * Format is: - *
-   * root (=[guaranteed max used pending],=..);
-   * -A(...);
-   * --A1(...);
-   * --A2(...);
-   * -B...
-   * 
- * ";" splits queues, and there should no empty lines, no extra spaces - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - private ParentQueue mockQueueHierarchy(String queueExprs) { - String[] queueExprArray = queueExprs.split(";"); - ParentQueue rootQueue = null; - for (int idx = 0; idx < queueExprArray.length; idx++) { - String q = queueExprArray[idx]; - CSQueue queue; - - // Initialize queue - if (isParent(queueExprArray, idx)) { - ParentQueue parentQueue = mock(ParentQueue.class); - queue = parentQueue; - List children = new ArrayList(); - when(parentQueue.getChildQueues()).thenReturn(children); - } else { - LeafQueue leafQueue = mock(LeafQueue.class); - final TreeSet apps = new TreeSet<>( - new Comparator() { - @Override - public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) { - return a1.getApplicationId().compareTo(a2.getApplicationId()); - } - }); - when(leafQueue.getApplications()).thenReturn(apps); - OrderingPolicy so = mock(OrderingPolicy.class); - when(so.getPreemptionIterator()).thenAnswer(new Answer() { - public Object answer(InvocationOnMock invocation) { - return apps.descendingIterator(); - } - }); - when(leafQueue.getOrderingPolicy()).thenReturn(so); - - Map> ignorePartitionContainers = - new HashMap<>(); - when(leafQueue.getIgnoreExclusivityRMContainers()).thenReturn( - ignorePartitionContainers); - queue = leafQueue; - } - - setupQueue(queue, q, queueExprArray, idx); - if (queue.getQueueName().equals(ROOT)) { - rootQueue = (ParentQueue) queue; - } - } - return rootQueue; - } - - private void setupQueue(CSQueue queue, String q, String[] queueExprArray, - int idx) { - LOG.debug("*** Setup queue, source=" + q); - String queuePath = null; - - int myLevel = getLevel(q); - if (0 == myLevel) { - // It's root - when(queue.getQueueName()).thenReturn(ROOT); - queuePath = ROOT; - } - - String queueName = getQueueName(q); - when(queue.getQueueName()).thenReturn(queueName); - - // Setup parent queue, and add myself to parentQueue.children-list - ParentQueue parentQueue = getParentQueue(queueExprArray, idx, myLevel); - if (null != parentQueue) { - when(queue.getParent()).thenReturn(parentQueue); - parentQueue.getChildQueues().add(queue); - - // Setup my path - queuePath = parentQueue.getQueuePath() + "." + queueName; - } - when(queue.getQueuePath()).thenReturn(queuePath); - - QueueCapacities qc = new QueueCapacities(0 == myLevel); - ResourceUsage ru = new ResourceUsage(); - - when(queue.getQueueCapacities()).thenReturn(qc); - when(queue.getQueueResourceUsage()).thenReturn(ru); - - LOG.debug("Setup queue, name=" + queue.getQueueName() + " path=" - + queue.getQueuePath()); - LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue - .getQueueName())); - - // Setup other fields like used resource, guaranteed resource, etc. - String capacitySettingStr = q.substring(q.indexOf("(") + 1, q.indexOf(")")); - for (String s : capacitySettingStr.split(",")) { - String partitionName = s.substring(0, s.indexOf("=")); - String[] values = s.substring(s.indexOf("[") + 1, s.indexOf("]")).split(" "); - // Add a small epsilon to capacities to avoid truncate when doing - // Resources.multiply - float epsilon = 1e-6f; - Resource totResoucePerPartition = partitionToResource.get(partitionName); - float absGuaranteed = Resources.divide(rc, totResoucePerPartition, - parseResourceFromString(values[0].trim()), totResoucePerPartition) - + epsilon; - float absMax = Resources.divide(rc, totResoucePerPartition, - parseResourceFromString(values[1].trim()), totResoucePerPartition) - + epsilon; - float absUsed = Resources.divide(rc, totResoucePerPartition, - parseResourceFromString(values[2].trim()), totResoucePerPartition) - + epsilon; - Resource pending = parseResourceFromString(values[3].trim()); - qc.setAbsoluteCapacity(partitionName, absGuaranteed); - qc.setAbsoluteMaximumCapacity(partitionName, absMax); - qc.setAbsoluteUsedCapacity(partitionName, absUsed); - ru.setPending(partitionName, pending); - if (!isParent(queueExprArray, idx)) { - LeafQueue lq = (LeafQueue) queue; - when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class), - isA(String.class))).thenReturn(pending); - } - ru.setUsed(partitionName, parseResourceFromString(values[2].trim())); - LOG.debug("Setup queue=" + queueName + " partition=" + partitionName - + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax - + ",abs_used" + absUsed + ",pending_resource=" + pending + "]"); - } - - // Setup preemption disabled - when(queue.getPreemptionDisabled()).thenReturn( - conf.getPreemptionDisabled(queuePath, false)); - - nameToCSQueues.put(queueName, queue); - when(cs.getQueue(eq(queueName))).thenReturn(queue); - } - - /** - * Level of a queue is how many "-" at beginning, root's level is 0 - */ - private int getLevel(String q) { - int level = 0; // level = how many "-" at beginning - while (level < q.length() && q.charAt(level) == '-') { - level++; - } - return level; - } - - private String getQueueName(String q) { - int idx = 0; - // find first != '-' char - while (idx < q.length() && q.charAt(idx) == '-') { - idx++; - } - if (idx == q.length()) { - throw new IllegalArgumentException("illegal input:" + q); - } - // name = after '-' and before '(' - String name = q.substring(idx, q.indexOf('(')); - if (name.isEmpty()) { - throw new IllegalArgumentException("queue name shouldn't be empty:" + q); - } - if (name.contains(".")) { - throw new IllegalArgumentException("queue name shouldn't contain '.':" - + name); - } - return name; - } - - private ParentQueue getParentQueue(String[] queueExprArray, int idx, int myLevel) { - idx--; - while (idx >= 0) { - int level = getLevel(queueExprArray[idx]); - if (level < myLevel) { - String parentQueuName = getQueueName(queueExprArray[idx]); - return (ParentQueue) nameToCSQueues.get(parentQueuName); - } - idx--; - } - - return null; - } - - /** - * Get if a queue is ParentQueue - */ - private boolean isParent(String[] queues, int idx) { - int myLevel = getLevel(queues[idx]); - idx++; - while (idx < queues.length && getLevel(queues[idx]) == myLevel) { - idx++; - } - if (idx >= queues.length || getLevel(queues[idx]) < myLevel) { - // It's a LeafQueue - return false; - } else { - return true; - } - } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a6b24c62/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java new file mode 100644 index 0000000..38b2e78 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java @@ -0,0 +1,430 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; + +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +public class TestProportionalCapacityPreemptionPolicyForReservedContainers + extends ProportionalCapacityPreemptionPolicyMockFramework { + @Before + public void setup() { + super.setup(); + conf.setBoolean( + CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS, + true); + policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock); + } + + @Test + public void testPreemptionForSimpleReservedContainer() throws IOException { + /** + * The simplest test of reserved container, Queue structure is: + * + *
+     *       root
+     *       /  \
+     *      a    b
+     * 
+ * Guaranteed resource of a/b are 50:50 + * Total cluster resource = 100 + * - A has 90 containers on two node, n1 has 45, n2 has 45, size of each + * container is 1. + * - B has am container at n1, and reserves 1 container with size = 9 at n1, + * so B needs to preempt 9 containers from A at n1 instead of randomly + * preempt from n1 and n2. + */ + String labelsConfig = + "=100,true;"; + String nodesConfig = // n1 / n2 has no label + "n1= res=50;" + + "n2= res=50"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 9 9]);" + //root + "-a(=[50 100 90 0]);" + // a + "-b(=[50 100 10 9 9])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,,45,false)" // 45 in n1 + + "(1,1,n2,,45,false);" + // 45 in n2 + "b\t" // app2 in b + + "(1,1,n1,,1,false)" // AM container in n1 + + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Total 5 preempted from app1 at n1, don't preempt container from other + // app/node + verify(mDisp, times(5)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(5)).handle( + argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n1", 1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testUseReservedAndFifoSelectorTogether() throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *       /  \
+     *      a    b
+     * 
+ * Guaranteed resource of a/b are 30:70 + * Total cluster resource = 100 + * - A has 45 containers on two node, n1 has 10, n2 has 35, size of each + * container is 1. + * - B has 5 containers at n2, and reserves 1 container with size = 50 at n1, + * B also has 20 pending resources. + * so B needs to preempt: + * - 10 containers from n1 (for reserved) + * - 5 containers from n2 for pending resources + */ + String labelsConfig = + "=100,true;"; + String nodesConfig = // n1 / n2 has no label + "n1= res=50;" + + "n2= res=50"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 70 10]);" + //root + "-a(=[30 100 45 0]);" + // a + "-b(=[70 100 55 70 50])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n2,,35,false)" // 35 in n2 + + "(1,1,n1,,10,false);" + // 10 in n1 + "b\t" // app2 in b + + "(1,1,n2,,5,false)" // 5 in n2 + + "(1,50,n1,,1,true)"; // 1 container with size=50 reserved at n1 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(10)).handle( + argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n1", 1)))); + verify(mDisp, times(5)).handle( + argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n2", 1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testReservedSelectorSkipsAMContainer() throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *       /  \
+     *      a    b
+     * 
+ * Guaranteed resource of a/b are 30:70 + * Total cluster resource = 100 + * - A has 45 containers on two node, n1 has 10, n2 has 35, size of each + * container is 1. + * - B has 5 containers at n2, and reserves 1 container with size = 50 at n1, + * B also has 20 pending resources. + * + * Ideally B needs to preempt: + * - 10 containers from n1 (for reserved) + * - 5 containers from n2 for pending resources + * + * However, since one AM container is located at n1 (from queueA), we cannot + * preempt 10 containers from n1 for reserved container. Instead, we will + * preempt 15 containers from n2, since containers from queueA launched in n2 + * are later than containers from queueA launched in n1 (FIFO order of containers) + */ + String labelsConfig = + "=100,true;"; + String nodesConfig = // n1 / n2 has no label + "n1= res=50;" + + "n2= res=50"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 70 10]);" + //root + "-a(=[30 100 45 0]);" + // a + "-b(=[70 100 55 70 50])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,,10,false)" // 10 in n1 + + "(1,1,n2,,35,false);" +// 35 in n2 + "b\t" // app2 in b + + "(1,1,n2,,5,false)" // 5 in n2 + + "(1,50,n1,,1,true)"; // 1 container with size=50 reserved at n1 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(15)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle( + argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n1", 1)))); + verify(mDisp, times(15)).handle( + argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n2", 1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testPreemptionForReservedContainerRespectGuaranteedResource() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *       /  \
+     *      a    b
+     * 
+ * Guaranteed resource of a/b are 85:15 + * Total cluster resource = 100 + * - A has 90 containers on two node, n1 has 45, n2 has 45, size of each + * container is 1. + * - B has am container at n1, and reserves 1 container with size = 9 at n1, + * + * If we preempt 9 containers from queue-A, queue-A will be below its + * guaranteed resource = 90 - 9 = 81 < 85. + * + * So no preemption will take place + */ + String labelsConfig = + "=100,true;"; + String nodesConfig = // n1 / n2 has no label + "n1= res=50;" + + "n2= res=50"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 100 9 9]);" + //root + "-a(=[85 100 90 0]);" + // a + "-b(=[15 100 10 9 9])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,,45,false)" // 45 in n1 + + "(1,1,n2,,45,false);" + // 45 in n2 + "b\t" // app2 in b + + "(1,1,n1,,1,false)" // AM container in n1 + + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(1)))); + verify(mDisp, times(0)).handle(argThat( + new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor( + getAppAttemptId(2)))); + } + + @Test + public void testPreemptionForReservedContainerWhichHasAvailableResource() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *       /  \
+     *      a    b
+     * 
+ * + * Guaranteed resource of a/b are 50:50 + * Total cluster resource = 100 + * - A has 90 containers on two node, n1 has 45, n2 has 45, size of each + * container is 1. + * - B has am container at n1, and reserves 1 container with size = 9 at n1, + * + * So we can get 4 containers preempted after preemption. + * (reserved 5 + preempted 4) = 9 + */ + String labelsConfig = + "=100,true;"; + String nodesConfig = // n1 / n2 has no label + "n1= res=50;" + + "n2= res=50"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 99 9 9]);" + //root + "-a(=[50 100 90 0]);" + // a + "-b(=[50 100 9 9 9])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,1,n1,,45,false)" // 45 in n1 + + "(1,1,n2,,45,false);" + // 45 in n2 + "b\t" // app2 in b + + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Total 4 preempted from app1 at n1, don't preempt container from other + // app/node + verify(mDisp, times(4)).handle(argThat( + new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n1", 1)))); + verify(mDisp, times(0)).handle(argThat( + new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n2", 1)))); + } + + @Test + public void testPreemptionForReservedContainerWhichHasNondivisibleAvailableResource() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *       /  \
+     *      a    b
+     * 
+ * + * Guaranteed resource of a/b are 50:50 + * Total cluster resource = 100 + * - A has 45 containers on two node, size of each container is 2, + * n1 has 23, n2 has 22 + * - B reserves 1 container with size = 9 at n1, + * + * So we can get 4 containers (total-resource = 8) preempted after + * preemption. Actual required is 3.5, but we need to preempt integer + * number of containers + */ + String labelsConfig = + "=100,true;"; + String nodesConfig = // n1 / n2 has no label + "n1= res=50;" + + "n2= res=50"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 99 9 9]);" + //root + "-a(=[50 100 90 0]);" + // a + "-b(=[50 100 9 9 9])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,2,n1,,24,false)" // 48 in n1 + + "(1,2,n2,,23,false);" + // 46 in n2 + "b\t" // app2 in b + + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // Total 4 preempted from app1 at n1, don't preempt container from other + // app/node + verify(mDisp, times(4)).handle(argThat( + new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n1", 1)))); + verify(mDisp, times(0)).handle(argThat( + new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n2", 1)))); + } + + @Test + public void testPreemptionForReservedContainerRespectAvailableResources() + throws IOException { + /** + * Queue structure is: + * + *
+     *       root
+     *       /  \
+     *      a    b
+     * 
+ * + * Guaranteed resource of a/b are 50:50 + * Total cluster resource = 100, 4 nodes, 25 on each node + * - A has 10 containers on every node, size of container is 2 + * - B reserves 1 container with size = 9 at n1, + * + * So even if we cannot allocate container for B now, no preemption should + * happen since there're plenty of available resources. + */ + String labelsConfig = + "=100,true;"; + String nodesConfig = + "n1= res=25;" + + "n2= res=25;" + + "n3= res=25;" + + "n4= res=25;"; + String queuesConfig = + // guaranteed,max,used,pending,reserved + "root(=[100 100 89 9 9]);" + //root + "-a(=[50 100 80 0]);" + // a + "-b(=[50 100 9 9 9])"; // b + String appsConfig= + //queueName\t(priority,resource,host,expression,#repeat,reserved) + "a\t" // app1 in a + + "(1,2,n1,,10,false)" // 10 in n1 + + "(1,2,n2,,10,false)" // 10 in n2 + + "(1,2,n3,,10,false)" // 10 in n3 + + "(1,2,n4,,10,false);" + // 10 in n4 + "b\t" // app2 in b + + "(1,9,n1,,1,true)"; // 1 container with size=5 reserved at n1 + + buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig); + policy.editSchedule(); + + // No preemption should happen + verify(mDisp, times(0)).handle(argThat( + new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n1", 1)))); + verify(mDisp, times(0)).handle(argThat( + new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n2", 1)))); + verify(mDisp, times(0)).handle(argThat( + new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n3", 1)))); + verify(mDisp, times(0)).handle(argThat( + new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a", + NodeId.newInstance("n4", 1)))); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org