Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5ADAB18920 for ; Mon, 27 Jul 2015 20:08:11 +0000 (UTC) Received: (qmail 84108 invoked by uid 500); 27 Jul 2015 20:07:46 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 83456 invoked by uid 500); 27 Jul 2015 20:07:45 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 81020 invoked by uid 99); 27 Jul 2015 20:07:44 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 27 Jul 2015 20:07:44 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 36E48E08E0; Mon, 27 Jul 2015 20:07:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zjshen@apache.org To: common-commits@hadoop.apache.org Date: Mon, 27 Jul 2015 20:08:29 -0000 Message-Id: In-Reply-To: <0e7baf2391424ef092872afc9ad1fa83@git.apache.org> References: <0e7baf2391424ef092872afc9ad1fa83@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [47/50] [abbrv] hadoop git commit: YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. (Jonathan Yaniv and Ishai Menache via curino) YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. 
(Jonathan Yaniv and Ishai Menache via curino) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d32b8b9c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d32b8b9c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d32b8b9c Branch: refs/heads/YARN-2928 Commit: d32b8b9c76199891d26e941ca3c2d5994e0af5ac Parents: a02cd15 Author: ccurino Authored: Sat Jul 25 07:39:47 2015 -0700 Committer: Zhijie Shen Committed: Mon Jul 27 12:57:38 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../reservation/AbstractReservationSystem.java | 2 + .../reservation/GreedyReservationAgent.java | 390 --------- .../reservation/InMemoryPlan.java | 13 +- .../InMemoryReservationAllocation.java | 8 +- .../resourcemanager/reservation/Plan.java | 1 + .../reservation/PlanContext.java | 2 + .../resourcemanager/reservation/PlanView.java | 31 +- .../resourcemanager/reservation/Planner.java | 47 -- .../RLESparseResourceAllocation.java | 55 +- .../reservation/ReservationAgent.java | 72 -- .../ReservationSchedulerConfiguration.java | 6 +- .../reservation/ReservationSystem.java | 5 +- .../reservation/ReservationSystemUtil.java | 6 +- .../reservation/SimpleCapacityReplanner.java | 113 --- .../planning/AlignedPlannerWithGreedy.java | 123 +++ .../planning/GreedyReservationAgent.java | 97 +++ .../reservation/planning/IterativePlanner.java | 338 ++++++++ .../reservation/planning/Planner.java | 49 ++ .../reservation/planning/PlanningAlgorithm.java | 207 +++++ .../reservation/planning/ReservationAgent.java | 73 ++ .../planning/SimpleCapacityReplanner.java | 118 +++ .../reservation/planning/StageAllocator.java | 55 ++ .../planning/StageAllocatorGreedy.java | 152 ++++ .../planning/StageAllocatorLowCostAligned.java | 360 ++++++++ .../planning/StageEarliestStart.java | 46 ++ .../planning/StageEarliestStartByDemand.java | 106 +++ 
.../StageEarliestStartByJobArrival.java | 39 + .../planning/TryManyReservationAgents.java | 114 +++ .../reservation/ReservationSystemTestUtil.java | 5 +- .../reservation/TestCapacityOverTimePolicy.java | 2 +- .../TestCapacitySchedulerPlanFollower.java | 1 + .../reservation/TestFairReservationSystem.java | 1 - .../TestFairSchedulerPlanFollower.java | 1 + .../reservation/TestGreedyReservationAgent.java | 604 -------------- .../reservation/TestInMemoryPlan.java | 2 + .../reservation/TestNoOverCommitPolicy.java | 1 + .../TestRLESparseResourceAllocation.java | 51 +- .../TestSchedulerPlanFollowerBase.java | 1 + .../TestSimpleCapacityReplanner.java | 162 ---- .../planning/TestAlignedPlanner.java | 820 +++++++++++++++++++ .../planning/TestGreedyReservationAgent.java | 611 ++++++++++++++ .../planning/TestSimpleCapacityReplanner.java | 170 ++++ 43 files changed, 3634 insertions(+), 1429 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index fa364f1..611fd4b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -262,6 +262,9 @@ Release 2.8.0 - UNRELEASED YARN-2019. Retrospect on decision of making RM crashed if any exception throw in ZKRMStateStore. (Jian He via junping_du) + YARN-3656. LowCost: A Cost-Based Placement Agent for YARN Reservations. + (Jonathan Yaniv and Ishai Menache via curino) + IMPROVEMENTS YARN-644. 
Basic null check is not performed on passed in arguments before http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index 8a15ac6..d2603c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -40,6 +40,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java deleted file mode 100644 index 214df1c..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/GreedyReservationAgent.java +++ /dev/null @@ -1,390 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.resourcemanager.reservation; - -import java.util.HashMap; -import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.yarn.api.records.ReservationDefinition; -import org.apache.hadoop.yarn.api.records.ReservationId; -import org.apache.hadoop.yarn.api.records.ReservationRequest; -import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; -import org.apache.hadoop.yarn.util.resource.Resources; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This Agent employs a simple greedy placement strategy, placing the various - * stages of a {@link ReservationRequest} from the deadline moving backward - * towards the arrival. This allows jobs with earlier deadline to be scheduled - * greedily as well. Combined with an opportunistic anticipation of work if the - * cluster is not fully utilized also seems to provide good latency for - * best-effort jobs (i.e., jobs running without a reservation). - * - * This agent does not account for locality and only consider container - * granularity for validation purposes (i.e., you can't exceed max-container - * size). 
- */ -public class GreedyReservationAgent implements ReservationAgent { - - private static final Logger LOG = LoggerFactory - .getLogger(GreedyReservationAgent.class); - - @Override - public boolean createReservation(ReservationId reservationId, String user, - Plan plan, ReservationDefinition contract) throws PlanningException { - return computeAllocation(reservationId, user, plan, contract, null); - } - - @Override - public boolean updateReservation(ReservationId reservationId, String user, - Plan plan, ReservationDefinition contract) throws PlanningException { - return computeAllocation(reservationId, user, plan, contract, - plan.getReservationById(reservationId)); - } - - @Override - public boolean deleteReservation(ReservationId reservationId, String user, - Plan plan) throws PlanningException { - return plan.deleteReservation(reservationId); - } - - private boolean computeAllocation(ReservationId reservationId, String user, - Plan plan, ReservationDefinition contract, - ReservationAllocation oldReservation) throws PlanningException, - ContractValidationException { - LOG.info("placing the following ReservationRequest: " + contract); - - Resource totalCapacity = plan.getTotalCapacity(); - - // Here we can addd logic to adjust the ResourceDefinition to account for - // system "imperfections" (e.g., scheduling delays for large containers). 
- - // Align with plan step conservatively (i.e., ceil arrival, and floor - // deadline) - long earliestStart = contract.getArrival(); - long step = plan.getStep(); - if (earliestStart % step != 0) { - earliestStart = earliestStart + (step - (earliestStart % step)); - } - long deadline = - contract.getDeadline() - contract.getDeadline() % plan.getStep(); - - // setup temporary variables to handle time-relations between stages and - // intermediate answers - long curDeadline = deadline; - long oldDeadline = -1; - - Map allocations = - new HashMap(); - RLESparseResourceAllocation tempAssigned = - new RLESparseResourceAllocation(plan.getResourceCalculator(), - plan.getMinimumAllocation()); - - List stages = contract.getReservationRequests() - .getReservationResources(); - ReservationRequestInterpreter type = contract.getReservationRequests() - .getInterpreter(); - - boolean hasGang = false; - - // Iterate the stages in backward from deadline - for (ListIterator li = - stages.listIterator(stages.size()); li.hasPrevious();) { - - ReservationRequest currentReservationStage = li.previous(); - - // validate the RR respect basic constraints - validateInput(plan, currentReservationStage, totalCapacity); - - hasGang |= currentReservationStage.getConcurrency() > 1; - - // run allocation for a single stage - Map curAlloc = - placeSingleStage(plan, tempAssigned, currentReservationStage, - earliestStart, curDeadline, oldReservation, totalCapacity); - - if (curAlloc == null) { - // if we did not find an allocation for the currentReservationStage - // return null, unless the ReservationDefinition we are placing is of - // type ANY - if (type != ReservationRequestInterpreter.R_ANY) { - throw new PlanningException("The GreedyAgent" - + " couldn't find a valid allocation for your request"); - } else { - continue; - } - } else { - - // if we did find an allocation add it to the set of allocations - allocations.putAll(curAlloc); - - // if this request is of type ANY we are done 
searching (greedy) - // and can return the current allocation (break-out of the search) - if (type == ReservationRequestInterpreter.R_ANY) { - break; - } - - // if the request is of ORDER or ORDER_NO_GAP we constraint the next - // round of allocation to precede the current allocation, by setting - // curDeadline - if (type == ReservationRequestInterpreter.R_ORDER - || type == ReservationRequestInterpreter.R_ORDER_NO_GAP) { - curDeadline = findEarliestTime(curAlloc.keySet()); - - // for ORDER_NO_GAP verify that the allocation found so far has no - // gap, return null otherwise (the greedy procedure failed to find a - // no-gap - // allocation) - if (type == ReservationRequestInterpreter.R_ORDER_NO_GAP - && oldDeadline > 0) { - if (oldDeadline - findLatestTime(curAlloc.keySet()) > plan - .getStep()) { - throw new PlanningException("The GreedyAgent" - + " couldn't find a valid allocation for your request"); - } - } - // keep the variable oldDeadline pointing to the last deadline we - // found - oldDeadline = curDeadline; - } - } - } - - // / If we got here is because we failed to find an allocation for the - // ReservationDefinition give-up and report failure to the user - if (allocations.isEmpty()) { - throw new PlanningException("The GreedyAgent" - + " couldn't find a valid allocation for your request"); - } - - // create reservation with above allocations if not null/empty - - Resource ZERO_RES = Resource.newInstance(0, 0); - - long firstStartTime = findEarliestTime(allocations.keySet()); - - // add zero-padding from arrival up to the first non-null allocation - // to guarantee that the reservation exists starting at arrival - if (firstStartTime > earliestStart) { - allocations.put(new ReservationInterval(earliestStart, - firstStartTime), ZERO_RES); - firstStartTime = earliestStart; - // consider to add trailing zeros at the end for simmetry - } - - // Actually add/update the reservation in the plan. 
- // This is subject to validation as other agents might be placing - // in parallel and there might be sharing policies the agent is not - // aware off. - ReservationAllocation capReservation = - new InMemoryReservationAllocation(reservationId, contract, user, - plan.getQueueName(), firstStartTime, - findLatestTime(allocations.keySet()), allocations, - plan.getResourceCalculator(), plan.getMinimumAllocation(), hasGang); - if (oldReservation != null) { - return plan.updateReservation(capReservation); - } else { - return plan.addReservation(capReservation); - } - } - - private void validateInput(Plan plan, ReservationRequest rr, - Resource totalCapacity) throws ContractValidationException { - - if (rr.getConcurrency() < 1) { - throw new ContractValidationException("Gang Size should be >= 1"); - } - - if (rr.getNumContainers() <= 0) { - throw new ContractValidationException("Num containers should be >= 0"); - } - - // check that gangSize and numContainers are compatible - if (rr.getNumContainers() % rr.getConcurrency() != 0) { - throw new ContractValidationException( - "Parallelism must be an exact multiple of gang size"); - } - - // check that the largest container request does not exceed - // the cluster-wide limit for container sizes - if (Resources.greaterThan(plan.getResourceCalculator(), totalCapacity, - rr.getCapability(), plan.getMaximumAllocation())) { - throw new ContractValidationException("Individual" - + " capability requests should not exceed cluster's maxAlloc"); - } - } - - /** - * This method actually perform the placement of an atomic stage of the - * reservation. The key idea is to traverse the plan backward for a - * "lease-duration" worth of time, and compute what is the maximum multiple of - * our concurrency (gang) parameter we can fit. We do this and move towards - * previous instant in time until the time-window is exhausted or we placed - * all the user request. 
- */ - private Map placeSingleStage( - Plan plan, RLESparseResourceAllocation tempAssigned, - ReservationRequest rr, long earliestStart, long curDeadline, - ReservationAllocation oldResAllocation, final Resource totalCapacity) { - - Map allocationRequests = - new HashMap(); - - // compute the gang as a resource and get the duration - Resource gang = Resources.multiply(rr.getCapability(), rr.getConcurrency()); - long dur = rr.getDuration(); - long step = plan.getStep(); - - // ceil the duration to the next multiple of the plan step - if (dur % step != 0) { - dur += (step - (dur % step)); - } - - // we know for sure that this division has no remainder (part of contract - // with user, validate before - int gangsToPlace = rr.getNumContainers() / rr.getConcurrency(); - - int maxGang = 0; - - // loop trying to place until we are done, or we are considering - // an invalid range of times - while (gangsToPlace > 0 && curDeadline - dur >= earliestStart) { - - // as we run along we remember how many gangs we can fit, and what - // was the most constraining moment in time (we will restart just - // after that to place the next batch) - maxGang = gangsToPlace; - long minPoint = curDeadline; - int curMaxGang = maxGang; - - // start placing at deadline (excluded due to [,) interval semantics and - // move backward - for (long t = curDeadline - plan.getStep(); t >= curDeadline - dur - && maxGang > 0; t = t - plan.getStep()) { - - // As we run along we will logically remove the previous allocation for - // this reservation - // if one existed - Resource oldResCap = Resource.newInstance(0, 0); - if (oldResAllocation != null) { - oldResCap = oldResAllocation.getResourcesAtTime(t); - } - - // compute net available resources - Resource netAvailableRes = Resources.clone(totalCapacity); - Resources.addTo(netAvailableRes, oldResCap); - Resources.subtractFrom(netAvailableRes, - plan.getTotalCommittedResources(t)); - Resources.subtractFrom(netAvailableRes, - 
tempAssigned.getCapacityAtTime(t)); - - // compute maximum number of gangs we could fit - curMaxGang = - (int) Math.floor(Resources.divide(plan.getResourceCalculator(), - totalCapacity, netAvailableRes, gang)); - - // pick the minimum between available resources in this instant, and how - // many gangs we have to place - curMaxGang = Math.min(gangsToPlace, curMaxGang); - - // compare with previous max, and set it. also remember *where* we found - // the minimum (useful for next attempts) - if (curMaxGang <= maxGang) { - maxGang = curMaxGang; - minPoint = t; - } - } - - // if we were able to place any gang, record this, and decrement - // gangsToPlace - if (maxGang > 0) { - gangsToPlace -= maxGang; - - ReservationInterval reservationInt = - new ReservationInterval(curDeadline - dur, curDeadline); - ReservationRequest reservationRequest = - ReservationRequest.newInstance(rr.getCapability(), - rr.getConcurrency() * maxGang, rr.getConcurrency(), - rr.getDuration()); - // remember occupied space (plan is read-only till we find a plausible - // allocation for the entire request). This is needed since we might be - // placing other ReservationRequest within the same - // ReservationDefinition, - // and we must avoid double-counting the available resources - final Resource reservationRes = ReservationSystemUtil.toResource( - reservationRequest); - tempAssigned.addInterval(reservationInt, reservationRes); - allocationRequests.put(reservationInt, reservationRes); - - } - - // reset our new starting point (curDeadline) to the most constraining - // point so far, we will look "left" of that to find more places where - // to schedule gangs (for sure nothing on the "right" of this point can - // fit a full gang. - curDeadline = minPoint; - } - - // if no gangs are left to place we succeed and return the allocation - if (gangsToPlace == 0) { - return allocationRequests; - } else { - // If we are here is becasue we did not manage to satisfy this request. 
- // So we need to remove unwanted side-effect from tempAssigned (needed - // for ANY). - for (Map.Entry tempAllocation : - allocationRequests.entrySet()) { - tempAssigned.removeInterval(tempAllocation.getKey(), - tempAllocation.getValue()); - } - // and return null to signal failure in this allocation - return null; - } - } - - // finds the leftmost point of this set of ReservationInterval - private long findEarliestTime(Set resInt) { - long ret = Long.MAX_VALUE; - for (ReservationInterval s : resInt) { - if (s.getStartTime() < ret) { - ret = s.getStartTime(); - } - } - return ret; - } - - // finds the rightmost point of this set of ReservationIntervals - private long findLatestTime(Set resInt) { - long ret = Long.MIN_VALUE; - for (ReservationInterval s : resInt) { - if (s.getEndTime() > ret) { - ret = s.getEndTime(); - } - } - return ret; - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java index 50d66cf..abc9c98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java @@ -33,6 +33,8 @@ import 
java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.UTCClock; @@ -41,7 +43,12 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class InMemoryPlan implements Plan { +/** + * This class represents an in memory representation of the state of our + * reservation system, and provides accelerated access to both individual + * reservations and aggregate utilization of resources over time. + */ +public class InMemoryPlan implements Plan { private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class); @@ -75,7 +82,7 @@ class InMemoryPlan implements Plan { private Resource totalCapacity; - InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, + public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, ReservationAgent agent, Resource totalCapacity, long step, ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, String queueName, Planner replanner, boolean getMoveOnExpiry) { @@ -83,7 +90,7 @@ class InMemoryPlan implements Plan { maxAlloc, queueName, replanner, getMoveOnExpiry, new UTCClock()); } - InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, + public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, ReservationAgent agent, Resource totalCapacity, long step, ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, String queueName, Planner replanner, boolean getMoveOnExpiry, Clock clock) { 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java index a4dd23b..42a2243 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java @@ -29,9 +29,9 @@ import org.apache.hadoop.yarn.util.resource.Resources; /** * An in memory implementation of a reservation allocation using the * {@link RLESparseResourceAllocation} - * + * */ -class InMemoryReservationAllocation implements ReservationAllocation { +public class InMemoryReservationAllocation implements ReservationAllocation { private final String planName; private final ReservationId reservationID; @@ -45,7 +45,7 @@ class InMemoryReservationAllocation implements ReservationAllocation { private RLESparseResourceAllocation resourcesOverTime; - InMemoryReservationAllocation(ReservationId reservationID, + public InMemoryReservationAllocation(ReservationId reservationID, ReservationDefinition contract, String user, String planName, long startTime, long endTime, Map allocations, @@ -54,7 +54,7 @@ class 
InMemoryReservationAllocation implements ReservationAllocation { allocations, calculator, minAlloc, false); } - InMemoryReservationAllocation(ReservationId reservationID, + public InMemoryReservationAllocation(ReservationId reservationID, ReservationDefinition contract, String user, String planName, long startTime, long endTime, Map allocations, http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java index e8e9e29..f7ffbd0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Plan.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; /** * A Plan represents the central data structure of a reservation system that http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java index 6d3506d..94e299e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanContext.java @@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java index b49e99e..be68906 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java @@ -1,26 +1,27 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- *******************************************************************************/ + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import java.util.Set; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; /** * This interface provides a read-only view on the allocations made in this http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java deleted file mode 100644 index 57f28ff..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/Planner.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.reservation; - -import java.util.List; - -import org.apache.hadoop.yarn.api.records.ReservationDefinition; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; - -public interface Planner { - - /** - * Update the existing {@link Plan}, by adding/removing/updating existing - * reservations, and adding a subset of the reservation requests in the - * contracts parameter. 
- * - * @param plan the {@link Plan} to replan - * @param contracts the list of reservation requests - * @throws PlanningException - */ - public void plan(Plan plan, List contracts) - throws PlanningException; - - /** - * Initialize the replanner - * - * @param planQueueName the name of the queue for this plan - * @param conf the scheduler configuration - */ - void init(String planQueueName, ReservationSchedulerConfiguration conf); -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java index 2957cc6..80f2ff7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/RLESparseResourceAllocation.java @@ -38,7 +38,7 @@ import com.google.gson.stream.JsonWriter; /** * This is a run length encoded sparse data structure that maintains resource - * allocations over time + * allocations over time. 
*/ public class RLESparseResourceAllocation { @@ -74,7 +74,7 @@ public class RLESparseResourceAllocation { /** * Add a resource for the specified interval - * + * * @param reservationInterval the interval for which the resource is to be * added * @param totCap the resource to be added @@ -138,7 +138,7 @@ public class RLESparseResourceAllocation { /** * Removes a resource for the specified interval - * + * * @param reservationInterval the interval for which the resource is to be * removed * @param totCap the resource to be removed @@ -189,7 +189,7 @@ public class RLESparseResourceAllocation { /** * Returns the capacity, i.e. total resources allocated at the specified point * of time - * + * * @param tick the time (UTC in ms) at which the capacity is requested * @return the resources allocated at the specified time */ @@ -208,7 +208,7 @@ public class RLESparseResourceAllocation { /** * Get the timestamp of the earliest resource allocation - * + * * @return the timestamp of the first resource allocation */ public long getEarliestStartTime() { @@ -226,7 +226,7 @@ public class RLESparseResourceAllocation { /** * Get the timestamp of the latest resource allocation - * + * * @return the timestamp of the last resource allocation */ public long getLatestEndTime() { @@ -244,7 +244,7 @@ public class RLESparseResourceAllocation { /** * Returns true if there are no non-zero entries - * + * * @return true if there are no allocations or false otherwise */ public boolean isEmpty() { @@ -287,7 +287,7 @@ public class RLESparseResourceAllocation { /** * Returns the JSON string representation of the current resources allocated * over time - * + * * @return the JSON string representation of the current resources allocated * over time */ @@ -312,4 +312,43 @@ public class RLESparseResourceAllocation { } } + /** + * Returns the representation of the current resources allocated over time as + * an interval map. 
+ * + * @return the representation of the current resources allocated over time as + * an interval map. + */ + public Map toIntervalMap() { + + readLock.lock(); + try { + Map allocations = + new TreeMap(); + + // Empty + if (isEmpty()) { + return allocations; + } + + Map.Entry lastEntry = null; + for (Map.Entry entry : cumulativeCapacity.entrySet()) { + + if (lastEntry != null) { + ReservationInterval interval = + new ReservationInterval(lastEntry.getKey(), entry.getKey()); + Resource resource = lastEntry.getValue(); + + allocations.put(interval, resource); + } + + lastEntry = entry; + } + return allocations; + } finally { + readLock.unlock(); + } + + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java deleted file mode 100644 index 6955036..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationAgent.java +++ /dev/null @@ -1,72 +0,0 @@ -/******************************************************************************* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - *******************************************************************************/ -package org.apache.hadoop.yarn.server.resourcemanager.reservation; - -import org.apache.hadoop.yarn.api.records.ReservationDefinition; -import org.apache.hadoop.yarn.api.records.ReservationId; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; - -/** - * An entity that seeks to acquire resources to satisfy an user's contract - */ -public interface ReservationAgent { - - /** - * Create a reservation for the user that abides by the specified contract - * - * @param reservationId the identifier of the reservation to be created. 
- * @param user the user who wants to create the reservation - * @param plan the Plan to which the reservation must be fitted - * @param contract encapsulates the resources the user requires for his - * session - * - * @return whether the create operation was successful or not - * @throws PlanningException if the session cannot be fitted into the plan - */ - public boolean createReservation(ReservationId reservationId, String user, - Plan plan, ReservationDefinition contract) throws PlanningException; - - /** - * Update a reservation for the user that abides by the specified contract - * - * @param reservationId the identifier of the reservation to be updated - * @param user the user who wants to create the session - * @param plan the Plan to which the reservation must be fitted - * @param contract encapsulates the resources the user requires for his - * reservation - * - * @return whether the update operation was successful or not - * @throws PlanningException if the reservation cannot be fitted into the plan - */ - public boolean updateReservation(ReservationId reservationId, String user, - Plan plan, ReservationDefinition contract) throws PlanningException; - - /** - * Delete an user reservation - * - * @param reservationId the identifier of the reservation to be deleted - * @param user the user who wants to create the reservation - * @param plan the Plan to which the session must be fitted - * - * @return whether the delete operation was successful or not - * @throws PlanningException if the reservation cannot be fitted into the plan - */ - public boolean deleteReservation(ReservationId reservationId, String user, - Plan plan) throws PlanningException; - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java index 2af1ffd..c430b1f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSchedulerConfiguration.java @@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; public abstract class ReservationSchedulerConfiguration extends Configuration { @@ -33,11 +35,11 @@ public abstract class ReservationSchedulerConfiguration extends Configuration { @InterfaceAudience.Private public static final String DEFAULT_RESERVATION_AGENT_NAME = - "org.apache.hadoop.yarn.server.resourcemanager.reservation.GreedyReservationAgent"; + "org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.AlignedPlannerWithGreedy"; @InterfaceAudience.Private public static final String DEFAULT_RESERVATION_PLANNER_NAME = - "org.apache.hadoop.yarn.server.resourcemanager.reservation.SimpleCapacityReplanner"; + 
"org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.SimpleCapacityReplanner"; @InterfaceAudience.Private public static final boolean DEFAULT_RESERVATION_MOVE_ON_EXPIRY = true; http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java index cb76dcf..3309693 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java @@ -24,12 +24,13 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ReservationId; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import 
org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; /** * This interface is the one implemented by any system that wants to support http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java index 8affae4..5562adc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemUtil.java @@ -25,7 +25,11 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.util.HashMap; import java.util.Map; -final class ReservationSystemUtil { +/** + * Simple helper class for static methods used to transform across + * common formats in tests + */ +public final class ReservationSystemUtil { private ReservationSystemUtil() { // not called 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java deleted file mode 100644 index b5a6a99..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/SimpleCapacityReplanner.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.yarn.server.resourcemanager.reservation; - -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.TreeSet; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.ReservationDefinition; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; -import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.UTCClock; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; -import org.apache.hadoop.yarn.util.resource.Resources; - -import com.google.common.annotations.VisibleForTesting; - -/** - * This (re)planner scan a period of time from now to a maximum time window (or - * the end of the last session, whichever comes first) checking the overall - * capacity is not violated. - * - * It greedily removes sessions in reversed order of acceptance (latest accepted - * is the first removed). 
- */ -public class SimpleCapacityReplanner implements Planner { - - private static final Log LOG = LogFactory - .getLog(SimpleCapacityReplanner.class); - - private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); - - private final Clock clock; - - // this allows to control to time-span of this replanning - // far into the future time instants might be worth replanning for - // later on - private long lengthOfCheckZone; - - public SimpleCapacityReplanner() { - this(new UTCClock()); - } - - @VisibleForTesting - SimpleCapacityReplanner(Clock clock) { - this.clock = clock; - } - - @Override - public void init(String planQueueName, - ReservationSchedulerConfiguration conf) { - this.lengthOfCheckZone = conf.getEnforcementWindow(planQueueName); - } - - @Override - public void plan(Plan plan, List contracts) - throws PlanningException { - - if (contracts != null) { - throw new RuntimeException( - "SimpleCapacityReplanner cannot handle new reservation contracts"); - } - - ResourceCalculator resCalc = plan.getResourceCalculator(); - Resource totCap = plan.getTotalCapacity(); - long now = clock.getTime(); - - // loop on all moment in time from now to the end of the check Zone - // or the end of the planned sessions whichever comes first - for (long t = now; (t < plan.getLastEndTime() && t < (now + lengthOfCheckZone)); t += - plan.getStep()) { - Resource excessCap = - Resources.subtract(plan.getTotalCommittedResources(t), totCap); - // if we are violating - if (Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE)) { - // sorted on reverse order of acceptance, so newest reservations first - Set curReservations = - new TreeSet(plan.getReservationsAtTime(t)); - for (Iterator resIter = - curReservations.iterator(); resIter.hasNext() - && Resources.greaterThan(resCalc, totCap, excessCap, ZERO_RESOURCE);) { - ReservationAllocation reservation = resIter.next(); - plan.deleteReservation(reservation.getReservationId()); - excessCap = - 
Resources.subtract(excessCap, reservation.getResourcesAtTime(t)); - LOG.info("Removing reservation " + reservation.getReservationId() - + " to repair physical-resource constraints in the plan: " - + plan.getQueueName()); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java new file mode 100644 index 0000000..a389928 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/AlignedPlannerWithGreedy.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A planning algorithm that first runs LowCostAligned, and if it fails runs + * Greedy. + */ +public class AlignedPlannerWithGreedy implements ReservationAgent { + + // Default smoothness factor + private static final int DEFAULT_SMOOTHNESS_FACTOR = 10; + + // Log + private static final Logger LOG = LoggerFactory + .getLogger(AlignedPlannerWithGreedy.class); + + // Planner that tries LowCostAligned first and falls back to Greedy + private final ReservationAgent planner; + + // Constructor + public AlignedPlannerWithGreedy() { + this(DEFAULT_SMOOTHNESS_FACTOR); + } + + // Constructor + public AlignedPlannerWithGreedy(int smoothnessFactor) { + + // List of algorithms + List listAlg = new LinkedList(); + + // LowCostAligned planning algorithm + ReservationAgent algAligned = + new IterativePlanner(new StageEarliestStartByDemand(), + new StageAllocatorLowCostAligned(smoothnessFactor)); + listAlg.add(algAligned); + + // Greedy planning algorithm + ReservationAgent algGreedy = + new IterativePlanner(new StageEarliestStartByJobArrival(), + new StageAllocatorGreedy()); + listAlg.add(algGreedy); + + // Set planner: + // 1. Attempt to execute algAligned + // 2. 
If failed, fall back to algGreedy + planner = new TryManyReservationAgents(listAlg); + + } + + @Override + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + LOG.info("placing the following ReservationRequest: " + contract); + + try { + boolean res = + planner.createReservation(reservationId, user, plan, contract); + + if (res) { + LOG.info("OUTCOME: SUCCESS, Reservation ID: " + + reservationId.toString() + ", Contract: " + contract.toString()); + } else { + LOG.info("OUTCOME: FAILURE, Reservation ID: " + + reservationId.toString() + ", Contract: " + contract.toString()); + } + return res; + } catch (PlanningException e) { + LOG.info("OUTCOME: FAILURE, Reservation ID: " + reservationId.toString() + + ", Contract: " + contract.toString()); + throw e; + } + + } + + @Override + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + LOG.info("updating the following ReservationRequest: " + contract); + + return planner.updateReservation(reservationId, user, plan, contract); + + } + + @Override + public boolean deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException { + + LOG.info("removing the following ReservationId: " + reservationId); + + return planner.deleteReservation(reservationId, user, plan); + + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java new file mode 100644 index 0000000..db82a66 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/GreedyReservationAgent.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This Agent employs a simple greedy placement strategy, placing the various + * stages of a {@link ReservationDefinition} from the deadline moving backward + * towards the arrival. This allows jobs with earlier deadline to be scheduled + * greedily as well. 
Combined with an opportunistic anticipation of work if the + * cluster is not fully utilized, this also seems to provide good latency for + * best-effort jobs (i.e., jobs running without a reservation). + * + * This agent does not account for locality and only considers container + * granularity for validation purposes (i.e., you can't exceed max-container + * size). + */ + +public class GreedyReservationAgent implements ReservationAgent { + + // Log + private static final Logger LOG = LoggerFactory + .getLogger(GreedyReservationAgent.class); + + // Greedy planner + private final ReservationAgent planner = new IterativePlanner( + new StageEarliestStartByJobArrival(), new StageAllocatorGreedy()); + + @Override + public boolean createReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + LOG.info("placing the following ReservationRequest: " + contract); + + try { + boolean res = + planner.createReservation(reservationId, user, plan, contract); + + if (res) { + LOG.info("OUTCOME: SUCCESS, Reservation ID: " + + reservationId.toString() + ", Contract: " + contract.toString()); + } else { + LOG.info("OUTCOME: FAILURE, Reservation ID: " + + reservationId.toString() + ", Contract: " + contract.toString()); + } + return res; + } catch (PlanningException e) { + LOG.info("OUTCOME: FAILURE, Reservation ID: " + reservationId.toString() + + ", Contract: " + contract.toString()); + throw e; + } + + } + + @Override + public boolean updateReservation(ReservationId reservationId, String user, + Plan plan, ReservationDefinition contract) throws PlanningException { + + LOG.info("updating the following ReservationRequest: " + contract); + + return planner.updateReservation(reservationId, user, plan, contract); + + } + + @Override + public boolean deleteReservation(ReservationId reservationId, String user, + Plan plan) throws PlanningException { + + LOG.info("removing the following ReservationId: " + reservationId); + + 
return planner.deleteReservation(reservationId, user, plan); + + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java new file mode 100644 index 0000000..342c2e7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/IterativePlanner.java @@ -0,0 +1,338 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.HashMap; +import java.util.ListIterator; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.api.records.ReservationRequest; +import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationInterval; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ContractValidationException; +import org.apache.hadoop.yarn.util.resource.Resources; + +/** + * A planning algorithm consisting of two main phases. The algorithm iterates + * over the job stages in descending order. For each stage, the algorithm: 1. + * Determines an interval [stageArrivalTime, stageDeadline) in which the stage + * is allocated. 2. Computes an allocation for the stage inside the interval. + * + * For ANY and ALL jobs, phase 1 sets the allocation window of each stage to be + * [jobArrival, jobDeadline]. For ORDER and ORDER_NO_GAP jobs, the deadline of + * each stage is set as succcessorStartTime - the starting time of its + * succeeding stage (or jobDeadline if it is the last stage). + * + * The phases are set using the two functions: 1. setAlgEarliestStartTime 2. + * setAlgComputeStageAllocation + */ +public class IterativePlanner extends PlanningAlgorithm { + + // Modifications performed by the algorithm that are not been reflected in the + // actual plan while a request is still pending. 
+ private RLESparseResourceAllocation planModifications; + + // Data extracted from plan + private Map planLoads; + private Resource capacity; + private long step; + + // Job parameters + private ReservationRequestInterpreter jobType; + private long jobArrival; + private long jobDeadline; + + // Phase algorithms + private StageEarliestStart algStageEarliestStart = null; + private StageAllocator algStageAllocator = null; + + // Constructor + public IterativePlanner(StageEarliestStart algEarliestStartTime, + StageAllocator algStageAllocator) { + + setAlgStageEarliestStart(algEarliestStartTime); + setAlgStageAllocator(algStageAllocator); + + } + + @Override + public RLESparseResourceAllocation computeJobAllocation(Plan plan, + ReservationId reservationId, ReservationDefinition reservation) + throws ContractValidationException { + + // Initialize + initialize(plan, reservation); + + // If the job has been previously reserved, logically remove its allocation + ReservationAllocation oldReservation = + plan.getReservationById(reservationId); + if (oldReservation != null) { + ignoreOldAllocation(oldReservation); + } + + // Create the allocations data structure + RLESparseResourceAllocation allocations = + new RLESparseResourceAllocation(plan.getResourceCalculator(), + plan.getMinimumAllocation()); + + // Get a reverse iterator for the set of stages + ListIterator li = + reservation + .getReservationRequests() + .getReservationResources() + .listIterator( + reservation.getReservationRequests().getReservationResources() + .size()); + + // Current stage + ReservationRequest currentReservationStage; + + // Index, points on the current node + int index = + reservation.getReservationRequests().getReservationResources().size(); + + // Stage deadlines + long stageDeadline = stepRoundDown(reservation.getDeadline(), step); + long successorStartingTime = -1; + + // Iterate the stages in reverse order + while (li.hasPrevious()) { + + // Get current stage + currentReservationStage = 
li.previous(); + index -= 1; + + // Validate that the ReservationRequest respects basic constraints + validateInputStage(plan, currentReservationStage); + + // Compute an adjusted earliestStart for this resource + // (we need this to provision some space for the ORDER contracts) + long stageArrivalTime = reservation.getArrival(); + if (jobType == ReservationRequestInterpreter.R_ORDER + || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) { + stageArrivalTime = + computeEarliestStartingTime(plan, reservation, index, + currentReservationStage, stageDeadline); + } + stageArrivalTime = stepRoundUp(stageArrivalTime, step); + stageArrivalTime = Math.max(stageArrivalTime, reservation.getArrival()); + + // Compute the allocation of a single stage + Map curAlloc = + computeStageAllocation(plan, currentReservationStage, + stageArrivalTime, stageDeadline); + + // If we did not find an allocation, return NULL + // (unless it's an ANY job, then we simply continue). + if (curAlloc == null) { + + // If it's an ANY job, we can move to the next possible request + if (jobType == ReservationRequestInterpreter.R_ANY) { + continue; + } + + // Otherwise, the job cannot be allocated + return null; + + } + + // Get the start & end time of the current allocation + Long stageStartTime = findEarliestTime(curAlloc.keySet()); + Long stageEndTime = findLatestTime(curAlloc.keySet()); + + // If we did find an allocation for the stage, add it + for (Entry entry : curAlloc.entrySet()) { + allocations.addInterval(entry.getKey(), entry.getValue()); + } + + // If this is an ANY clause, we have finished + if (jobType == ReservationRequestInterpreter.R_ANY) { + break; + } + + // If ORDER job, set the stageDeadline of the next stage to be processed + if (jobType == ReservationRequestInterpreter.R_ORDER + || jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP) { + + // Verify that there is no gap, in case the job is ORDER_NO_GAP + if (jobType == ReservationRequestInterpreter.R_ORDER_NO_GAP + && 
successorStartingTime != -1 + && successorStartingTime > stageEndTime) { + + return null; + + } + + // Store the stageStartTime and set the new stageDeadline + successorStartingTime = stageStartTime; + stageDeadline = stageStartTime; + + } + + } + + // If the allocation is empty, return an error + if (allocations.isEmpty()) { + return null; + } + + return allocations; + + } + + protected void initialize(Plan plan, ReservationDefinition reservation) { + + // Get plan step & capacity + capacity = plan.getTotalCapacity(); + step = plan.getStep(); + + // Get job parameters (type, arrival time & deadline) + jobType = reservation.getReservationRequests().getInterpreter(); + jobArrival = stepRoundUp(reservation.getArrival(), step); + jobDeadline = stepRoundDown(reservation.getDeadline(), step); + + // Dirty read of plan load + planLoads = getAllLoadsInInterval(plan, jobArrival, jobDeadline); + + // Initialize the plan modifications + planModifications = + new RLESparseResourceAllocation(plan.getResourceCalculator(), + plan.getMinimumAllocation()); + + } + + private Map getAllLoadsInInterval(Plan plan, long startTime, + long endTime) { + + // Create map + Map loads = new HashMap(); + + // Calculate the load for every time slot between [start,end) + for (long t = startTime; t < endTime; t += step) { + Resource load = plan.getTotalCommittedResources(t); + loads.put(t, load); + } + + // Return map + return loads; + + } + + private void ignoreOldAllocation(ReservationAllocation oldReservation) { + + // If there is no old reservation, return + if (oldReservation == null) { + return; + } + + // Subtract each allocation interval from the planModifications + for (Entry entry : oldReservation + .getAllocationRequests().entrySet()) { + + // Read the entry + ReservationInterval interval = entry.getKey(); + Resource resource = entry.getValue(); + + // Find the actual request + Resource negativeResource = Resources.multiply(resource, -1); + + // Insert it into planModifications as a 
'negative' request, to + // represent available resources + planModifications.addInterval(interval, negativeResource); + + } + + } + + private void validateInputStage(Plan plan, ReservationRequest rr) + throws ContractValidationException { + + // Validate concurrency + if (rr.getConcurrency() < 1) { + throw new ContractValidationException("Gang Size should be >= 1"); + } + + // Validate number of containers + if (rr.getNumContainers() <= 0) { + throw new ContractValidationException("Num containers should be > 0"); + } + + // Check that gangSize and numContainers are compatible + if (rr.getNumContainers() % rr.getConcurrency() != 0) { + throw new ContractValidationException( + "Parallelism must be an exact multiple of gang size"); + } + + // Check that the largest container request does not exceed the cluster-wide + // limit for container sizes + if (Resources.greaterThan(plan.getResourceCalculator(), capacity, + rr.getCapability(), plan.getMaximumAllocation())) { + + throw new ContractValidationException( + "Individual capability requests should not exceed cluster's " + + "maxAlloc"); + + } + + } + + // Call algEarliestStartTime() + protected long computeEarliestStartingTime(Plan plan, + ReservationDefinition reservation, int index, + ReservationRequest currentReservationStage, long stageDeadline) { + + return algStageEarliestStart.setEarliestStartTime(plan, reservation, index, + currentReservationStage, stageDeadline); + + } + + // Call algStageAllocator + protected Map computeStageAllocation( + Plan plan, ReservationRequest rr, long stageArrivalTime, + long stageDeadline) { + + return algStageAllocator.computeStageAllocation(plan, planLoads, + planModifications, rr, stageArrivalTime, stageDeadline); + + } + + // Set the algorithm: algStageEarliestStart + public IterativePlanner setAlgStageEarliestStart(StageEarliestStart alg) { + + this.algStageEarliestStart = alg; + return this; // To allow concatenation of setAlg() functions + + } + + // Set the algorithm: 
algStageAllocator + public IterativePlanner setAlgStageAllocator(StageAllocator alg) { + + this.algStageAllocator = alg; + return this; // To allow concatenation of setAlg() functions + + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/d32b8b9c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java new file mode 100644 index 0000000..abac6ac --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/Planner.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation.planning; + +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ReservationDefinition; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; + +public interface Planner { + + /** + * Update the existing {@link Plan}, by adding/removing/updating existing + * reservations, and adding a subset of the reservation requests in the + * contracts parameter. + * + * @param plan the {@link Plan} to replan + * @param contracts the list of reservation requests + * @throws PlanningException + */ + public void plan(Plan plan, List contracts) + throws PlanningException; + + /** + * Initialize the replanner + * + * @param planQueueName the name of the queue for this plan + * @param conf the scheduler configuration + */ + void init(String planQueueName, ReservationSchedulerConfiguration conf); +}