From: cdouglas@apache.org
To: common-commits@hadoop.apache.org
Date: Mon, 06 Oct 2014 17:39:31 -0000
Message-Id: <4451d3de16cc4fb5a653c4bd4f72057d@git.apache.org>
Subject: [16/33] git commit: YARN-2475. Logic for responding to capacity drops
 for the ReservationSystem. Contributed by Carlo Curino and Subru Krishnan.

YARN-2475. Logic for responding to capacity drops for the ReservationSystem.
Contributed by Carlo Curino and Subru Krishnan.

(cherry picked from commit f83a07f266f2c5e6eead554d8a331ed7e75e10d5)
(cherry picked from commit 1c6950354f3c35a7824770dc251d5aec3be4876a)
(cherry picked from commit b81f571e6064cee119bcbfed090ab646cdd927a7)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/950da32b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/950da32b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/950da32b

Branch: refs/heads/branch-2.6
Commit: 950da32b4c08865cac5d95c4ed5fa9576e271e5b
Parents: 5ac08c5
Author: carlo curino
Authored: Fri Sep 12 16:52:54 2014 -0700
Committer: Chris Douglas
Committed: Mon Oct 6 10:29:12 2014 -0700

----------------------------------------------------------------------
 YARN-1051-CHANGES.txt                          |   5 +
 .../reservation/SimpleCapacityReplanner.java   |  95 ++++++++++++
 .../TestSimpleCapacityReplanner.java           | 149 +++++++++++++++++++
 3 files changed, 249 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/950da32b/YARN-1051-CHANGES.txt
----------------------------------------------------------------------
diff --git a/YARN-1051-CHANGES.txt b/YARN-1051-CHANGES.txt
new file mode 100644
index 0000000..9fd4b3b
--- /dev/null
+++ b/YARN-1051-CHANGES.txt
@@ -0,0 +1,5 @@
+YARN-1707. Introduce APIs to add/remove/resize queues in the
+CapacityScheduler. (Carlo Curino and Subru Krishnan via curino)
+
+YARN-2475. Logic for responding to capacity drops for the
+ReservationSystem. (Carlo Curino and Subru Krishnan via curino)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/950da32b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
new file mode 100644
index 0000000..8384538
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java
@@ -0,0 +1,95 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ReservationDefinition;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.UTCClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This (re)planner scans a period of time from now to a maximum time window
+ * (or the end of the last session, whichever comes first), checking that the
+ * overall capacity is not violated.
+ *
+ * It greedily removes sessions in reverse order of acceptance (the latest
+ * accepted session is the first to be removed).
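+ *
+ * Concretely: with plan step s and enforcement window W, the scan visits
+ * t = now, now + s, now + 2s, ... and stops at min(now + W, the last end
+ * time in the plan). At every t where the committed resources exceed the
+ * plan's total capacity, reservations are deleted newest-first until the
+ * excess at t is gone.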
+ */
+public class SimpleCapacityReplanner implements Planner {
+
+  private static final Log LOG = LogFactory
+      .getLog(SimpleCapacityReplanner.class);
+
+  private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
+  private final Clock clock;
+
+  // this controls the time-span of the replanning: time instants far in the
+  // future might be worth replanning for later on
+  private long lengthOfCheckZone;
+
+  public SimpleCapacityReplanner() {
+    this(new UTCClock());
+  }
+
+  @VisibleForTesting
+  SimpleCapacityReplanner(Clock clock) {
+    this.clock = clock;
+  }
+
+  @Override
+  public void init(String planQueueName, CapacitySchedulerConfiguration conf) {
+    this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName);
+  }
+
+  @Override
+  public void plan(Plan plan, List<ReservationDefinition> contracts)
+      throws PlanningException {
+
+    if (contracts != null) {
+      throw new RuntimeException(
+          "SimpleCapacityReplanner cannot handle new reservation contracts");
+    }
+
+    ResourceCalculator resCalc = plan.getResourceCalculator();
+    Resource totCap = plan.getTotalCapacity();
+    long now = clock.getTime();
+
+    // loop over all moments in time from now to the end of the check zone,
+    // or the end of the planned sessions, whichever comes first
+    for (long t = now; (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); t +=
+        plan.getStep()) {
+      Resource excessCap =
+          Resources.subtract(plan.getTotalCommittedResources(t), totCap);
+      // if we are violating the plan's capacity at this instant
+      if (Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE)) {
+        // sorted in reverse order of acceptance, so newest reservations first
+        Set<ReservationAllocation> curReservations =
+            new TreeSet<ReservationAllocation>(plan.getReservationsAtTime(t));
+        for (Iterator<ReservationAllocation> resIter =
+            curReservations.iterator(); resIter.hasNext()
+            && Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE);) {
+          ReservationAllocation reservation = resIter.next();
+          plan.deleteReservation(reservation.getReservationId());
+          excessCap =
+              Resources.subtract(excessCap, reservation.getResourcesAtTime(t));
+          LOG.info("Removing reservation " + reservation.getReservationId()
+              + " to repair physical-resource constraints in the plan: "
+              + plan.getQueueName());
+        }
+      }
+    }
+  }
+}
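
The heart of the patch is the greedy loop above: walk the check zone one plan
step at a time and, wherever the committed resources exceed the (possibly
reduced) total capacity, evict the most recently accepted reservations until
the violation at that instant disappears. The following self-contained sketch
models that policy on plain integers; it is illustrative only (the class and
field names are invented, and memory is a single int in GB), not part of the
patch.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/**
 * A minimal model of the greedy capacity-repair policy: scan each step of
 * the check zone, and while the committed total exceeds capacity, drop the
 * most recently accepted reservation active at that instant.
 */
public class GreedyRepairSketch {

  static class Res {
    final int id, acceptedAt, start, end, mem;
    Res(int id, int acceptedAt, int start, int end, int mem) {
      this.id = id; this.acceptedAt = acceptedAt;
      this.start = start; this.end = end; this.mem = mem;
    }
    boolean activeAt(long t) { return t >= start && t < end; }
  }

  /** Remove newest-first until no instant in the check zone is violated. */
  static void repair(List<Res> plan, int capacity, long now, long window) {
    for (long t = now; t < now + window; t++) {
      final long ti = t;
      int committed = plan.stream().filter(r -> r.activeAt(ti))
          .mapToInt(r -> r.mem).sum();
      while (committed > capacity) {
        // the newest accepted reservation active at t is evicted first
        Res victim = plan.stream().filter(r -> r.activeAt(ti))
            .max(Comparator.comparingInt(r -> r.acceptedAt)).get();
        plan.remove(victim);
        committed -= victim.mem;
        System.out.println("removing reservation " + victim.id + " at t=" + ti);
      }
    }
  }

  public static void main(String[] args) {
    List<Res> plan = new ArrayList<>();
    // five reservations of 20 GB over [0, 5), accepted in order 1..5
    for (int i = 1; i <= 5; i++) plan.add(new Res(i, i, 0, 5, 20));
    // capacity drops from 100 to 70: the two newest (5, then 4) must go
    repair(plan, 70, 0, 6);
  }
}

Fed the same numbers as the unit test below (five 20 GB reservations against a
capacity that drops to 70 GB, with a 6-step check zone), the sketch evicts
reservations 5 and then 4 and keeps the first three, mirroring the test's
assertions on r4 and r5.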

http://git-wip-us.apache.org/repos/asf/hadoop/blob/950da32b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
new file mode 100644
index 0000000..f2313e6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSimpleCapacityReplanner.java
@@ -0,0 +1,149 @@
+package org.apache.hadoop.yarn.server.resourcemanager.reservation;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ReservationRequest;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Test;
+
+public class TestSimpleCapacityReplanner {
+
+  @Test
+  public void testReplanningPlanCapacityLoss() throws PlanningException {
+
+    Resource clusterCapacity = Resource.newInstance(100 * 1024, 10);
+    Resource minAlloc = Resource.newInstance(1024, 1);
+    Resource maxAlloc = Resource.newInstance(1024 * 8, 8);
+
+    ResourceCalculator res = new DefaultResourceCalculator();
+    long step = 1L;
+    Clock clock = mock(Clock.class);
+    ReservationAgent agent = mock(ReservationAgent.class);
+
+    SharingPolicy policy = new NoOverCommitPolicy();
+    policy.init("root.dedicated", null, new HashSet());
+
+    QueueMetrics queueMetrics = mock(QueueMetrics.class);
+
+    when(clock.getTime()).thenReturn(0L);
+    SimpleCapacityReplanner enf = new SimpleCapacityReplanner(clock);
+
+    CapacitySchedulerConfiguration conf =
+        mock(CapacitySchedulerConfiguration.class);
+    when(conf.getEnforcementWindow(any(String.class))).thenReturn(6L);
+
+    conf.setLong(CapacitySchedulerConfiguration.PREFIX + "blah"
+        + CapacitySchedulerConfiguration.DOT
+        + CapacitySchedulerConfiguration.RESERVATION_ENFORCEMENT_WINDOW, 6);
+    enf.init("blah", conf);
+
+    // initialize the plan with the full (pre-drop) cluster capacity
+    InMemoryPlan plan =
+        new InMemoryPlan(queueMetrics, policy, agent, clusterCapacity, step,
+            res, minAlloc, maxAlloc, "dedicated", enf, true, clock);
+
+    // add reservations filling the plan (separated by 1 ms, so we are sure
+    // each one follows the previous one in acceptance order)
+    long ts = System.currentTimeMillis();
+    ReservationId r1 = ReservationId.newInstance(ts, 1);
+    int[] f5 = { 20, 20, 20, 20, 20 };
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r1, null, "u3",
+            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+            minAlloc)));
+    when(clock.getTime()).thenReturn(1L);
+    ReservationId r2 = ReservationId.newInstance(ts, 2);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r2, null, "u4",
+            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+            minAlloc)));
+    when(clock.getTime()).thenReturn(2L);
+    ReservationId r3 = ReservationId.newInstance(ts, 3);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r3, null, "u5",
+            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+            minAlloc)));
+    when(clock.getTime()).thenReturn(3L);
+    ReservationId r4 = ReservationId.newInstance(ts, 4);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r4, null, "u6",
+            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+            minAlloc)));
+    when(clock.getTime()).thenReturn(4L);
+    ReservationId r5 = ReservationId.newInstance(ts, 5);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r5, null, "u7",
+            "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res,
+            minAlloc)));
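+
+    // At this point every instant in [0, 5) carries five overlapping
+    // reservations of 20 containers x 1 GB = 20 GB each, i.e. 100 GB
+    // committed. Once the capacity drops to 70 GB below, the replanner
+    // must evict the two most recently accepted reservations (r5, then
+    // r4) to fall back to 60 GB.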
+
+    int[] f6 = { 50, 50, 50, 50, 50 };
+    ReservationId r6 = ReservationId.newInstance(ts, 6);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r6, null, "u3",
+            "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
+            minAlloc)));
+    when(clock.getTime()).thenReturn(6L);
+    ReservationId r7 = ReservationId.newInstance(ts, 7);
+    assertTrue(plan.toString(),
+        plan.addReservation(new InMemoryReservationAllocation(r7, null, "u4",
+            "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res,
+            minAlloc)));
+
+    // remove some of the resources (requires replanning)
+    plan.setTotalCapacity(Resource.newInstance(70 * 1024, 70));
+
+    when(clock.getTime()).thenReturn(0L);
+
+    // run the replanner
+    enf.plan(plan, null);
+
+    // check which reservations are still present
+    assertNotNull(plan.getReservationById(r1));
+    assertNotNull(plan.getReservationById(r2));
+    assertNotNull(plan.getReservationById(r3));
+    // r6 and r7 also exceed the reduced capacity, but they start at t=10,
+    // beyond the 6-step enforcement window, so the replanner spares them
+    assertNotNull(plan.getReservationById(r6));
+    assertNotNull(plan.getReservationById(r7));
+
+    // and which ones have been removed
+    assertNull(plan.getReservationById(r4));
+    assertNull(plan.getReservationById(r5));
+
+    // check that, within the enforcement window, the resources at each
+    // moment in time no longer exceed the reduced capacity (beyond the
+    // window, r6 and r7 are still allowed to exceed it)
+    for (int i = 0; i < 6; i++) {
+      int tot = 0;
+      for (ReservationAllocation r : plan.getReservationsAtTime(i)) {
+        // accumulate across reservations rather than keeping only the last
+        tot += r.getResourcesAtTime(i).getMemory();
+      }
+      assertTrue(tot <= 70 * 1024);
+    }
+  }
+
+  private Map<ReservationInterval, ReservationRequest> generateAllocation(
+      int startTime, int[] alloc) {
+    Map<ReservationInterval, ReservationRequest> req =
+        new TreeMap<ReservationInterval, ReservationRequest>();
+    for (int i = 0; i < alloc.length; i++) {
+      req.put(new ReservationInterval(startTime + i, startTime + i + 1),
+          ReservationRequest.newInstance(Resource.newInstance(1024, 1),
+              alloc[i]));
+    }
+    return req;
+  }
+
+}
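
For completeness, a rough sketch of how a plan queue would pick up the
enforcement window outside of tests. The property name is assembled from the
same constants the test uses (PREFIX, DOT, RESERVATION_ENFORCEMENT_WINDOW);
the exact string value of that constant and the use of milliseconds are
assumptions, not something this patch shows.

// Hypothetical wiring (not part of the patch): configure the enforcement
// window for a plan queue named "dedicated" and hand it to the replanner.
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
conf.setLong(CapacitySchedulerConfiguration.PREFIX + "dedicated"
    + CapacitySchedulerConfiguration.DOT
    + CapacitySchedulerConfiguration.RESERVATION_ENFORCEMENT_WINDOW,
    60 * 60 * 1000L); // assumed: a one-hour window, expressed in ms

SimpleCapacityReplanner replanner = new SimpleCapacityReplanner();
replanner.init("dedicated", conf);

// After a capacity drop (e.g. plan.setTotalCapacity(...)), the
// ReservationSystem can repair the plan with:
// replanner.plan(plan, null);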