Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3FF11200D08 for ; Thu, 7 Sep 2017 01:48:24 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3E1201609DB; Wed, 6 Sep 2017 23:48:24 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E769F160BCB for ; Thu, 7 Sep 2017 01:48:21 +0200 (CEST) Received: (qmail 83768 invoked by uid 500); 6 Sep 2017 23:48:20 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 83534 invoked by uid 99); 6 Sep 2017 23:48:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Sep 2017 23:48:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 01D09F56B9; Wed, 6 Sep 2017 23:48:18 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: subru@apache.org To: common-commits@hadoop.apache.org Date: Wed, 06 Sep 2017 23:48:20 -0000 Message-Id: In-Reply-To: <01166c4c1fd9485187ba0fafa6ff3c67@git.apache.org> References: <01166c4c1fd9485187ba0fafa6ff3c67@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] hadoop git commit: YARN-5328. Plan/ResourceAllocation data structure enhancements required to support recurring reservations in ReservationSystem. archived-at: Wed, 06 Sep 2017 23:48:24 -0000 YARN-5328. Plan/ResourceAllocation data structure enhancements required to support recurring reservations in ReservationSystem. 
(cherry picked from commit b6e7d1369690eaf50ce9ea7968f91a72ecb74de0) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5ccdd839 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5ccdd839 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5ccdd839 Branch: refs/heads/branch-2 Commit: 5ccdd83923d38006b49d4eed3870549232a31adb Parents: 1ef48f1 Author: Subru Krishnan Authored: Wed Sep 6 16:46:01 2017 -0700 Committer: Subru Krishnan Committed: Wed Sep 6 16:46:49 2017 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 6 + .../yarn/conf/TestYarnConfigurationFields.java | 4 +- .../reservation/AbstractReservationSystem.java | 90 ++-- .../AbstractSchedulerPlanFollower.java | 183 ++++---- .../reservation/InMemoryPlan.java | 400 ++++++++++++----- .../InMemoryReservationAllocation.java | 36 +- .../reservation/NoOverCommitPolicy.java | 2 +- .../PeriodicRLESparseResourceAllocation.java | 130 ++++-- .../resourcemanager/reservation/PlanEdit.java | 24 +- .../resourcemanager/reservation/PlanView.java | 94 ++-- .../RLESparseResourceAllocation.java | 112 ++--- .../reservation/ReservationAllocation.java | 60 ++- .../reservation/ReservationInputValidator.java | 134 +++--- .../reservation/ReservationSystem.java | 6 +- .../reservation/SharingPolicy.java | 13 +- .../reservation/planning/Planner.java | 2 +- .../reservation/planning/PlanningAlgorithm.java | 2 +- .../reservation/planning/StageAllocator.java | 2 +- .../planning/StageAllocatorGreedy.java | 2 +- .../planning/StageAllocatorGreedyRLE.java | 5 +- .../planning/StageAllocatorLowCostAligned.java | 4 +- .../reservation/ReservationSystemTestUtil.java | 135 +++--- .../reservation/TestInMemoryPlan.java | 431 ++++++++++++------- ...TestPeriodicRLESparseResourceAllocation.java | 109 +++-- .../TestRLESparseResourceAllocation.java | 122 +++--- .../planning/TestSimpleCapacityReplanner.java | 8 
+- 26 files changed, 1340 insertions(+), 776 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 7b397f5..fb9f499 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -254,6 +254,12 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP = 1000L; + /** The maximum periodicity for the Reservation System. */ + public static final String RM_RESERVATION_SYSTEM_MAX_PERIODICITY = + RM_PREFIX + "reservation-system.max-periodicity"; + public static final long DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY = + 86400000L; + /** * Enable periodic monitor threads. 
* @see #RM_SCHEDULER_MONITOR_POLICIES http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index ee97fb4..8d2411d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -33,7 +33,7 @@ import org.apache.hadoop.conf.TestConfigurationFieldsBase; */ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { - @SuppressWarnings("deprecation") + @SuppressWarnings({"deprecation", "methodlength"}) @Override public void initializeMemberVariables() { xmlFilename = new String("yarn-default.xml"); @@ -67,6 +67,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { .add(YarnConfiguration .YARN_SECURITY_SERVICE_AUTHORIZATION_RESOURCETRACKER_PROTOCOL); configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR); + configurationPropsToSkipCompare + .add(YarnConfiguration.RM_RESERVATION_SYSTEM_MAX_PERIODICITY); // Ignore blacklisting nodes for AM failures feature since it is still a // "work in progress" http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index 5ef4912..5b8772c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -18,6 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -46,17 +57,6 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import 
java.util.concurrent.locks.ReentrantReadWriteLock; - /** * This is the implementation of {@link ReservationSystem} based on the * {@link ResourceScheduler} @@ -66,8 +66,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; public abstract class AbstractReservationSystem extends AbstractService implements ReservationSystem { - private static final Logger LOG = LoggerFactory - .getLogger(AbstractReservationSystem.class); + private static final Logger LOG = + LoggerFactory.getLogger(AbstractReservationSystem.class); // private static final String DEFAULT_CAPACITY_SCHEDULER_PLAN @@ -103,6 +103,8 @@ public abstract class AbstractReservationSystem extends AbstractService private boolean isRecoveryEnabled = false; + private long maxPeriodicity; + /** * Construct the service. * @@ -143,36 +145,41 @@ public abstract class AbstractReservationSystem extends AbstractService this.conf = conf; scheduler = rmContext.getScheduler(); // Get the plan step size - planStepSize = - conf.getTimeDuration( - YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, - YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, - TimeUnit.MILLISECONDS); + planStepSize = conf.getTimeDuration( + YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, + YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, + TimeUnit.MILLISECONDS); if (planStepSize < 0) { planStepSize = YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP; } + maxPeriodicity = + conf.getLong(YarnConfiguration.RM_RESERVATION_SYSTEM_MAX_PERIODICITY, + YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY); + if (maxPeriodicity <= 0) { + maxPeriodicity = + YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY; + } // Create a plan corresponding to every reservable queue Set planQueueNames = scheduler.getPlanQueues(); for (String planQueueName : planQueueNames) { Plan plan = initializePlan(planQueueName); plans.put(planQueueName, plan); 
} - isRecoveryEnabled = conf.getBoolean( - YarnConfiguration.RECOVERY_ENABLED, + isRecoveryEnabled = conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED, YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); if (conf.getBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE, - YarnConfiguration.DEFAULT_YARN_RESERVATION_ACL_ENABLE) && - conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE, - YarnConfiguration.DEFAULT_YARN_ACL_ENABLE)) { + YarnConfiguration.DEFAULT_YARN_RESERVATION_ACL_ENABLE) + && conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE, + YarnConfiguration.DEFAULT_YARN_ACL_ENABLE)) { reservationsACLsManager = new ReservationsACLsManager(scheduler, conf); } } private void loadPlan(String planName, Map reservations) - throws PlanningException { + throws PlanningException { Plan plan = plans.get(planName); Resource minAllocation = getMinAllocation(); ResourceCalculator rescCalculator = getResourceCalculator(); @@ -248,8 +255,8 @@ public abstract class AbstractReservationSystem extends AbstractService Class planFollowerPolicyClazz = conf.getClassByName(planFollowerPolicyClassName); if (PlanFollower.class.isAssignableFrom(planFollowerPolicyClazz)) { - return (PlanFollower) ReflectionUtils.newInstance( - planFollowerPolicyClazz, conf); + return (PlanFollower) ReflectionUtils + .newInstance(planFollowerPolicyClazz, conf); } else { throw new YarnRuntimeException("Class: " + planFollowerPolicyClassName + " not instance of " + PlanFollower.class.getCanonicalName()); @@ -257,7 +264,8 @@ public abstract class AbstractReservationSystem extends AbstractService } catch (ClassNotFoundException e) { throw new YarnRuntimeException( "Could not instantiate PlanFollowerPolicy: " - + planFollowerPolicyClassName, e); + + planFollowerPolicyClassName, + e); } } @@ -371,9 +379,8 @@ public abstract class AbstractReservationSystem extends AbstractService public ReservationId getNewReservationId() { writeLock.lock(); try { - ReservationId resId = - 
ReservationId.newInstance(ResourceManager.getClusterTimeStamp(), - resCounter.incrementAndGet()); + ReservationId resId = ReservationId.newInstance( + ResourceManager.getClusterTimeStamp(), resCounter.incrementAndGet()); LOG.info("Allocated new reservationId: " + resId); return resId; } finally { @@ -390,8 +397,11 @@ public abstract class AbstractReservationSystem extends AbstractService * Get the default reservation system corresponding to the scheduler * * @param scheduler the scheduler for which the reservation system is required + * + * @return the {@link ReservationSystem} based on the configured scheduler */ - public static String getDefaultReservationSystem(ResourceScheduler scheduler) { + public static String getDefaultReservationSystem( + ResourceScheduler scheduler) { if (scheduler instanceof CapacityScheduler) { return CapacityReservationSystem.class.getName(); } else if (scheduler instanceof FairScheduler) { @@ -409,12 +419,11 @@ public abstract class AbstractReservationSystem extends AbstractService Resource maxAllocation = getMaxAllocation(); ResourceCalculator rescCalc = getResourceCalculator(); Resource totCap = getPlanQueueCapacity(planQueueName); - Plan plan = - new InMemoryPlan(getRootQueueMetrics(), adPolicy, - getAgent(planQueuePath), totCap, planStepSize, rescCalc, - minAllocation, maxAllocation, planQueueName, - getReplanner(planQueuePath), getReservationSchedulerConfiguration() - .getMoveOnExpiry(planQueuePath), rmContext); + Plan plan = new InMemoryPlan(getRootQueueMetrics(), adPolicy, + getAgent(planQueuePath), totCap, planStepSize, rescCalc, minAllocation, + maxAllocation, planQueueName, getReplanner(planQueuePath), + getReservationSchedulerConfiguration().getMoveOnExpiry(planQueuePath), + maxPeriodicity, rmContext); LOG.info("Initialized plan {} based on reservable queue {}", plan.toString(), planQueueName); return plan; @@ -477,8 +486,8 @@ public abstract class AbstractReservationSystem extends AbstractService Class admissionPolicyClazz 
= conf.getClassByName(admissionPolicyClassName); if (SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) { - return (SharingPolicy) ReflectionUtils.newInstance( - admissionPolicyClazz, conf); + return (SharingPolicy) ReflectionUtils.newInstance(admissionPolicyClazz, + conf); } else { throw new YarnRuntimeException("Class: " + admissionPolicyClassName + " not instance of " + SharingPolicy.class.getCanonicalName()); @@ -493,8 +502,7 @@ public abstract class AbstractReservationSystem extends AbstractService return this.reservationsACLsManager; } - protected abstract ReservationSchedulerConfiguration - getReservationSchedulerConfiguration(); + protected abstract ReservationSchedulerConfiguration getReservationSchedulerConfiguration(); protected abstract String getPlanQueuePath(String planQueueName); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java index 90357e3..9b6a0b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java @@ -18,6 +18,14 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.reservation; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -33,24 +41,17 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - public abstract class AbstractSchedulerPlanFollower implements PlanFollower { - private static final Logger LOG = LoggerFactory - .getLogger(AbstractSchedulerPlanFollower.class); + private static final Logger LOG = + LoggerFactory.getLogger(AbstractSchedulerPlanFollower.class); protected Collection plans = new ArrayList(); protected YarnScheduler scheduler; protected Clock clock; @Override - public void init(Clock clock, ResourceScheduler sched, Collection plans) { + public void init(Clock clock, ResourceScheduler sched, + Collection plans) { this.clock = clock; this.scheduler = sched; this.plans.addAll(plans); @@ -71,7 +72,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { @Override public synchronized void synchronizePlan(Plan plan, boolean shouldReplan) { - String planQueueName = plan.getQueueName(); + String planQueueName = plan.getQueueName(); if (LOG.isDebugEnabled()) { LOG.debug("Running plan follower edit policy for plan: " + planQueueName); } @@ -82,12 +83,14 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { now += step - (now % step); } Queue planQueue = getPlanQueue(planQueueName); - if (planQueue == null) return; + if (planQueue == null) { + return; 
+ } // first we publish to the plan the current availability of resources Resource clusterResources = scheduler.getClusterResource(); - Resource planResources = getPlanResources(plan, planQueue, - clusterResources); + Resource planResources = + getPlanResources(plan, planQueue, clusterResources); Set currentReservations = plan.getReservationsAtTime(now); Set curReservationNames = new HashSet(); @@ -95,12 +98,11 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { int numRes = getReservedResources(now, currentReservations, curReservationNames, reservedResources); // create the default reservation queue if it doesnt exist - String defReservationId = getReservationIdFromQueueName(planQueueName) + - ReservationConstants.DEFAULT_QUEUE_SUFFIX; - String defReservationQueue = getReservationQueueName(planQueueName, - defReservationId); - createDefaultReservationQueue(planQueueName, planQueue, - defReservationId); + String defReservationId = getReservationIdFromQueueName(planQueueName) + + ReservationConstants.DEFAULT_QUEUE_SUFFIX; + String defReservationQueue = + getReservationQueueName(planQueueName, defReservationId); + createDefaultReservationQueue(planQueueName, planQueue, defReservationId); curReservationNames.add(defReservationId); // if the resources dedicated to this plan has shrunk invoke replanner boolean shouldResize = false; @@ -149,10 +151,8 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { // sort allocations from the one giving up the most resources, to the // one asking for the most avoid order-of-operation errors that // temporarily violate 100% capacity bound - List sortedAllocations = - sortByDelta( - new ArrayList(currentReservations), now, - plan); + List sortedAllocations = sortByDelta( + new ArrayList(currentReservations), now, plan); for (ReservationAllocation res : sortedAllocations) { String currResId = res.getReservationId().toString(); if (curReservationNames.contains(currResId)) { @@ 
-163,10 +163,9 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { if (planResources.getMemorySize() > 0 && planResources.getVirtualCores() > 0) { if (shouldResize) { - capToAssign = - calculateReservationToPlanProportion( - plan.getResourceCalculator(), planResources, - reservedResources, capToAssign); + capToAssign = calculateReservationToPlanProportion( + plan.getResourceCalculator(), planResources, reservedResources, + capToAssign); } targetCapacity = calculateReservationToPlanRatio(plan.getResourceCalculator(), @@ -185,7 +184,8 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { maxCapacity = targetCapacity; } try { - setQueueEntitlement(planQueueName, currResId, targetCapacity, maxCapacity); + setQueueEntitlement(planQueueName, currResId, targetCapacity, + maxCapacity); } catch (YarnException e) { LOG.warn("Exception while trying to size reservation for plan: {}", currResId, planQueueName, e); @@ -196,9 +196,10 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { // compute the default queue capacity float defQCap = 1.0f - totalAssignedCapacity; if (LOG.isDebugEnabled()) { - LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} " - + "currReservation: {} default-queue capacity: {}", planResources, - numRes, defQCap); + LOG.debug( + "PlanFollowerEditPolicyTask: total Plan Capacity: {} " + + "currReservation: {} default-queue capacity: {}", + planResources, numRes, defQCap); } // set the default queue to eat-up all remaining capacity try { @@ -225,12 +226,11 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { } protected void setQueueEntitlement(String planQueueName, String currResId, - float targetCapacity, - float maxCapacity) throws YarnException { - String reservationQueueName = getReservationQueueName(planQueueName, - currResId); - scheduler.setEntitlement(reservationQueueName, new QueueEntitlement( - targetCapacity, maxCapacity)); + 
float targetCapacity, float maxCapacity) throws YarnException { + String reservationQueueName = + getReservationQueueName(planQueueName, currResId); + scheduler.setEntitlement(reservationQueueName, + new QueueEntitlement(targetCapacity, maxCapacity)); } // Schedulers have different ways of naming queues. See YARN-2773 @@ -244,14 +244,21 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { * Then move all apps in the set of queues to the parent plan queue's default * reservation queue if move is enabled. Finally cleanups the queue by killing * any apps (if move is disabled or move failed) and removing the queue + * + * @param planQueueName the name of {@code PlanQueue} + * @param shouldMove flag to indicate if any running apps should be moved or + * killed + * @param toRemove the remnant apps to clean up + * @param defReservationQueue the default {@code ReservationQueue} of the + * {@link Plan} */ - protected void cleanupExpiredQueues(String planQueueName, - boolean shouldMove, Set toRemove, String defReservationQueue) { + protected void cleanupExpiredQueues(String planQueueName, boolean shouldMove, + Set toRemove, String defReservationQueue) { for (String expiredReservationId : toRemove) { try { // reduce entitlement to 0 - String expiredReservation = getReservationQueueName(planQueueName, - expiredReservationId); + String expiredReservation = + getReservationQueueName(planQueueName, expiredReservationId); setQueueEntitlement(planQueueName, expiredReservation, 0.0f, 0.0f); if (shouldMove) { moveAppsInQueueSync(expiredReservation, defReservationQueue); @@ -275,7 +282,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { * reservation queue in a synchronous fashion */ private void moveAppsInQueueSync(String expiredReservation, - String defReservationQueue) { + String defReservationQueue) { List activeApps = scheduler.getAppsInQueue(expiredReservation); if (activeApps.isEmpty()) { @@ -287,16 +294,16 @@ public 
abstract class AbstractSchedulerPlanFollower implements PlanFollower { scheduler.moveApplication(app.getApplicationId(), defReservationQueue); } catch (YarnException e) { LOG.warn( - "Encountered unexpected error during migration of application: {}" + - " from reservation: {}", + "Encountered unexpected error during migration of application: {}" + + " from reservation: {}", app, expiredReservation, e); } } } - protected int getReservedResources(long now, Set - currentReservations, Set curReservationNames, - Resource reservedResources) { + protected int getReservedResources(long now, + Set currentReservations, + Set curReservationNames, Resource reservedResources) { int numRes = 0; if (currentReservations != null) { numRes = currentReservations.size(); @@ -312,23 +319,30 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { * Sort in the order from the least new amount of resources asked (likely * negative) to the highest. This prevents "order-of-operation" errors related * to exceeding 100% capacity temporarily. + * + * @param currentReservations the currently active reservations + * @param now the current time + * @param plan the {@link Plan} that is being considered + * + * @return the sorted list of {@link ReservationAllocation}s */ protected List sortByDelta( List currentReservations, long now, Plan plan) { - Collections.sort(currentReservations, new ReservationAllocationComparator( - now, this, plan)); + Collections.sort(currentReservations, + new ReservationAllocationComparator(now, this, plan)); return currentReservations; } /** - * Get queue associated with reservable queue named - * @param planQueueName Name of the reservable queue + * Get queue associated with reservable queue named. 
+ * + * @param planQueueName name of the reservable queue * @return queue associated with the reservable queue */ protected abstract Queue getPlanQueue(String planQueueName); /** - * Resizes reservations based on currently available resources + * Resizes reservations based on currently available resources. */ private Resource calculateReservationToPlanProportion( ResourceCalculator rescCalculator, Resource availablePlanResources, @@ -338,7 +352,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { } /** - * Calculates ratio of reservationResources to planResources + * Calculates ratio of reservationResources to planResources. */ private float calculateReservationToPlanRatio( ResourceCalculator rescCalculator, Resource clusterResources, @@ -348,7 +362,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { } /** - * Check if plan resources are less than expected reservation resources + * Check if plan resources are less than expected reservation resources. */ private boolean arePlanResourcesLessThanReservations( ResourceCalculator rescCalculator, Resource clusterResources, @@ -358,38 +372,56 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { } /** - * Get a list of reservation queues for this planQueue + * Get a list of reservation queues for this planQueue. + * + * @param planQueue the queue for the current {@link Plan} + * + * @return the queues corresponding to the reservations */ protected abstract List getChildReservationQueues( Queue planQueue); /** - * Add a new reservation queue for reservation currResId for this planQueue + * Add a new reservation queue for reservation currResId for this planQueue. 
*/ - protected abstract void addReservationQueue( - String planQueueName, Queue queue, String currResId); + protected abstract void addReservationQueue(String planQueueName, Queue queue, + String currResId); /** - * Creates the default reservation queue for use when no reservation is - * used for applications submitted to this planQueue + * Creates the default reservation queue for use when no reservation is used + * for applications submitted to this planQueue. + * + * @param planQueueName name of the reservable queue + * @param queue the queue for the current {@link Plan} + * @param defReservationQueue name of the default {@code ReservationQueue} */ - protected abstract void createDefaultReservationQueue( - String planQueueName, Queue queue, String defReservationQueue); + protected abstract void createDefaultReservationQueue(String planQueueName, + Queue queue, String defReservationQueue); /** - * Get plan resources for this planQueue + * Get plan resources for this planQueue. + * + * @param plan the current {@link Plan} being considered + * @param clusterResources the resources available in the cluster + * + * @return the resources allocated to the specified {@link Plan} */ - protected abstract Resource getPlanResources( - Plan plan, Queue queue, Resource clusterResources); + protected abstract Resource getPlanResources(Plan plan, Queue queue, + Resource clusterResources); /** * Get reservation queue resources if it exists otherwise return null. 
+ * + * @param plan the current {@link Plan} being considered + * @param reservationId the identifier of the reservation + * + * @return the resources allocated to the specified reservation */ protected abstract Resource getReservationQueueResourceIfExists(Plan plan, ReservationId reservationId); - private static class ReservationAllocationComparator implements - Comparator { + private static class ReservationAllocationComparator + implements Comparator { AbstractSchedulerPlanFollower planFollower; long now; Plan plan; @@ -404,14 +436,12 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { private Resource getUnallocatedReservedResources( ReservationAllocation reservation) { Resource resResource; - Resource reservationResource = planFollower - .getReservationQueueResourceIfExists - (plan, reservation.getReservationId()); + Resource reservationResource = + planFollower.getReservationQueueResourceIfExists(plan, + reservation.getReservationId()); if (reservationResource != null) { - resResource = - Resources.subtract( - reservation.getResourcesAtTime(now), - reservationResource); + resResource = Resources.subtract(reservation.getResourcesAtTime(now), + reservationResource); } else { resResource = reservation.getResourcesAtTime(now); } @@ -428,4 +458,3 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { } } } - http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java index 783fd09..9eb1820 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -33,9 +33,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; @@ -64,9 +65,14 @@ public class InMemoryPlan implements Plan { private RLESparseResourceAllocation rleSparseVector; + private PeriodicRLESparseResourceAllocation periodicRle; + private Map userResourceAlloc = new HashMap(); + private Map userPeriodicResourceAlloc = + new HashMap(); + private Map userActiveReservationCount = new HashMap(); @@ -96,15 +102,27 @@ public class InMemoryPlan implements Plan { String queueName, Planner replanner, boolean getMoveOnExpiry, RMContext rmContext) { this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc, - maxAlloc, queueName, replanner, getMoveOnExpiry, rmContext, - new UTCClock()); + maxAlloc, queueName, replanner, getMoveOnExpiry, + YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY, + rmContext); } public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, ReservationAgent agent, Resource totalCapacity, long step, 
ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, String queueName, Planner replanner, boolean getMoveOnExpiry, - RMContext rmContext, Clock clock) { + long maxPeriodicty, RMContext rmContext) { + this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc, + maxAlloc, queueName, replanner, getMoveOnExpiry, maxPeriodicty, + rmContext, new UTCClock()); + } + + @SuppressWarnings("checkstyle:parameternumber") + public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, + ReservationAgent agent, Resource totalCapacity, long step, + ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, + String queueName, Planner replanner, boolean getMoveOnExpiry, + long maxPeriodicty, RMContext rmContext, Clock clock) { this.queueMetrics = queueMetrics; this.policy = policy; this.agent = agent; @@ -114,6 +132,8 @@ public class InMemoryPlan implements Plan { this.minAlloc = minAlloc; this.maxAlloc = maxAlloc; this.rleSparseVector = new RLESparseResourceAllocation(resCalc); + this.periodicRle = + new PeriodicRLESparseResourceAllocation(resCalc, maxPeriodicty); this.queueName = queueName; this.replanner = replanner; this.getMoveOnExpiry = getMoveOnExpiry; @@ -126,6 +146,39 @@ public class InMemoryPlan implements Plan { return queueMetrics; } + private RLESparseResourceAllocation getUserRLEResourceAllocation(String user, + long period) { + RLESparseResourceAllocation resAlloc = null; + if (period > 0) { + if (userPeriodicResourceAlloc.containsKey(user)) { + resAlloc = userPeriodicResourceAlloc.get(user); + } else { + resAlloc = new PeriodicRLESparseResourceAllocation(resCalc, + periodicRle.getTimePeriod()); + userPeriodicResourceAlloc.put(user, resAlloc); + } + } else { + if (userResourceAlloc.containsKey(user)) { + resAlloc = userResourceAlloc.get(user); + } else { + resAlloc = new RLESparseResourceAllocation(resCalc); + userResourceAlloc.put(user, resAlloc); + } + } + return resAlloc; + } + + private void 
gcUserRLEResourceAllocation(String user, long period) { + if (period > 0) { + if (userPeriodicResourceAlloc.get(user).isEmpty()) { + userPeriodicResourceAlloc.remove(user); + } + } else { + if (userResourceAlloc.get(user).isEmpty()) { + userResourceAlloc.remove(user); + } + } + } private void incrementAllocation(ReservationAllocation reservation) { assert (readWriteLock.isWriteLockedByCurrentThread()); @@ -133,11 +186,10 @@ public class InMemoryPlan implements Plan { reservation.getAllocationRequests(); // check if we have encountered the user earlier and if not add an entry String user = reservation.getUser(); - RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user); - if (resAlloc == null) { - resAlloc = new RLESparseResourceAllocation(resCalc); - userResourceAlloc.put(user, resAlloc); - } + long period = reservation.getPeriodicity(); + RLESparseResourceAllocation resAlloc = + getUserRLEResourceAllocation(user, period); + RLESparseResourceAllocation resCount = userActiveReservationCount.get(user); if (resCount == null) { resCount = new RLESparseResourceAllocation(resCalc); @@ -149,14 +201,43 @@ public class InMemoryPlan implements Plan { for (Map.Entry r : allocationRequests .entrySet()) { - resAlloc.addInterval(r.getKey(), r.getValue()); - rleSparseVector.addInterval(r.getKey(), r.getValue()); - if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(), - ZERO_RESOURCE)) { - earliestActive = Math.min(earliestActive, r.getKey().getStartTime()); - latestActive = Math.max(latestActive, r.getKey().getEndTime()); + + if (period > 0L) { + for (int i = 0; i < periodicRle.getTimePeriod() / period; i++) { + + long rStart = r.getKey().getStartTime() + i * period; + long rEnd = r.getKey().getEndTime() + i * period; + + // handle wrap-around + if (rEnd > periodicRle.getTimePeriod()) { + long diff = rEnd - periodicRle.getTimePeriod(); + rEnd = periodicRle.getTimePeriod(); + ReservationInterval newInterval = new ReservationInterval(0, diff); + 
periodicRle.addInterval(newInterval, r.getValue()); + resAlloc.addInterval(newInterval, r.getValue()); + } + + ReservationInterval newInterval = + new ReservationInterval(rStart, rEnd); + periodicRle.addInterval(newInterval, r.getValue()); + resAlloc.addInterval(newInterval, r.getValue()); + } + + } else { + rleSparseVector.addInterval(r.getKey(), r.getValue()); + resAlloc.addInterval(r.getKey(), r.getValue()); + if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(), + ZERO_RESOURCE)) { + earliestActive = Math.min(earliestActive, r.getKey().getStartTime()); + latestActive = Math.max(latestActive, r.getKey().getEndTime()); + } } } + // periodic reservations are active from start time and good till cancelled + if (period > 0L) { + earliestActive = reservation.getStartTime(); + latestActive = Long.MAX_VALUE; + } resCount.addInterval(new ReservationInterval(earliestActive, latestActive), Resource.newInstance(1, 1)); } @@ -166,27 +247,55 @@ public class InMemoryPlan implements Plan { Map allocationRequests = reservation.getAllocationRequests(); String user = reservation.getUser(); - RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user); + long period = reservation.getPeriodicity(); + RLESparseResourceAllocation resAlloc = + getUserRLEResourceAllocation(user, period); long earliestActive = Long.MAX_VALUE; long latestActive = Long.MIN_VALUE; for (Map.Entry r : allocationRequests .entrySet()) { - resAlloc.removeInterval(r.getKey(), r.getValue()); - rleSparseVector.removeInterval(r.getKey(), r.getValue()); - if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(), - ZERO_RESOURCE)) { - earliestActive = Math.min(earliestActive, r.getKey().getStartTime()); - latestActive = Math.max(latestActive, r.getKey().getEndTime()); + if (period > 0L) { + for (int i = 0; i < periodicRle.getTimePeriod() / period; i++) { + + long rStart = r.getKey().getStartTime() + i * period; + long rEnd = r.getKey().getEndTime() + i * period; + + // handle wrap-around + if 
(rEnd > periodicRle.getTimePeriod()) { + long diff = rEnd - periodicRle.getTimePeriod(); + rEnd = periodicRle.getTimePeriod(); + ReservationInterval newInterval = new ReservationInterval(0, diff); + periodicRle.removeInterval(newInterval, r.getValue()); + resAlloc.removeInterval(newInterval, r.getValue()); + } + + ReservationInterval newInterval = + new ReservationInterval(rStart, rEnd); + periodicRle.removeInterval(newInterval, r.getValue()); + resAlloc.removeInterval(newInterval, r.getValue()); + } + } else { + rleSparseVector.removeInterval(r.getKey(), r.getValue()); + resAlloc.removeInterval(r.getKey(), r.getValue()); + if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(), + ZERO_RESOURCE)) { + earliestActive = Math.min(earliestActive, r.getKey().getStartTime()); + latestActive = Math.max(latestActive, r.getKey().getEndTime()); + } } } - if (resAlloc.isEmpty()) { - userResourceAlloc.remove(user); - } + gcUserRLEResourceAllocation(user, period); RLESparseResourceAllocation resCount = userActiveReservationCount.get(user); - resCount.removeInterval(new ReservationInterval(earliestActive, - latestActive), Resource.newInstance(1, 1)); + // periodic reservations are active from start time and good till cancelled + if (period > 0L) { + earliestActive = reservation.getStartTime(); + latestActive = Long.MAX_VALUE; + } + resCount.removeInterval( + new ReservationInterval(earliestActive, latestActive), + Resource.newInstance(1, 1)); if (resCount.isEmpty()) { userActiveReservationCount.remove(user); } @@ -198,9 +307,9 @@ public class InMemoryPlan implements Plan { if (currentReservations != null) { Set flattenedReservations = new TreeSet(); - for (Set reservationEntries : - currentReservations.values()) { - flattenedReservations.addAll(reservationEntries); + for (Set res : currentReservations + .values()) { + flattenedReservations.addAll(res); } return flattenedReservations; } else { @@ -218,19 +327,16 @@ public class InMemoryPlan implements Plan { 
InMemoryReservationAllocation inMemReservation = (InMemoryReservationAllocation) reservation; if (inMemReservation.getUser() == null) { - String errMsg = - "The specified Reservation with ID " - + inMemReservation.getReservationId() - + " is not mapped to any user"; + String errMsg = "The specified Reservation with ID " + + inMemReservation.getReservationId() + " is not mapped to any user"; LOG.error(errMsg); throw new IllegalArgumentException(errMsg); } writeLock.lock(); try { if (reservationTable.containsKey(inMemReservation.getReservationId())) { - String errMsg = - "The specified Reservation with ID " - + inMemReservation.getReservationId() + " already exists"; + String errMsg = "The specified Reservation with ID " + + inMemReservation.getReservationId() + " already exists"; LOG.error(errMsg); throw new IllegalArgumentException(errMsg); } @@ -246,9 +352,8 @@ public class InMemoryPlan implements Plan { getQueueName(), inMemReservation.getReservationId().toString()); } } - ReservationInterval searchInterval = - new ReservationInterval(inMemReservation.getStartTime(), - inMemReservation.getEndTime()); + ReservationInterval searchInterval = new ReservationInterval( + inMemReservation.getStartTime(), inMemReservation.getEndTime()); Set reservations = currentReservations.get(searchInterval); if (reservations == null) { @@ -280,9 +385,8 @@ public class InMemoryPlan implements Plan { ReservationId resId = reservation.getReservationId(); ReservationAllocation currReservation = getReservationById(resId); if (currReservation == null) { - String errMsg = - "The specified Reservation with ID " + resId - + " does not exist in the plan"; + String errMsg = "The specified Reservation with ID " + resId + + " does not exist in the plan"; LOG.error(errMsg); throw new IllegalArgumentException(errMsg); } @@ -318,9 +422,8 @@ public class InMemoryPlan implements Plan { private boolean removeReservation(ReservationAllocation reservation) { assert 
(readWriteLock.isWriteLockedByCurrentThread()); - ReservationInterval searchInterval = - new ReservationInterval(reservation.getStartTime(), - reservation.getEndTime()); + ReservationInterval searchInterval = new ReservationInterval( + reservation.getStartTime(), reservation.getEndTime()); Set reservations = currentReservations.get(searchInterval); if (reservations != null) { @@ -337,16 +440,15 @@ public class InMemoryPlan implements Plan { currentReservations.remove(searchInterval); } } else { - String errMsg = - "The specified Reservation with ID " + reservation.getReservationId() - + " does not exist in the plan"; + String errMsg = "The specified Reservation with ID " + + reservation.getReservationId() + " does not exist in the plan"; LOG.error(errMsg); throw new IllegalArgumentException(errMsg); } reservationTable.remove(reservation.getReservationId()); decrementAllocation(reservation); LOG.info("Sucessfully deleted reservation: {} in plan.", - reservation.getReservationId()); + reservation.getReservationId()); return true; } @@ -356,9 +458,8 @@ public class InMemoryPlan implements Plan { try { ReservationAllocation reservation = getReservationById(reservationID); if (reservation == null) { - String errMsg = - "The specified Reservation with ID " + reservationID - + " does not exist in the plan"; + String errMsg = "The specified Reservation with ID " + reservationID + + " does not exist in the plan"; LOG.error(errMsg); throw new IllegalArgumentException(errMsg); } @@ -453,66 +554,90 @@ public class InMemoryPlan implements Plan { long start, long end) { readLock.lock(); try { + // merge periodic and non-periodic allocations RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user); + RLESparseResourceAllocation userPeriodicResAlloc = + userPeriodicResourceAlloc.get(user); + if (userResAlloc != null && userPeriodicResAlloc != null) { + return RLESparseResourceAllocation.merge(resCalc, totalCapacity, + userResAlloc, userPeriodicResAlloc, 
RLEOperator.add, start, end); + } if (userResAlloc != null) { return userResAlloc.getRangeOverlapping(start, end); - } else { - return new RLESparseResourceAllocation(resCalc); } + if (userPeriodicResAlloc != null) { + return userPeriodicResAlloc.getRangeOverlapping(start, end); + } + } catch (PlanningException e) { + LOG.warn("Exception while trying to merge periodic" + + " and non-periodic user allocations: {}", e.getMessage(), e); } finally { readLock.unlock(); } + return new RLESparseResourceAllocation(resCalc); } @Override public Resource getTotalCommittedResources(long t) { readLock.lock(); try { - return rleSparseVector.getCapacityAtTime(t); + return Resources.add(rleSparseVector.getCapacityAtTime(t), + periodicRle.getCapacityAtTime(t)); } finally { readLock.unlock(); } } @Override - public Set getReservations(ReservationId - reservationID, ReservationInterval interval) { + public Set getReservations(ReservationId reservationID, + ReservationInterval interval) { return getReservations(reservationID, interval, null); } @Override - public Set getReservations(ReservationId - reservationID, ReservationInterval interval, String user) { + public Set getReservations(ReservationId reservationID, + ReservationInterval interval, String user) { if (reservationID != null) { ReservationAllocation allocation = getReservationById(reservationID); - if (allocation == null){ + if (allocation == null) { return Collections.emptySet(); } return Collections.singleton(allocation); } - long startTime = interval == null? 0 : interval.getStartTime(); - long endTime = interval == null? Long.MAX_VALUE : interval.getEndTime(); + long startTime = interval == null ? 0 : interval.getStartTime(); + long endTime = interval == null ? 
Long.MAX_VALUE : interval.getEndTime(); ReservationInterval searchInterval = - new ReservationInterval(endTime, Long.MAX_VALUE); + new ReservationInterval(endTime, Long.MAX_VALUE); readLock.lock(); try { - SortedMap> - reservations = currentReservations.headMap(searchInterval, true); - if (!reservations.isEmpty()) { - Set flattenedReservations = - new HashSet<>(); - for (Set reservationEntries : - reservations.values()) { - for (InMemoryReservationAllocation res : reservationEntries) { - if (res.getEndTime() > startTime) { - if (user != null && !user.isEmpty() - && !res.getUser().equals(user)) { - continue; + SortedMap> res = + currentReservations.headMap(searchInterval, true); + if (!res.isEmpty()) { + Set flattenedReservations = new HashSet<>(); + for (Set resEntries : res.values()) { + for (InMemoryReservationAllocation reservation : resEntries) { + // validate user + if (user != null && !user.isEmpty() + && !reservation.getUser().equals(user)) { + continue; + } + // handle periodic reservations + long period = reservation.getPeriodicity(); + if (period > 0) { + long t = endTime % period; + // check for both contained and wrap-around reservations + if ((t - startTime) * (t - endTime) + * (startTime - endTime) >= 0) { + flattenedReservations.add(reservation); + } + } else { + // check for non-periodic reservations + if (reservation.getEndTime() > startTime) { + flattenedReservations.add(reservation); } - flattenedReservations.add(res); } } } @@ -550,36 +675,82 @@ public class InMemoryPlan implements Plan { @Override public RLESparseResourceAllocation getAvailableResourceOverTime(String user, - ReservationId oldId, long start, long end) throws PlanningException { + ReservationId oldId, long start, long end, long period) + throws PlanningException { readLock.lock(); try { - // create RLE of totCapacity - TreeMap totAvailable = new TreeMap(); - totAvailable.put(start, Resources.clone(totalCapacity)); - RLESparseResourceAllocation totRLEAvail = - new 
RLESparseResourceAllocation(totAvailable, resCalc); - - // subtract used from available - RLESparseResourceAllocation netAvailable; - - netAvailable = - RLESparseResourceAllocation.merge(resCalc, - Resources.clone(totalCapacity), totRLEAvail, rleSparseVector, - RLEOperator.subtractTestNonNegative, start, end); - - // add back in old reservation used resources if any - ReservationAllocation old = reservationTable.get(oldId); - if (old != null) { - netAvailable = - RLESparseResourceAllocation.merge(resCalc, - Resources.clone(totalCapacity), netAvailable, - old.getResourcesOverTime(), RLEOperator.add, start, end); + + // for non-periodic return simple available resources + if (period == 0) { + + // create RLE of totCapacity + TreeMap totAvailable = new TreeMap(); + totAvailable.put(start, Resources.clone(totalCapacity)); + RLESparseResourceAllocation totRLEAvail = + new RLESparseResourceAllocation(totAvailable, resCalc); + + // subtract used from available + RLESparseResourceAllocation netAvailable; + + netAvailable = RLESparseResourceAllocation.merge(resCalc, + Resources.clone(totalCapacity), totRLEAvail, rleSparseVector, + RLEOperator.subtractTestNonNegative, start, end); + + // remove periodic component + netAvailable = RLESparseResourceAllocation.merge(resCalc, + Resources.clone(totalCapacity), netAvailable, periodicRle, + RLEOperator.subtractTestNonNegative, start, end); + + // add back in old reservation used resources if any + ReservationAllocation old = reservationTable.get(oldId); + if (old != null) { + + RLESparseResourceAllocation addBackPrevious = + old.getResourcesOverTime(start, end); + netAvailable = RLESparseResourceAllocation.merge(resCalc, + Resources.clone(totalCapacity), netAvailable, addBackPrevious, + RLEOperator.add, start, end); + } + // lower it if this is needed by the sharing policy + netAvailable = getSharingPolicy().availableResources(netAvailable, this, + user, oldId, start, end); + return netAvailable; + } else { + + if 
(periodicRle.getTimePeriod() % period != 0) { + throw new PlanningException("The reservation periodicity (" + period + + ") must be" + "an exact divider of the system maxPeriod (" + + periodicRle.getTimePeriod() + ")"); + } + + // find the minimum resources available among all the instances that fit + // in the LCM + long numInstInLCM = periodicRle.getTimePeriod() / period; + + RLESparseResourceAllocation minOverLCM = + getAvailableResourceOverTime(user, oldId, start, end, 0); + for (int i = 1; i < numInstInLCM; i++) { + + long rStart = start + i * period; + long rEnd = end + i * period; + + // recursive invocation of non-periodic range (to pick raw-info) + RLESparseResourceAllocation snapShot = + getAvailableResourceOverTime(user, oldId, rStart, rEnd, 0); + + // time-align on start + snapShot.shift(-(i * period)); + + // pick the minimum amount of resources in each time interval + minOverLCM = + RLESparseResourceAllocation.merge(resCalc, getTotalCapacity(), + minOverLCM, snapShot, RLEOperator.min, start, end); + + } + + return minOverLCM; + } - // lower it if this is needed by the sharing policy - netAvailable = - getSharingPolicy().availableResources(netAvailable, this, user, - oldId, start, end); - return netAvailable; } finally { readLock.unlock(); } @@ -637,7 +808,7 @@ public class InMemoryPlan implements Plan { public String toCumulativeString() { readLock.lock(); try { - return rleSparseVector.toString(); + return rleSparseVector.toString() + "\n" + periodicRle.toString(); } finally { readLock.unlock(); } @@ -689,11 +860,18 @@ public class InMemoryPlan implements Plan { } @Override - public RLESparseResourceAllocation getCumulativeLoadOverTime( - long start, long end) { + public RLESparseResourceAllocation getCumulativeLoadOverTime(long start, + long end) throws PlanningException { readLock.lock(); try { - return rleSparseVector.getRangeOverlapping(start, end); + + RLESparseResourceAllocation ret = + rleSparseVector.getRangeOverlapping(start, end); + ret = 
RLESparseResourceAllocation.merge(resCalc, totalCapacity, ret, + periodicRle.getRangeOverlapping(start, end), RLEOperator.add, start, + end); + + return ret; } finally { readLock.unlock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java index 69fd43f..00c8e44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java @@ -42,6 +42,7 @@ public class InMemoryReservationAllocation implements ReservationAllocation { private final Map allocationRequests; private boolean hasGang = false; private long acceptedAt = -1; + private long periodicity = 0; private RLESparseResourceAllocation resourcesOverTime; @@ -67,9 +68,16 @@ public class InMemoryReservationAllocation implements ReservationAllocation { this.allocationRequests = allocations; this.planName = planName; this.hasGang = hasGang; - resourcesOverTime = new RLESparseResourceAllocation(calculator); - for (Map.Entry r : allocations - .entrySet()) { + if (contract != null && contract.getRecurrenceExpression() != 
null) { + this.periodicity = Long.parseLong(contract.getRecurrenceExpression()); + } + if (periodicity > 0) { + resourcesOverTime = + new PeriodicRLESparseResourceAllocation(calculator, periodicity); + } else { + resourcesOverTime = new RLESparseResourceAllocation(calculator); + } + for (Map.Entry r : allocations.entrySet()) { resourcesOverTime.addInterval(r.getKey(), r.getValue()); } } @@ -133,17 +141,33 @@ public class InMemoryReservationAllocation implements ReservationAllocation { } @Override - public RLESparseResourceAllocation getResourcesOverTime(){ + public RLESparseResourceAllocation getResourcesOverTime() { return resourcesOverTime; } @Override + public RLESparseResourceAllocation getResourcesOverTime(long start, + long end) { + return resourcesOverTime.getRangeOverlapping(start, end); + } + + @Override + public long getPeriodicity() { + return periodicity; + } + + @Override + public void setPeriodicity(long period) { + periodicity = period; + } + + @Override public String toString() { StringBuilder sBuf = new StringBuilder(); sBuf.append(getReservationId()).append(" user:").append(getUser()) .append(" startTime: ").append(getStartTime()).append(" endTime: ") - .append(getEndTime()).append(" alloc:\n[") - .append(resourcesOverTime.toString()).append("] "); + .append(getEndTime()).append(" Periodiciy: ").append(periodicity) + .append(" alloc:\n[").append(resourcesOverTime.toString()).append("] "); return sBuf.toString(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java index 55f1d00..49d4702 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java @@ -40,7 +40,7 @@ public class NoOverCommitPolicy implements SharingPolicy { RLESparseResourceAllocation available = plan.getAvailableResourceOverTime( reservation.getUser(), reservation.getReservationId(), - reservation.getStartTime(), reservation.getEndTime()); + reservation.getStartTime(), reservation.getEndTime(), 0); // test the reservation does not exceed what is available try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java index 8e3be8b3..7bc44f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java @@ -18,47 +18,94 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; + /** - * This data structure stores a periodic RLESparseResourceAllocation. + * This data structure stores a periodic {@link RLESparseResourceAllocation}. * Default period is 1 day (86400000ms). */ -public class PeriodicRLESparseResourceAllocation extends - RLESparseResourceAllocation { +public class PeriodicRLESparseResourceAllocation + extends RLESparseResourceAllocation { // Log - private static final Logger LOG = LoggerFactory - .getLogger(PeriodicRLESparseResourceAllocation.class); + private static final Logger LOG = + LoggerFactory.getLogger(PeriodicRLESparseResourceAllocation.class); private long timePeriod; /** * Constructor. * - * @param rleVector {@link RLESparseResourceAllocation} with the run-length - encoded data. + * @param resourceCalculator {@link ResourceCalculator} the resource + * calculator to use. * @param timePeriod Time period in milliseconds. */ public PeriodicRLESparseResourceAllocation( - RLESparseResourceAllocation rleVector, Long timePeriod) { - super(rleVector.getCumulative(), rleVector.getResourceCalculator()); + ResourceCalculator resourceCalculator, Long timePeriod) { + super(resourceCalculator); this.timePeriod = timePeriod; } /** * Constructor. Default time period set to 1 day. 
* + * @param resourceCalculator {@link ResourceCalculator} the resource + * calculator to use.. + */ + public PeriodicRLESparseResourceAllocation( + ResourceCalculator resourceCalculator) { + this(resourceCalculator, + YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY); + } + + /** + * Constructor. + * * @param rleVector {@link RLESparseResourceAllocation} with the run-length - encoded data. + * encoded data. + * @param timePeriod Time period in milliseconds. */ + @VisibleForTesting public PeriodicRLESparseResourceAllocation( - RLESparseResourceAllocation rleVector) { - this(rleVector, 86400000L); + RLESparseResourceAllocation rleVector, Long timePeriod) { + super(rleVector.getCumulative(), rleVector.getResourceCalculator()); + this.timePeriod = timePeriod; + + // make sure the PeriodicRLE is zero-based, and handles wrap-around + long delta = (getEarliestStartTime() % timePeriod - getEarliestStartTime()); + shift(delta); + + List toRemove = new ArrayList<>(); + Map toAdd = new TreeMap<>(); + + for (Map.Entry entry : cumulativeCapacity.entrySet()) { + if (entry.getKey() > timePeriod) { + toRemove.add(entry.getKey()); + if (entry.getValue() != null) { + toAdd.put(timePeriod, entry.getValue()); + long prev = entry.getKey() % timePeriod; + toAdd.put(prev, this.getCapacityAtTime(prev)); + toAdd.put(0L, entry.getValue()); + } + } + } + for (Long l : toRemove) { + cumulativeCapacity.remove(l); + } + cumulativeCapacity.putAll(toAdd); } /** @@ -78,24 +125,25 @@ public class PeriodicRLESparseResourceAllocation extends * The interval may include 0, but the end time must be strictly less than * timePeriod. * - * @param interval {@link ReservationInterval} to which the specified - * resource is to be added. + * @param interval {@link ReservationInterval} to which the specified resource + * is to be added. * @param resource {@link Resource} to be added to the interval specified. 
* @return true if addition is successful, false otherwise */ - public boolean addInterval(ReservationInterval interval, - Resource resource) { + public boolean addInterval(ReservationInterval interval, Resource resource) { long startTime = interval.getStartTime(); long endTime = interval.getEndTime(); + if (startTime >= 0 && endTime > startTime && endTime <= timePeriod) { return super.addInterval(interval, resource); } else { - LOG.info("Cannot set capacity beyond end time: " + timePeriod); + LOG.info("Cannot set capacity beyond end time: " + timePeriod + " was (" + + interval.toString() + ")"); return false; } } - /** + /** * Removes a resource for the specified interval. * * @param interval the {@link ReservationInterval} for which the resource is @@ -103,14 +151,15 @@ public class PeriodicRLESparseResourceAllocation extends * @param resource the {@link Resource} to be removed. * @return true if removal is successful, false otherwise */ - public boolean removeInterval( - ReservationInterval interval, Resource resource) { + public boolean removeInterval(ReservationInterval interval, + Resource resource) { long startTime = interval.getStartTime(); long endTime = interval.getEndTime(); // If the resource to be subtracted is less than the minimum resource in // the range, abort removal to avoid negative capacity. - if (!Resources.fitsIn( - resource, super.getMinimumCapacityInInterval(interval))) { + // TODO revisit decrementing endTime + if (!Resources.fitsIn(resource, getMinimumCapacityInInterval( + new ReservationInterval(startTime, endTime - 1)))) { LOG.info("Request to remove more resources than what is available"); return false; } @@ -125,17 +174,16 @@ public class PeriodicRLESparseResourceAllocation extends /** * Get maximum capacity at periodic offsets from the specified time. * - * @param tick UTC time base from which offsets are specified for finding - * the maximum capacity. - * @param period periodic offset at which capacities are evaluted.
+ * @param tick UTC time base from which offsets are specified for finding the + * maximum capacity. + * @param period periodic offset at which capacities are evaluated. * @return the maximum {@link Resource} across the specified time instants. * @return true if removal is successful, false otherwise */ public Resource getMaximumPeriodicCapacity(long tick, long period) { Resource maxResource; if (period < timePeriod) { - maxResource = - super.getMaximumPeriodicCapacity(tick % timePeriod, period); + maxResource = super.getMaximumPeriodicCapacity(tick % timePeriod, period); } else { // if period is greater than the length of PeriodicRLESparseAllocation, // only a single value exists in this interval. @@ -164,4 +212,30 @@ public class PeriodicRLESparseResourceAllocation extends return ret.toString(); } + @Override + public RLESparseResourceAllocation getRangeOverlapping(long start, long end) { + NavigableMap unrolledMap = new TreeMap<>(); + readLock.lock(); + try { + long relativeStart = (start >= 0) ? start % timePeriod : 0; + NavigableMap cumulativeMap = this.getCumulative(); + Long previous = cumulativeMap.floorKey(relativeStart); + previous = (previous != null) ? 
previous : 0; + for (long i = 0; i <= (end - start) / timePeriod; i++) { + for (Map.Entry e : cumulativeMap.entrySet()) { + long curKey = e.getKey() + (i * timePeriod); + if (curKey >= previous && (start + curKey - relativeStart) <= end) { + unrolledMap.put(curKey, e.getValue()); + } + } + } + RLESparseResourceAllocation rle = + new RLESparseResourceAllocation(unrolledMap, getResourceCalculator()); + rle.shift(start - relativeStart); + return rle; + } finally { + readLock.unlock(); + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java index 504a250..9afa324 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java @@ -28,54 +28,58 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan public interface PlanEdit extends PlanContext, PlanView { /** - * Add a new {@link ReservationAllocation} to the plan + * Add a new {@link ReservationAllocation} to the plan. 
* * @param reservation the {@link ReservationAllocation} to be added to the * plan * @param isRecovering flag to indicate if reservation is being added as part * of failover or not * @return true if addition is successful, false otherwise + * @throws PlanningException if addition is unsuccessful */ - public boolean addReservation(ReservationAllocation reservation, + boolean addReservation(ReservationAllocation reservation, boolean isRecovering) throws PlanningException; /** * Updates an existing {@link ReservationAllocation} in the plan. This is - * required for re-negotiation + * required for re-negotiation. * * @param reservation the {@link ReservationAllocation} to be updated the plan * @return true if update is successful, false otherwise + * @throws PlanningException if update is unsuccessful */ - public boolean updateReservation(ReservationAllocation reservation) + boolean updateReservation(ReservationAllocation reservation) throws PlanningException; /** * Delete an existing {@link ReservationAllocation} from the plan identified * uniquely by its {@link ReservationId}. This will generally be used for - * garbage collection + * garbage collection. * * @param reservationID the {@link ReservationAllocation} to be deleted from * the plan identified uniquely by its {@link ReservationId} * @return true if delete is successful, false otherwise + * @throws PlanningException if deletion is unsuccessful */ - public boolean deleteReservation(ReservationId reservationID) + boolean deleteReservation(ReservationId reservationID) throws PlanningException; /** * Method invoked to garbage collect old reservations. It cleans up expired - * reservations that have fallen out of the sliding archival window + * reservations that have fallen out of the sliding archival window. 
* * @param tick the current time from which the archival window is computed + * @throws PlanningException if archival is unsuccessful */ - public void archiveCompletedReservations(long tick) throws PlanningException; + void archiveCompletedReservations(long tick) throws PlanningException; /** * Sets the overall capacity in terms of {@link Resource} assigned to this - * plan + * plan. * * @param capacity the overall capacity in terms of {@link Resource} assigned * to this plan */ - public void setTotalCapacity(Resource capacity); + void setTotalCapacity(Resource capacity); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5ccdd839/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java index 2767993..4035f68 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java @@ -17,50 +17,50 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.reservation; +import java.util.Set; + import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; -import java.util.Set; - /** * This interface 
provides a read-only view on the allocations made in this * plan. This methods are used for example by {@code ReservationAgent}s to * determine the free resources in a certain point in time, and by * PlanFollowerPolicy to publish this plan to the scheduler. */ -public interface PlanView extends PlanContext { +interface PlanView extends PlanContext { /** * Return a set of {@link ReservationAllocation} identified by the user who * made the reservation. * * @param reservationID the unqiue id to identify the - * {@link ReservationAllocation} + * {@link ReservationAllocation} * @param interval the time interval used to retrieve the reservation - * allocations from. Only reservations with start time no - * greater than the interval end time, and end time no less - * than the interval start time will be selected. + * allocations from. Only reservations with start time no greater + * than the interval end time, and end time no less than the interval + * start time will be selected. * @param user the user to retrieve the reservation allocation from. * @return a set of {@link ReservationAllocation} identified by the user who - * made the reservation + * made the reservation */ - Set getReservations(ReservationId - reservationID, ReservationInterval interval, String user); + Set getReservations(ReservationId reservationID, + ReservationInterval interval, String user); /** * Return a set of {@link ReservationAllocation} identified by any user. * * @param reservationID the unqiue id to identify the - * {@link ReservationAllocation} + * {@link ReservationAllocation} * @param interval the time interval used to retrieve the reservation - * allocations from. Only reservations with start time no - * greater than the interval end time, and end time no less - * than the interval start time will be selected. + * allocations from. Only reservations with start time no greater + * than the interval end time, and end time no less than the interval + * start time will be selected. 
* @return a set of {@link ReservationAllocation} identified by any user */ Set getReservations(ReservationId reservationID, - ReservationInterval interval); + ReservationInterval interval); /** * Return a {@link ReservationAllocation} identified by its @@ -70,7 +70,7 @@ public interface PlanView extends PlanContext { * {@link ReservationAllocation} * @return {@link ReservationAllocation} identified by the specified id */ - public ReservationAllocation getReservationById(ReservationId reservationID); + ReservationAllocation getReservationById(ReservationId reservationID); /** * Return a set of {@link ReservationAllocation} that belongs to a certain @@ -78,11 +78,10 @@ public interface PlanView extends PlanContext { * * @param user the user being considered * @param t the instant in time being considered - * @return set of active {@link ReservationAllocation}s for this - * user at this time + * @return set of active {@link ReservationAllocation}s for this user at this + * time */ - public Set getReservationByUserAtTime(String user, - long t); + Set getReservationByUserAtTime(String user, long t); /** * Gets all the active reservations at the specified point of time @@ -91,14 +90,14 @@ public interface PlanView extends PlanContext { * requested * @return set of active reservations at the specified time */ - public Set getReservationsAtTime(long tick); + Set getReservationsAtTime(long tick); /** * Gets all the reservations in the plan * * @return set of all reservations handled by this Plan */ - public Set getAllReservations(); + Set getAllReservations(); /** * Returns the total {@link Resource} reserved for all users at the specified @@ -126,61 +125,68 @@ public interface PlanView extends PlanContext { * * @return the time (UTC in ms) at which the first reservation starts */ - public long getEarliestStartTime(); + long getEarliestStartTime(); /** * Returns the time (UTC in ms) at which the last reservation terminates * * @return the time (UTC in ms) at which the last 
reservation terminates */ - public long getLastEndTime(); + long getLastEndTime(); /** * This method returns the amount of resources available to a given user * (optionally if removing a certain reservation) over the start-end time - * range. + * range. If the request is periodic (period is non-zero) we return the + * minimum amount of resources available to periodic reservations (in all + * "period" windows within the system maxPeriod / LCM). * - * @param user - * @param oldId - * @param start - * @param end + * @param user the user being considered + * @param oldId the identifier of the existing reservation + * @param start start of the time interval. + * @param end end of the time interval. + * @param period the ms periodicity for this request (loop and pick min till + * maxPeriodicity) * @return a view of the plan as it is available to this user - * @throws PlanningException + * @throws PlanningException if operation is unsuccessful */ - public RLESparseResourceAllocation getAvailableResourceOverTime(String user, - ReservationId oldId, long start, long end) throws PlanningException; + RLESparseResourceAllocation getAvailableResourceOverTime(String user, + ReservationId oldId, long start, long end, long period) + throws PlanningException; /** * This method returns a RLE encoded view of the user reservation count * utilization between start and end time. * - * @param user - * @param start - * @param end + * @param user the user being considered + * @param start start of the time interval. + * @param end end of the time interval. * @return RLE encoded view of reservation used over time */ - public RLESparseResourceAllocation getReservationCountForUserOverTime( - String user, long start, long end); + RLESparseResourceAllocation getReservationCountForUserOverTime(String user, + long start, long end); /** * This method returns a RLE encoded view of the user reservation utilization * between start and end time.
* - * @param user - * @param start - * @param end + * @param user the user being considered + * @param start start of the time interval. + * @param end end of the time interval. * @return RLE encoded view of resources used over time */ - public RLESparseResourceAllocation getConsumptionForUserOverTime(String user, + RLESparseResourceAllocation getConsumptionForUserOverTime(String user, long start, long end); /** * Get the cumulative load over a time interval. * - * @param start Start of the time interval. - * @param end End of the time interval. + * @param start start of the time interval. + * @param end end of the time interval. * @return RLE sparse allocation. + * @throws PlanningException if operation is unsuccessful */ - RLESparseResourceAllocation getCumulativeLoadOverTime(long start, long end); + RLESparseResourceAllocation getCumulativeLoadOverTime(long start, long end) + throws PlanningException; } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org