Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7480B200B2D for ; Thu, 16 Jun 2016 20:32:34 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 7326A160A52; Thu, 16 Jun 2016 18:32:34 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6E5EC1602C5 for ; Thu, 16 Jun 2016 20:32:33 +0200 (CEST) Received: (qmail 22420 invoked by uid 500); 16 Jun 2016 18:32:26 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 21523 invoked by uid 99); 16 Jun 2016 18:32:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 16 Jun 2016 18:32:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 26090E07F6; Thu, 16 Jun 2016 18:32:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: aengineer@apache.org To: common-commits@hadoop.apache.org Date: Thu, 16 Jun 2016 18:32:41 -0000 Message-Id: <302650afcd414735857a4d4b9231d371@git.apache.org> In-Reply-To: <92d5ec242da743e5b0220e461401c34f@git.apache.org> References: <92d5ec242da743e5b0220e461401c34f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [17/49] hadoop git commit: YARN-5082. Limit ContainerId increase in fair scheduler if the num of node app reserved reached the limit (sandflee via asuresh) archived-at: Thu, 16 Jun 2016 18:32:34 -0000 YARN-5082. Limit ContainerId increase in fair scheduler if the num of node app reserved reached the limit (sandflee via asuresh) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5279af7c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5279af7c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5279af7c Branch: refs/heads/HDFS-7240 Commit: 5279af7cd4afb090da742a96b5786d9dee6224bc Parents: e0f4620 Author: Arun Suresh Authored: Fri Jun 10 22:33:42 2016 -0700 Committer: Arun Suresh Committed: Fri Jun 10 22:33:42 2016 -0700 ---------------------------------------------------------------------- .../scheduler/fair/FSAppAttempt.java | 62 ++++++++++------- .../scheduler/fair/TestFairScheduler.java | 72 ++++++++++++++++++++ 2 files changed, 108 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5279af7c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 5b83c9a..634f667 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -73,7 +73,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt = new DefaultResourceCalculator(); private long startTime; - private Priority priority; + private Priority appPriority; private ResourceWeights resourceWeights; private Resource demand = Resources.createResource(0); private FairScheduler scheduler; @@ -107,7 +107,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt this.scheduler = scheduler; this.startTime = scheduler.getClock().getTime(); - this.priority = Priority.newInstance(1); + this.appPriority = Priority.newInstance(1); this.resourceWeights = new ResourceWeights(); } @@ -309,7 +309,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } // default level is NODE_LOCAL - if (! allowedLocalityLevel.containsKey(priority)) { + if (!allowedLocalityLevel.containsKey(priority)) { // add the initial time of priority to prevent comparing with FsApp // startTime and allowedLocalityLevel degrade lastScheduledContainer.put(priority, currentTimeMs); @@ -353,7 +353,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt synchronized public RMContainer allocate(NodeType type, FSSchedulerNode node, Priority priority, ResourceRequest request, - Container container) { + Container reservedContainer) { // Update allowed locality level NodeType allowed = allowedLocalityLevel.get(priority); if (allowed != null) { @@ -373,9 +373,15 @@ public class FSAppAttempt extends SchedulerApplicationAttempt if (getTotalRequiredResources(priority) <= 0) { return null; } + + Container container = reservedContainer; + if (container == null) { + container = + createContainer(node, request.getCapability(), request.getPriority()); + } // Create RMContainer - RMContainer rmContainer = new RMContainerImpl(container, + RMContainer rmContainer = new RMContainerImpl(container, getApplicationAttemptId(), node.getNodeID(), appSchedulingInfo.getUser(), rmContext); ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName()); @@ -485,21 +491,26 @@ public class FSAppAttempt extends SchedulerApplicationAttempt * in {@link FSSchedulerNode}.. * return whether reservation was possible with the current threshold limits */ - private boolean reserve(Priority priority, FSSchedulerNode node, - Container container, NodeType type, boolean alreadyReserved) { + private boolean reserve(ResourceRequest request, FSSchedulerNode node, + Container reservedContainer, NodeType type) { + Priority priority = request.getPriority(); if (!reservationExceedsThreshold(node, type)) { LOG.info("Making reservation: node=" + node.getNodeName() + " app_id=" + getApplicationId()); - if (!alreadyReserved) { - getMetrics().reserveResource(getUser(), container.getResource()); + if (reservedContainer == null) { + reservedContainer = + createContainer(node, request.getCapability(), + request.getPriority()); + getMetrics().reserveResource(getUser(), + reservedContainer.getResource()); RMContainer rmContainer = - super.reserve(node, priority, null, container); + super.reserve(node, priority, null, reservedContainer); node.reserveResource(this, priority, rmContainer); setReservation(node); } else { RMContainer rmContainer = node.getReservedContainer(); - super.reserve(node, priority, rmContainer, container); + super.reserve(node, priority, rmContainer, reservedContainer); node.reserveResource(this, priority, rmContainer); setReservation(node); } @@ -615,18 +626,17 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // How much does the node have? Resource available = node.getUnallocatedResource(); - Container container = null; + Container reservedContainer = null; if (reserved) { - container = node.getReservedContainer().getContainer(); - } else { - container = createContainer(node, capability, request.getPriority()); + reservedContainer = node.getReservedContainer().getContainer(); } // Can we allocate a container on this node? if (Resources.fitsIn(capability, available)) { // Inform the application of the new container for this request RMContainer allocatedContainer = - allocate(type, node, request.getPriority(), request, container); + allocate(type, node, request.getPriority(), request, + reservedContainer); if (allocatedContainer == null) { // Did the application need this resource? if (reserved) { @@ -647,30 +657,30 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // the AM. Set the amResource for this app and update the leaf queue's AM // usage if (!isAmRunning() && !getUnmanagedAM()) { - setAMResource(container.getResource()); - getQueue().addAMResourceUsage(container.getResource()); + setAMResource(capability); + getQueue().addAMResourceUsage(capability); setAmRunning(true); } - return container.getResource(); + return capability; } // The desired container won't fit here, so reserve - if (isReservable(container) && - reserve(request.getPriority(), node, container, type, reserved)) { + if (isReservable(capability) && + reserve(request, node, reservedContainer, type)) { return FairScheduler.CONTAINER_RESERVED; } else { if (LOG.isDebugEnabled()) { - LOG.debug("Not creating reservation as container " + container.getId() - + " is not reservable"); + LOG.debug("Couldn't creating reservation for " + + getName() + ",at priority " + request.getPriority()); } return Resources.none(); } } - private boolean isReservable(Container container) { + private boolean isReservable(Resource capacity) { return scheduler.isAtLeastReservationThreshold( - getQueue().getPolicy().getResourceCalculator(), container.getResource()); + getQueue().getPolicy().getResourceCalculator(), capacity); } private boolean hasNodeOrRackLocalRequests(Priority priority) { @@ -907,7 +917,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt public Priority getPriority() { // Right now per-app priorities are not passed to scheduler, // so everyone has the same priority. - return priority; + return appPriority; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/5279af7c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 3e5a40f..ec77a9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -4480,4 +4480,76 @@ public class TestFairScheduler extends FairSchedulerTestBase { resourceManager.getResourceScheduler().handle(nodeAddEvent1); return nm; } + + @Test(timeout = 120000) + public void testContainerAllocationWithContainerIdLeap() throws Exception { + conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add two node + RMNode node1 = MockNodes.newNodeInfo(1, + Resources.createResource(3072, 10), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = MockNodes.newNodeInfo(1, + Resources.createResource(3072, 10), 1, "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + ApplicationAttemptId app1 = + createSchedulingRequest(2048, "queue1", "user1", 2); + scheduler.update(); + scheduler.handle(new NodeUpdateSchedulerEvent(node1)); + scheduler.handle(new NodeUpdateSchedulerEvent(node2)); + + ApplicationAttemptId app2 = + createSchedulingRequest(2048, "queue1", "user1", 1); + scheduler.update(); + scheduler.handle(new NodeUpdateSchedulerEvent(node1)); + scheduler.handle(new NodeUpdateSchedulerEvent(node2)); + + assertEquals(4096, scheduler.getQueueManager().getQueue("queue1"). + getResourceUsage().getMemory()); + + //container will be reserved at node1 + RMContainer reservedContainer1 = + scheduler.getSchedulerNode(node1.getNodeID()).getReservedContainer(); + assertNotEquals(reservedContainer1, null); + RMContainer reservedContainer2 = + scheduler.getSchedulerNode(node2.getNodeID()).getReservedContainer(); + assertEquals(reservedContainer2, null); + + for (int i = 0; i < 10; i++) { + scheduler.handle(new NodeUpdateSchedulerEvent(node1)); + scheduler.handle(new NodeUpdateSchedulerEvent(node2)); + } + + // release resource + scheduler.handle(new AppAttemptRemovedSchedulerEvent( + app1, RMAppAttemptState.KILLED, false)); + + assertEquals(0, scheduler.getQueueManager().getQueue("queue1"). + getResourceUsage().getMemory()); + + // container will be allocated at node2 + scheduler.handle(new NodeUpdateSchedulerEvent(node2)); + assertEquals(scheduler.getSchedulerApp(app2). + getLiveContainers().size(), 1); + + long maxId = 0; + for (RMContainer container : + scheduler.getSchedulerApp(app2).getLiveContainers()) { + assertTrue( + container.getContainer().getNodeId().equals(node2.getNodeID())); + if (container.getContainerId().getContainerId() > maxId) { + maxId = container.getContainerId().getContainerId(); + } + } + + long reservedId = reservedContainer1.getContainerId().getContainerId(); + assertEquals(reservedId + 1, maxId); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org