Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 08605200CF0 for ; Thu, 7 Sep 2017 22:20:11 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 063521613D3; Thu, 7 Sep 2017 20:20:11 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7BEB41609D0 for ; Thu, 7 Sep 2017 22:20:08 +0200 (CEST) Received: (qmail 39817 invoked by uid 500); 7 Sep 2017 20:20:06 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 38707 invoked by uid 99); 7 Sep 2017 20:20:05 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 Sep 2017 20:20:05 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DAEACF56F3; Thu, 7 Sep 2017 20:20:04 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: inigoiri@apache.org To: common-commits@hadoop.apache.org Date: Thu, 07 Sep 2017 20:20:10 -0000 Message-Id: <89f157c7dcc046d5bbe5eb5dad389c8e@git.apache.org> In-Reply-To: <1d9998d95bc44bf0b65342610a15e083@git.apache.org> References: <1d9998d95bc44bf0b65342610a15e083@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [07/40] hadoop git commit: Revert "Plan/ResourceAllocation data structure enhancements required to support recurring reservations in ReservationSystem." 
archived-at: Thu, 07 Sep 2017 20:20:11 -0000 Revert "Plan/ResourceAllocation data structure enhancements required to support recurring reservations in ReservationSystem." This reverts commit 7996eca7dcfaa1bdf970e32022274f2699bef8a1. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e3345e98 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e3345e98 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e3345e98 Branch: refs/heads/HDFS-10467 Commit: e3345e985bff93c6c74a76747e45376c6027f42c Parents: 22de944 Author: Subru Krishnan Authored: Wed Sep 6 16:39:23 2017 -0700 Committer: Subru Krishnan Committed: Wed Sep 6 16:39:23 2017 -0700 ---------------------------------------------------------------------- .../hadoop/yarn/conf/YarnConfiguration.java | 6 - .../yarn/conf/TestYarnConfigurationFields.java | 4 +- .../reservation/AbstractReservationSystem.java | 90 ++-- .../AbstractSchedulerPlanFollower.java | 183 ++++---- .../reservation/InMemoryPlan.java | 400 +++++------------ .../InMemoryReservationAllocation.java | 36 +- .../reservation/NoOverCommitPolicy.java | 2 +- .../PeriodicRLESparseResourceAllocation.java | 130 ++---- .../resourcemanager/reservation/PlanEdit.java | 24 +- .../resourcemanager/reservation/PlanView.java | 94 ++-- .../RLESparseResourceAllocation.java | 115 +++-- .../reservation/ReservationAllocation.java | 60 +-- .../reservation/ReservationInputValidator.java | 134 +++--- .../reservation/ReservationSystem.java | 6 +- .../reservation/SharingPolicy.java | 13 +- .../reservation/planning/Planner.java | 2 +- .../reservation/planning/PlanningAlgorithm.java | 2 +- .../reservation/planning/StageAllocator.java | 2 +- .../planning/StageAllocatorGreedy.java | 2 +- .../planning/StageAllocatorGreedyRLE.java | 5 +- .../planning/StageAllocatorLowCostAligned.java | 4 +- .../reservation/ReservationSystemTestUtil.java | 135 +++--- .../reservation/TestInMemoryPlan.java | 431 
+++++++------------ ...TestPeriodicRLESparseResourceAllocation.java | 109 ++--- .../TestRLESparseResourceAllocation.java | 122 +++--- .../planning/TestSimpleCapacityReplanner.java | 8 +- 26 files changed, 777 insertions(+), 1342 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3345e98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 27ca957..4944821 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -262,12 +262,6 @@ public class YarnConfiguration extends Configuration { public static final long DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP = 1000L; - /** The maximum periodicity for the Reservation System. */ - public static final String RM_RESERVATION_SYSTEM_MAX_PERIODICITY = - RM_PREFIX + "reservation-system.max-periodicity"; - public static final long DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY = - 86400000L; - /** * Enable periodic monitor threads. 
* @see #RM_SCHEDULER_MONITOR_POLICIES http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3345e98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java index 1d3111c..bd7bf93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java @@ -33,7 +33,7 @@ import org.apache.hadoop.conf.TestConfigurationFieldsBase; */ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { - @SuppressWarnings({"deprecation", "methodlength"}) + @SuppressWarnings("deprecation") @Override public void initializeMemberVariables() { xmlFilename = new String("yarn-default.xml"); @@ -69,8 +69,6 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase { configurationPropsToSkipCompare.add(YarnConfiguration .YARN_SECURITY_SERVICE_AUTHORIZATION_COLLECTOR_NODEMANAGER_PROTOCOL); configurationPropsToSkipCompare.add(YarnConfiguration.CURATOR_LEADER_ELECTOR); - configurationPropsToSkipCompare - .add(YarnConfiguration.RM_RESERVATION_SYSTEM_MAX_PERIODICITY); // Federation default configs to be ignored configurationPropsToSkipCompare http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3345e98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index 5b8772c..5ef4912 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -18,17 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; -import java.util.HashMap; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -57,6 +46,17 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import 
java.util.concurrent.locks.ReentrantReadWriteLock; + /** * This is the implementation of {@link ReservationSystem} based on the * {@link ResourceScheduler} @@ -66,8 +66,8 @@ import org.slf4j.LoggerFactory; public abstract class AbstractReservationSystem extends AbstractService implements ReservationSystem { - private static final Logger LOG = - LoggerFactory.getLogger(AbstractReservationSystem.class); + private static final Logger LOG = LoggerFactory + .getLogger(AbstractReservationSystem.class); // private static final String DEFAULT_CAPACITY_SCHEDULER_PLAN @@ -103,8 +103,6 @@ public abstract class AbstractReservationSystem extends AbstractService private boolean isRecoveryEnabled = false; - private long maxPeriodicity; - /** * Construct the service. * @@ -145,41 +143,36 @@ public abstract class AbstractReservationSystem extends AbstractService this.conf = conf; scheduler = rmContext.getScheduler(); // Get the plan step size - planStepSize = conf.getTimeDuration( - YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, - YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, - TimeUnit.MILLISECONDS); + planStepSize = + conf.getTimeDuration( + YarnConfiguration.RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, + YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP, + TimeUnit.MILLISECONDS); if (planStepSize < 0) { planStepSize = YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_PLAN_FOLLOWER_TIME_STEP; } - maxPeriodicity = - conf.getLong(YarnConfiguration.RM_RESERVATION_SYSTEM_MAX_PERIODICITY, - YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY); - if (maxPeriodicity <= 0) { - maxPeriodicity = - YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY; - } // Create a plan corresponding to every reservable queue Set planQueueNames = scheduler.getPlanQueues(); for (String planQueueName : planQueueNames) { Plan plan = initializePlan(planQueueName); plans.put(planQueueName, plan); } - isRecoveryEnabled = 
conf.getBoolean(YarnConfiguration.RECOVERY_ENABLED, + isRecoveryEnabled = conf.getBoolean( + YarnConfiguration.RECOVERY_ENABLED, YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); if (conf.getBoolean(YarnConfiguration.YARN_RESERVATION_ACL_ENABLE, - YarnConfiguration.DEFAULT_YARN_RESERVATION_ACL_ENABLE) - && conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE, - YarnConfiguration.DEFAULT_YARN_ACL_ENABLE)) { + YarnConfiguration.DEFAULT_YARN_RESERVATION_ACL_ENABLE) && + conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE, + YarnConfiguration.DEFAULT_YARN_ACL_ENABLE)) { reservationsACLsManager = new ReservationsACLsManager(scheduler, conf); } } private void loadPlan(String planName, Map reservations) - throws PlanningException { + throws PlanningException { Plan plan = plans.get(planName); Resource minAllocation = getMinAllocation(); ResourceCalculator rescCalculator = getResourceCalculator(); @@ -255,8 +248,8 @@ public abstract class AbstractReservationSystem extends AbstractService Class planFollowerPolicyClazz = conf.getClassByName(planFollowerPolicyClassName); if (PlanFollower.class.isAssignableFrom(planFollowerPolicyClazz)) { - return (PlanFollower) ReflectionUtils - .newInstance(planFollowerPolicyClazz, conf); + return (PlanFollower) ReflectionUtils.newInstance( + planFollowerPolicyClazz, conf); } else { throw new YarnRuntimeException("Class: " + planFollowerPolicyClassName + " not instance of " + PlanFollower.class.getCanonicalName()); @@ -264,8 +257,7 @@ public abstract class AbstractReservationSystem extends AbstractService } catch (ClassNotFoundException e) { throw new YarnRuntimeException( "Could not instantiate PlanFollowerPolicy: " - + planFollowerPolicyClassName, - e); + + planFollowerPolicyClassName, e); } } @@ -379,8 +371,9 @@ public abstract class AbstractReservationSystem extends AbstractService public ReservationId getNewReservationId() { writeLock.lock(); try { - ReservationId resId = ReservationId.newInstance( - ResourceManager.getClusterTimeStamp(), 
resCounter.incrementAndGet()); + ReservationId resId = + ReservationId.newInstance(ResourceManager.getClusterTimeStamp(), + resCounter.incrementAndGet()); LOG.info("Allocated new reservationId: " + resId); return resId; } finally { @@ -397,11 +390,8 @@ public abstract class AbstractReservationSystem extends AbstractService * Get the default reservation system corresponding to the scheduler * * @param scheduler the scheduler for which the reservation system is required - * - * @return the {@link ReservationSystem} based on the configured scheduler */ - public static String getDefaultReservationSystem( - ResourceScheduler scheduler) { + public static String getDefaultReservationSystem(ResourceScheduler scheduler) { if (scheduler instanceof CapacityScheduler) { return CapacityReservationSystem.class.getName(); } else if (scheduler instanceof FairScheduler) { @@ -419,11 +409,12 @@ public abstract class AbstractReservationSystem extends AbstractService Resource maxAllocation = getMaxAllocation(); ResourceCalculator rescCalc = getResourceCalculator(); Resource totCap = getPlanQueueCapacity(planQueueName); - Plan plan = new InMemoryPlan(getRootQueueMetrics(), adPolicy, - getAgent(planQueuePath), totCap, planStepSize, rescCalc, minAllocation, - maxAllocation, planQueueName, getReplanner(planQueuePath), - getReservationSchedulerConfiguration().getMoveOnExpiry(planQueuePath), - maxPeriodicity, rmContext); + Plan plan = + new InMemoryPlan(getRootQueueMetrics(), adPolicy, + getAgent(planQueuePath), totCap, planStepSize, rescCalc, + minAllocation, maxAllocation, planQueueName, + getReplanner(planQueuePath), getReservationSchedulerConfiguration() + .getMoveOnExpiry(planQueuePath), rmContext); LOG.info("Initialized plan {} based on reservable queue {}", plan.toString(), planQueueName); return plan; @@ -486,8 +477,8 @@ public abstract class AbstractReservationSystem extends AbstractService Class admissionPolicyClazz = conf.getClassByName(admissionPolicyClassName); if 
(SharingPolicy.class.isAssignableFrom(admissionPolicyClazz)) { - return (SharingPolicy) ReflectionUtils.newInstance(admissionPolicyClazz, - conf); + return (SharingPolicy) ReflectionUtils.newInstance( + admissionPolicyClazz, conf); } else { throw new YarnRuntimeException("Class: " + admissionPolicyClassName + " not instance of " + SharingPolicy.class.getCanonicalName()); @@ -502,7 +493,8 @@ public abstract class AbstractReservationSystem extends AbstractService return this.reservationsACLsManager; } - protected abstract ReservationSchedulerConfiguration getReservationSchedulerConfiguration(); + protected abstract ReservationSchedulerConfiguration + getReservationSchedulerConfiguration(); protected abstract String getPlanQueuePath(String planQueueName); http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3345e98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java index 9b6a0b0..90357e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java @@ -18,14 +18,6 @@ package 
org.apache.hadoop.yarn.server.resourcemanager.reservation; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; @@ -41,17 +33,24 @@ import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + public abstract class AbstractSchedulerPlanFollower implements PlanFollower { - private static final Logger LOG = - LoggerFactory.getLogger(AbstractSchedulerPlanFollower.class); + private static final Logger LOG = LoggerFactory + .getLogger(AbstractSchedulerPlanFollower.class); protected Collection plans = new ArrayList(); protected YarnScheduler scheduler; protected Clock clock; @Override - public void init(Clock clock, ResourceScheduler sched, - Collection plans) { + public void init(Clock clock, ResourceScheduler sched, Collection plans) { this.clock = clock; this.scheduler = sched; this.plans.addAll(plans); @@ -72,7 +71,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { @Override public synchronized void synchronizePlan(Plan plan, boolean shouldReplan) { - String planQueueName = plan.getQueueName(); + String planQueueName = plan.getQueueName(); if (LOG.isDebugEnabled()) { LOG.debug("Running plan follower edit policy for plan: " + planQueueName); } @@ -83,14 +82,12 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { now += step - (now % step); } Queue planQueue = getPlanQueue(planQueueName); - if (planQueue == null) { - return; - } + if (planQueue == null) 
return; // first we publish to the plan the current availability of resources Resource clusterResources = scheduler.getClusterResource(); - Resource planResources = - getPlanResources(plan, planQueue, clusterResources); + Resource planResources = getPlanResources(plan, planQueue, + clusterResources); Set currentReservations = plan.getReservationsAtTime(now); Set curReservationNames = new HashSet(); @@ -98,11 +95,12 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { int numRes = getReservedResources(now, currentReservations, curReservationNames, reservedResources); // create the default reservation queue if it doesnt exist - String defReservationId = getReservationIdFromQueueName(planQueueName) - + ReservationConstants.DEFAULT_QUEUE_SUFFIX; - String defReservationQueue = - getReservationQueueName(planQueueName, defReservationId); - createDefaultReservationQueue(planQueueName, planQueue, defReservationId); + String defReservationId = getReservationIdFromQueueName(planQueueName) + + ReservationConstants.DEFAULT_QUEUE_SUFFIX; + String defReservationQueue = getReservationQueueName(planQueueName, + defReservationId); + createDefaultReservationQueue(planQueueName, planQueue, + defReservationId); curReservationNames.add(defReservationId); // if the resources dedicated to this plan has shrunk invoke replanner boolean shouldResize = false; @@ -151,8 +149,10 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { // sort allocations from the one giving up the most resources, to the // one asking for the most avoid order-of-operation errors that // temporarily violate 100% capacity bound - List sortedAllocations = sortByDelta( - new ArrayList(currentReservations), now, plan); + List sortedAllocations = + sortByDelta( + new ArrayList(currentReservations), now, + plan); for (ReservationAllocation res : sortedAllocations) { String currResId = res.getReservationId().toString(); if (curReservationNames.contains(currResId)) { @@ 
-163,9 +163,10 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { if (planResources.getMemorySize() > 0 && planResources.getVirtualCores() > 0) { if (shouldResize) { - capToAssign = calculateReservationToPlanProportion( - plan.getResourceCalculator(), planResources, reservedResources, - capToAssign); + capToAssign = + calculateReservationToPlanProportion( + plan.getResourceCalculator(), planResources, + reservedResources, capToAssign); } targetCapacity = calculateReservationToPlanRatio(plan.getResourceCalculator(), @@ -184,8 +185,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { maxCapacity = targetCapacity; } try { - setQueueEntitlement(planQueueName, currResId, targetCapacity, - maxCapacity); + setQueueEntitlement(planQueueName, currResId, targetCapacity, maxCapacity); } catch (YarnException e) { LOG.warn("Exception while trying to size reservation for plan: {}", currResId, planQueueName, e); @@ -196,10 +196,9 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { // compute the default queue capacity float defQCap = 1.0f - totalAssignedCapacity; if (LOG.isDebugEnabled()) { - LOG.debug( - "PlanFollowerEditPolicyTask: total Plan Capacity: {} " - + "currReservation: {} default-queue capacity: {}", - planResources, numRes, defQCap); + LOG.debug("PlanFollowerEditPolicyTask: total Plan Capacity: {} " + + "currReservation: {} default-queue capacity: {}", planResources, + numRes, defQCap); } // set the default queue to eat-up all remaining capacity try { @@ -226,11 +225,12 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { } protected void setQueueEntitlement(String planQueueName, String currResId, - float targetCapacity, float maxCapacity) throws YarnException { - String reservationQueueName = - getReservationQueueName(planQueueName, currResId); - scheduler.setEntitlement(reservationQueueName, - new QueueEntitlement(targetCapacity, maxCapacity)); + 
float targetCapacity, + float maxCapacity) throws YarnException { + String reservationQueueName = getReservationQueueName(planQueueName, + currResId); + scheduler.setEntitlement(reservationQueueName, new QueueEntitlement( + targetCapacity, maxCapacity)); } // Schedulers have different ways of naming queues. See YARN-2773 @@ -244,21 +244,14 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { * Then move all apps in the set of queues to the parent plan queue's default * reservation queue if move is enabled. Finally cleanups the queue by killing * any apps (if move is disabled or move failed) and removing the queue - * - * @param planQueueName the name of {@code PlanQueue} - * @param shouldMove flag to indicate if any running apps should be moved or - * killed - * @param toRemove the remnant apps to clean up - * @param defReservationQueue the default {@code ReservationQueue} of the - * {@link Plan} */ - protected void cleanupExpiredQueues(String planQueueName, boolean shouldMove, - Set toRemove, String defReservationQueue) { + protected void cleanupExpiredQueues(String planQueueName, + boolean shouldMove, Set toRemove, String defReservationQueue) { for (String expiredReservationId : toRemove) { try { // reduce entitlement to 0 - String expiredReservation = - getReservationQueueName(planQueueName, expiredReservationId); + String expiredReservation = getReservationQueueName(planQueueName, + expiredReservationId); setQueueEntitlement(planQueueName, expiredReservation, 0.0f, 0.0f); if (shouldMove) { moveAppsInQueueSync(expiredReservation, defReservationQueue); @@ -282,7 +275,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { * reservation queue in a synchronous fashion */ private void moveAppsInQueueSync(String expiredReservation, - String defReservationQueue) { + String defReservationQueue) { List activeApps = scheduler.getAppsInQueue(expiredReservation); if (activeApps.isEmpty()) { @@ -294,16 +287,16 @@ public 
abstract class AbstractSchedulerPlanFollower implements PlanFollower { scheduler.moveApplication(app.getApplicationId(), defReservationQueue); } catch (YarnException e) { LOG.warn( - "Encountered unexpected error during migration of application: {}" - + " from reservation: {}", + "Encountered unexpected error during migration of application: {}" + + " from reservation: {}", app, expiredReservation, e); } } } - protected int getReservedResources(long now, - Set currentReservations, - Set curReservationNames, Resource reservedResources) { + protected int getReservedResources(long now, Set + currentReservations, Set curReservationNames, + Resource reservedResources) { int numRes = 0; if (currentReservations != null) { numRes = currentReservations.size(); @@ -319,30 +312,23 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { * Sort in the order from the least new amount of resources asked (likely * negative) to the highest. This prevents "order-of-operation" errors related * to exceeding 100% capacity temporarily. - * - * @param currentReservations the currently active reservations - * @param now the current time - * @param plan the {@link Plan} that is being considered - * - * @return the sorted list of {@link ReservationAllocation}s */ protected List sortByDelta( List currentReservations, long now, Plan plan) { - Collections.sort(currentReservations, - new ReservationAllocationComparator(now, this, plan)); + Collections.sort(currentReservations, new ReservationAllocationComparator( + now, this, plan)); return currentReservations; } /** - * Get queue associated with reservable queue named. - * - * @param planQueueName name of the reservable queue + * Get queue associated with reservable queue named + * @param planQueueName Name of the reservable queue * @return queue associated with the reservable queue */ protected abstract Queue getPlanQueue(String planQueueName); /** - * Resizes reservations based on currently available resources. 
+ * Resizes reservations based on currently available resources */ private Resource calculateReservationToPlanProportion( ResourceCalculator rescCalculator, Resource availablePlanResources, @@ -352,7 +338,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { } /** - * Calculates ratio of reservationResources to planResources. + * Calculates ratio of reservationResources to planResources */ private float calculateReservationToPlanRatio( ResourceCalculator rescCalculator, Resource clusterResources, @@ -362,7 +348,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { } /** - * Check if plan resources are less than expected reservation resources. + * Check if plan resources are less than expected reservation resources */ private boolean arePlanResourcesLessThanReservations( ResourceCalculator rescCalculator, Resource clusterResources, @@ -372,56 +358,38 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { } /** - * Get a list of reservation queues for this planQueue. - * - * @param planQueue the queue for the current {@link Plan} - * - * @return the queues corresponding to the reservations + * Get a list of reservation queues for this planQueue */ protected abstract List getChildReservationQueues( Queue planQueue); /** - * Add a new reservation queue for reservation currResId for this planQueue. + * Add a new reservation queue for reservation currResId for this planQueue */ - protected abstract void addReservationQueue(String planQueueName, Queue queue, - String currResId); + protected abstract void addReservationQueue( + String planQueueName, Queue queue, String currResId); /** - * Creates the default reservation queue for use when no reservation is used - * for applications submitted to this planQueue. 
- * - * @param planQueueName name of the reservable queue - * @param queue the queue for the current {@link Plan} - * @param defReservationQueue name of the default {@code ReservationQueue} + * Creates the default reservation queue for use when no reservation is + * used for applications submitted to this planQueue */ - protected abstract void createDefaultReservationQueue(String planQueueName, - Queue queue, String defReservationQueue); + protected abstract void createDefaultReservationQueue( + String planQueueName, Queue queue, String defReservationQueue); /** - * Get plan resources for this planQueue. - * - * @param plan the current {@link Plan} being considered - * @param clusterResources the resources available in the cluster - * - * @return the resources allocated to the specified {@link Plan} + * Get plan resources for this planQueue */ - protected abstract Resource getPlanResources(Plan plan, Queue queue, - Resource clusterResources); + protected abstract Resource getPlanResources( + Plan plan, Queue queue, Resource clusterResources); /** * Get reservation queue resources if it exists otherwise return null. 
- * - * @param plan the current {@link Plan} being considered - * @param reservationId the identifier of the reservation - * - * @return the resources allocated to the specified reservation */ protected abstract Resource getReservationQueueResourceIfExists(Plan plan, ReservationId reservationId); - private static class ReservationAllocationComparator - implements Comparator { + private static class ReservationAllocationComparator implements + Comparator { AbstractSchedulerPlanFollower planFollower; long now; Plan plan; @@ -436,12 +404,14 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { private Resource getUnallocatedReservedResources( ReservationAllocation reservation) { Resource resResource; - Resource reservationResource = - planFollower.getReservationQueueResourceIfExists(plan, - reservation.getReservationId()); + Resource reservationResource = planFollower + .getReservationQueueResourceIfExists + (plan, reservation.getReservationId()); if (reservationResource != null) { - resResource = Resources.subtract(reservation.getResourcesAtTime(now), - reservationResource); + resResource = + Resources.subtract( + reservation.getResourcesAtTime(now), + reservationResource); } else { resResource = reservation.getResourcesAtTime(now); } @@ -458,3 +428,4 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { } } } + http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3345e98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java index 9eb1820..783fd09 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * + * http://www.apache.org/licenses/LICENSE-2.0 + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -33,10 +33,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; -import org.apache.hadoop.yarn.server.resourcemanager.reservation.RLESparseResourceAllocation.RLEOperator; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; @@ -65,14 +64,9 @@ public class InMemoryPlan implements Plan { private RLESparseResourceAllocation rleSparseVector; - private PeriodicRLESparseResourceAllocation periodicRle; - private Map userResourceAlloc = new HashMap(); - private Map userPeriodicResourceAlloc = - new HashMap(); - private Map userActiveReservationCount = new HashMap(); @@ -102,27 +96,15 @@ public class InMemoryPlan implements Plan { String queueName, Planner replanner, boolean getMoveOnExpiry, RMContext rmContext) { this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc, - maxAlloc, queueName, replanner, getMoveOnExpiry, - YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY, - rmContext); + maxAlloc, queueName, replanner, getMoveOnExpiry, rmContext, + new UTCClock()); } public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, ReservationAgent 
agent, Resource totalCapacity, long step, ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, String queueName, Planner replanner, boolean getMoveOnExpiry, - long maxPeriodicty, RMContext rmContext) { - this(queueMetrics, policy, agent, totalCapacity, step, resCalc, minAlloc, - maxAlloc, queueName, replanner, getMoveOnExpiry, maxPeriodicty, - rmContext, new UTCClock()); - } - - @SuppressWarnings("checkstyle:parameternumber") - public InMemoryPlan(QueueMetrics queueMetrics, SharingPolicy policy, - ReservationAgent agent, Resource totalCapacity, long step, - ResourceCalculator resCalc, Resource minAlloc, Resource maxAlloc, - String queueName, Planner replanner, boolean getMoveOnExpiry, - long maxPeriodicty, RMContext rmContext, Clock clock) { + RMContext rmContext, Clock clock) { this.queueMetrics = queueMetrics; this.policy = policy; this.agent = agent; @@ -132,8 +114,6 @@ public class InMemoryPlan implements Plan { this.minAlloc = minAlloc; this.maxAlloc = maxAlloc; this.rleSparseVector = new RLESparseResourceAllocation(resCalc); - this.periodicRle = - new PeriodicRLESparseResourceAllocation(resCalc, maxPeriodicty); this.queueName = queueName; this.replanner = replanner; this.getMoveOnExpiry = getMoveOnExpiry; @@ -146,39 +126,6 @@ public class InMemoryPlan implements Plan { return queueMetrics; } - private RLESparseResourceAllocation getUserRLEResourceAllocation(String user, - long period) { - RLESparseResourceAllocation resAlloc = null; - if (period > 0) { - if (userPeriodicResourceAlloc.containsKey(user)) { - resAlloc = userPeriodicResourceAlloc.get(user); - } else { - resAlloc = new PeriodicRLESparseResourceAllocation(resCalc, - periodicRle.getTimePeriod()); - userPeriodicResourceAlloc.put(user, resAlloc); - } - } else { - if (userResourceAlloc.containsKey(user)) { - resAlloc = userResourceAlloc.get(user); - } else { - resAlloc = new RLESparseResourceAllocation(resCalc); - userResourceAlloc.put(user, resAlloc); - } - } - return resAlloc; - } - - 
private void gcUserRLEResourceAllocation(String user, long period) { - if (period > 0) { - if (userPeriodicResourceAlloc.get(user).isEmpty()) { - userPeriodicResourceAlloc.remove(user); - } - } else { - if (userResourceAlloc.get(user).isEmpty()) { - userResourceAlloc.remove(user); - } - } - } private void incrementAllocation(ReservationAllocation reservation) { assert (readWriteLock.isWriteLockedByCurrentThread()); @@ -186,10 +133,11 @@ public class InMemoryPlan implements Plan { reservation.getAllocationRequests(); // check if we have encountered the user earlier and if not add an entry String user = reservation.getUser(); - long period = reservation.getPeriodicity(); - RLESparseResourceAllocation resAlloc = - getUserRLEResourceAllocation(user, period); - + RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user); + if (resAlloc == null) { + resAlloc = new RLESparseResourceAllocation(resCalc); + userResourceAlloc.put(user, resAlloc); + } RLESparseResourceAllocation resCount = userActiveReservationCount.get(user); if (resCount == null) { resCount = new RLESparseResourceAllocation(resCalc); @@ -201,43 +149,14 @@ public class InMemoryPlan implements Plan { for (Map.Entry r : allocationRequests .entrySet()) { - - if (period > 0L) { - for (int i = 0; i < periodicRle.getTimePeriod() / period; i++) { - - long rStart = r.getKey().getStartTime() + i * period; - long rEnd = r.getKey().getEndTime() + i * period; - - // handle wrap-around - if (rEnd > periodicRle.getTimePeriod()) { - long diff = rEnd - periodicRle.getTimePeriod(); - rEnd = periodicRle.getTimePeriod(); - ReservationInterval newInterval = new ReservationInterval(0, diff); - periodicRle.addInterval(newInterval, r.getValue()); - resAlloc.addInterval(newInterval, r.getValue()); - } - - ReservationInterval newInterval = - new ReservationInterval(rStart, rEnd); - periodicRle.addInterval(newInterval, r.getValue()); - resAlloc.addInterval(newInterval, r.getValue()); - } - - } else { - 
rleSparseVector.addInterval(r.getKey(), r.getValue()); - resAlloc.addInterval(r.getKey(), r.getValue()); - if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(), - ZERO_RESOURCE)) { - earliestActive = Math.min(earliestActive, r.getKey().getStartTime()); - latestActive = Math.max(latestActive, r.getKey().getEndTime()); - } + resAlloc.addInterval(r.getKey(), r.getValue()); + rleSparseVector.addInterval(r.getKey(), r.getValue()); + if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(), + ZERO_RESOURCE)) { + earliestActive = Math.min(earliestActive, r.getKey().getStartTime()); + latestActive = Math.max(latestActive, r.getKey().getEndTime()); } } - // periodic reservations are active from start time and good till cancelled - if (period > 0L) { - earliestActive = reservation.getStartTime(); - latestActive = Long.MAX_VALUE; - } resCount.addInterval(new ReservationInterval(earliestActive, latestActive), Resource.newInstance(1, 1)); } @@ -247,55 +166,27 @@ public class InMemoryPlan implements Plan { Map allocationRequests = reservation.getAllocationRequests(); String user = reservation.getUser(); - long period = reservation.getPeriodicity(); - RLESparseResourceAllocation resAlloc = - getUserRLEResourceAllocation(user, period); + RLESparseResourceAllocation resAlloc = userResourceAlloc.get(user); long earliestActive = Long.MAX_VALUE; long latestActive = Long.MIN_VALUE; for (Map.Entry r : allocationRequests .entrySet()) { - if (period > 0L) { - for (int i = 0; i < periodicRle.getTimePeriod() / period; i++) { - - long rStart = r.getKey().getStartTime() + i * period; - long rEnd = r.getKey().getEndTime() + i * period; - - // handle wrap-around - if (rEnd > periodicRle.getTimePeriod()) { - long diff = rEnd - periodicRle.getTimePeriod(); - rEnd = periodicRle.getTimePeriod(); - ReservationInterval newInterval = new ReservationInterval(0, diff); - periodicRle.removeInterval(newInterval, r.getValue()); - resAlloc.removeInterval(newInterval, r.getValue()); - } - - 
ReservationInterval newInterval = - new ReservationInterval(rStart, rEnd); - periodicRle.removeInterval(newInterval, r.getValue()); - resAlloc.removeInterval(newInterval, r.getValue()); - } - } else { - rleSparseVector.removeInterval(r.getKey(), r.getValue()); - resAlloc.removeInterval(r.getKey(), r.getValue()); - if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(), - ZERO_RESOURCE)) { - earliestActive = Math.min(earliestActive, r.getKey().getStartTime()); - latestActive = Math.max(latestActive, r.getKey().getEndTime()); - } + resAlloc.removeInterval(r.getKey(), r.getValue()); + rleSparseVector.removeInterval(r.getKey(), r.getValue()); + if (Resources.greaterThan(resCalc, totalCapacity, r.getValue(), + ZERO_RESOURCE)) { + earliestActive = Math.min(earliestActive, r.getKey().getStartTime()); + latestActive = Math.max(latestActive, r.getKey().getEndTime()); } } - gcUserRLEResourceAllocation(user, period); + if (resAlloc.isEmpty()) { + userResourceAlloc.remove(user); + } RLESparseResourceAllocation resCount = userActiveReservationCount.get(user); - // periodic reservations are active from start time and good till cancelled - if (period > 0L) { - earliestActive = reservation.getStartTime(); - latestActive = Long.MAX_VALUE; - } - resCount.removeInterval( - new ReservationInterval(earliestActive, latestActive), - Resource.newInstance(1, 1)); + resCount.removeInterval(new ReservationInterval(earliestActive, + latestActive), Resource.newInstance(1, 1)); if (resCount.isEmpty()) { userActiveReservationCount.remove(user); } @@ -307,9 +198,9 @@ public class InMemoryPlan implements Plan { if (currentReservations != null) { Set flattenedReservations = new TreeSet(); - for (Set res : currentReservations - .values()) { - flattenedReservations.addAll(res); + for (Set reservationEntries : + currentReservations.values()) { + flattenedReservations.addAll(reservationEntries); } return flattenedReservations; } else { @@ -327,16 +218,19 @@ public class InMemoryPlan implements 
Plan { InMemoryReservationAllocation inMemReservation = (InMemoryReservationAllocation) reservation; if (inMemReservation.getUser() == null) { - String errMsg = "The specified Reservation with ID " - + inMemReservation.getReservationId() + " is not mapped to any user"; + String errMsg = + "The specified Reservation with ID " + + inMemReservation.getReservationId() + + " is not mapped to any user"; LOG.error(errMsg); throw new IllegalArgumentException(errMsg); } writeLock.lock(); try { if (reservationTable.containsKey(inMemReservation.getReservationId())) { - String errMsg = "The specified Reservation with ID " - + inMemReservation.getReservationId() + " already exists"; + String errMsg = + "The specified Reservation with ID " + + inMemReservation.getReservationId() + " already exists"; LOG.error(errMsg); throw new IllegalArgumentException(errMsg); } @@ -352,8 +246,9 @@ public class InMemoryPlan implements Plan { getQueueName(), inMemReservation.getReservationId().toString()); } } - ReservationInterval searchInterval = new ReservationInterval( - inMemReservation.getStartTime(), inMemReservation.getEndTime()); + ReservationInterval searchInterval = + new ReservationInterval(inMemReservation.getStartTime(), + inMemReservation.getEndTime()); Set reservations = currentReservations.get(searchInterval); if (reservations == null) { @@ -385,8 +280,9 @@ public class InMemoryPlan implements Plan { ReservationId resId = reservation.getReservationId(); ReservationAllocation currReservation = getReservationById(resId); if (currReservation == null) { - String errMsg = "The specified Reservation with ID " + resId - + " does not exist in the plan"; + String errMsg = + "The specified Reservation with ID " + resId + + " does not exist in the plan"; LOG.error(errMsg); throw new IllegalArgumentException(errMsg); } @@ -422,8 +318,9 @@ public class InMemoryPlan implements Plan { private boolean removeReservation(ReservationAllocation reservation) { assert 
(readWriteLock.isWriteLockedByCurrentThread()); - ReservationInterval searchInterval = new ReservationInterval( - reservation.getStartTime(), reservation.getEndTime()); + ReservationInterval searchInterval = + new ReservationInterval(reservation.getStartTime(), + reservation.getEndTime()); Set reservations = currentReservations.get(searchInterval); if (reservations != null) { @@ -440,15 +337,16 @@ public class InMemoryPlan implements Plan { currentReservations.remove(searchInterval); } } else { - String errMsg = "The specified Reservation with ID " - + reservation.getReservationId() + " does not exist in the plan"; + String errMsg = + "The specified Reservation with ID " + reservation.getReservationId() + + " does not exist in the plan"; LOG.error(errMsg); throw new IllegalArgumentException(errMsg); } reservationTable.remove(reservation.getReservationId()); decrementAllocation(reservation); LOG.info("Sucessfully deleted reservation: {} in plan.", - reservation.getReservationId()); + reservation.getReservationId()); return true; } @@ -458,8 +356,9 @@ public class InMemoryPlan implements Plan { try { ReservationAllocation reservation = getReservationById(reservationID); if (reservation == null) { - String errMsg = "The specified Reservation with ID " + reservationID - + " does not exist in the plan"; + String errMsg = + "The specified Reservation with ID " + reservationID + + " does not exist in the plan"; LOG.error(errMsg); throw new IllegalArgumentException(errMsg); } @@ -554,90 +453,66 @@ public class InMemoryPlan implements Plan { long start, long end) { readLock.lock(); try { - // merge periodic and non-periodic allocations RLESparseResourceAllocation userResAlloc = userResourceAlloc.get(user); - RLESparseResourceAllocation userPeriodicResAlloc = - userPeriodicResourceAlloc.get(user); - if (userResAlloc != null && userPeriodicResAlloc != null) { - return RLESparseResourceAllocation.merge(resCalc, totalCapacity, - userResAlloc, userPeriodicResAlloc, 
RLEOperator.add, start, end); - } if (userResAlloc != null) { return userResAlloc.getRangeOverlapping(start, end); + } else { + return new RLESparseResourceAllocation(resCalc); } - if (userPeriodicResAlloc != null) { - return userPeriodicResAlloc.getRangeOverlapping(start, end); - } - } catch (PlanningException e) { - LOG.warn("Exception while trying to merge periodic" - + " and non-periodic user allocations: {}", e.getMessage(), e); } finally { readLock.unlock(); } - return new RLESparseResourceAllocation(resCalc); } @Override public Resource getTotalCommittedResources(long t) { readLock.lock(); try { - return Resources.add(rleSparseVector.getCapacityAtTime(t), - periodicRle.getCapacityAtTime(t)); + return rleSparseVector.getCapacityAtTime(t); } finally { readLock.unlock(); } } @Override - public Set getReservations(ReservationId reservationID, - ReservationInterval interval) { + public Set getReservations(ReservationId + reservationID, ReservationInterval interval) { return getReservations(reservationID, interval, null); } @Override - public Set getReservations(ReservationId reservationID, - ReservationInterval interval, String user) { + public Set getReservations(ReservationId + reservationID, ReservationInterval interval, String user) { if (reservationID != null) { ReservationAllocation allocation = getReservationById(reservationID); - if (allocation == null) { + if (allocation == null){ return Collections.emptySet(); } return Collections.singleton(allocation); } - long startTime = interval == null ? 0 : interval.getStartTime(); - long endTime = interval == null ? Long.MAX_VALUE : interval.getEndTime(); + long startTime = interval == null? 0 : interval.getStartTime(); + long endTime = interval == null? 
Long.MAX_VALUE : interval.getEndTime(); ReservationInterval searchInterval = - new ReservationInterval(endTime, Long.MAX_VALUE); + new ReservationInterval(endTime, Long.MAX_VALUE); readLock.lock(); try { - SortedMap> res = - currentReservations.headMap(searchInterval, true); - if (!res.isEmpty()) { - Set flattenedReservations = new HashSet<>(); - for (Set resEntries : res.values()) { - for (InMemoryReservationAllocation reservation : resEntries) { - // validate user - if (user != null && !user.isEmpty() - && !reservation.getUser().equals(user)) { - continue; - } - // handle periodic reservations - long period = reservation.getPeriodicity(); - if (period > 0) { - long t = endTime % period; - // check for both contained and wrap-around reservations - if ((t - startTime) * (t - endTime) - * (startTime - endTime) >= 0) { - flattenedReservations.add(reservation); - } - } else { - // check for non-periodic reservations - if (reservation.getEndTime() > startTime) { - flattenedReservations.add(reservation); + SortedMap> + reservations = currentReservations.headMap(searchInterval, true); + if (!reservations.isEmpty()) { + Set flattenedReservations = + new HashSet<>(); + for (Set reservationEntries : + reservations.values()) { + for (InMemoryReservationAllocation res : reservationEntries) { + if (res.getEndTime() > startTime) { + if (user != null && !user.isEmpty() + && !res.getUser().equals(user)) { + continue; } + flattenedReservations.add(res); } } } @@ -675,82 +550,36 @@ public class InMemoryPlan implements Plan { @Override public RLESparseResourceAllocation getAvailableResourceOverTime(String user, - ReservationId oldId, long start, long end, long period) - throws PlanningException { + ReservationId oldId, long start, long end) throws PlanningException { readLock.lock(); try { - - // for non-periodic return simple available resources - if (period == 0) { - - // create RLE of totCapacity - TreeMap totAvailable = new TreeMap(); - totAvailable.put(start, 
Resources.clone(totalCapacity)); - RLESparseResourceAllocation totRLEAvail = - new RLESparseResourceAllocation(totAvailable, resCalc); - - // subtract used from available - RLESparseResourceAllocation netAvailable; - - netAvailable = RLESparseResourceAllocation.merge(resCalc, - Resources.clone(totalCapacity), totRLEAvail, rleSparseVector, - RLEOperator.subtractTestNonNegative, start, end); - - // remove periodic component - netAvailable = RLESparseResourceAllocation.merge(resCalc, - Resources.clone(totalCapacity), netAvailable, periodicRle, - RLEOperator.subtractTestNonNegative, start, end); - - // add back in old reservation used resources if any - ReservationAllocation old = reservationTable.get(oldId); - if (old != null) { - - RLESparseResourceAllocation addBackPrevious = - old.getResourcesOverTime(start, end); - netAvailable = RLESparseResourceAllocation.merge(resCalc, - Resources.clone(totalCapacity), netAvailable, addBackPrevious, - RLEOperator.add, start, end); - } - // lower it if this is needed by the sharing policy - netAvailable = getSharingPolicy().availableResources(netAvailable, this, - user, oldId, start, end); - return netAvailable; - } else { - - if (periodicRle.getTimePeriod() % period != 0) { - throw new PlanningException("The reservation periodicity (" + period - + ") must be" + "an exact divider of the system maxPeriod (" - + periodicRle.getTimePeriod() + ")"); - } - - // find the minimum resources available among all the instances that fit - // in the LCM - long numInstInLCM = periodicRle.getTimePeriod() / period; - - RLESparseResourceAllocation minOverLCM = - getAvailableResourceOverTime(user, oldId, start, end, 0); - for (int i = 1; i < numInstInLCM; i++) { - - long rStart = start + i * period; - long rEnd = end + i * period; - - // recursive invocation of non-periodic range (to pick raw-info) - RLESparseResourceAllocation snapShot = - getAvailableResourceOverTime(user, oldId, rStart, rEnd, 0); - - // time-align on start - snapShot.shift(-(i 
* period)); - - // pick the minimum amount of resources in each time interval - minOverLCM = - RLESparseResourceAllocation.merge(resCalc, getTotalCapacity(), - minOverLCM, snapShot, RLEOperator.min, start, end); - - } - - return minOverLCM; - + // create RLE of totCapacity + TreeMap totAvailable = new TreeMap(); + totAvailable.put(start, Resources.clone(totalCapacity)); + RLESparseResourceAllocation totRLEAvail = + new RLESparseResourceAllocation(totAvailable, resCalc); + + // subtract used from available + RLESparseResourceAllocation netAvailable; + + netAvailable = + RLESparseResourceAllocation.merge(resCalc, + Resources.clone(totalCapacity), totRLEAvail, rleSparseVector, + RLEOperator.subtractTestNonNegative, start, end); + + // add back in old reservation used resources if any + ReservationAllocation old = reservationTable.get(oldId); + if (old != null) { + netAvailable = + RLESparseResourceAllocation.merge(resCalc, + Resources.clone(totalCapacity), netAvailable, + old.getResourcesOverTime(), RLEOperator.add, start, end); } + // lower it if this is needed by the sharing policy + netAvailable = + getSharingPolicy().availableResources(netAvailable, this, user, + oldId, start, end); + return netAvailable; } finally { readLock.unlock(); } @@ -808,7 +637,7 @@ public class InMemoryPlan implements Plan { public String toCumulativeString() { readLock.lock(); try { - return rleSparseVector.toString() + "\n" + periodicRle.toString(); + return rleSparseVector.toString(); } finally { readLock.unlock(); } @@ -860,18 +689,11 @@ public class InMemoryPlan implements Plan { } @Override - public RLESparseResourceAllocation getCumulativeLoadOverTime(long start, - long end) throws PlanningException { + public RLESparseResourceAllocation getCumulativeLoadOverTime( + long start, long end) { readLock.lock(); try { - - RLESparseResourceAllocation ret = - rleSparseVector.getRangeOverlapping(start, end); - ret = RLESparseResourceAllocation.merge(resCalc, totalCapacity, ret, - 
periodicRle.getRangeOverlapping(start, end), RLEOperator.add, start, - end); - - return ret; + return rleSparseVector.getRangeOverlapping(start, end); } finally { readLock.unlock(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3345e98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java index 00c8e44..69fd43f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryReservationAllocation.java @@ -42,7 +42,6 @@ public class InMemoryReservationAllocation implements ReservationAllocation { private final Map allocationRequests; private boolean hasGang = false; private long acceptedAt = -1; - private long periodicity = 0; private RLESparseResourceAllocation resourcesOverTime; @@ -68,16 +67,9 @@ public class InMemoryReservationAllocation implements ReservationAllocation { this.allocationRequests = allocations; this.planName = planName; this.hasGang = hasGang; - if (contract != null && contract.getRecurrenceExpression() != null) { - this.periodicity = Long.parseLong(contract.getRecurrenceExpression()); - } - if (periodicity > 0) { - 
resourcesOverTime = - new PeriodicRLESparseResourceAllocation(calculator, periodicity); - } else { - resourcesOverTime = new RLESparseResourceAllocation(calculator); - } - for (Map.Entry r : allocations.entrySet()) { + resourcesOverTime = new RLESparseResourceAllocation(calculator); + for (Map.Entry r : allocations + .entrySet()) { resourcesOverTime.addInterval(r.getKey(), r.getValue()); } } @@ -141,33 +133,17 @@ public class InMemoryReservationAllocation implements ReservationAllocation { } @Override - public RLESparseResourceAllocation getResourcesOverTime() { + public RLESparseResourceAllocation getResourcesOverTime(){ return resourcesOverTime; } @Override - public RLESparseResourceAllocation getResourcesOverTime(long start, - long end) { - return resourcesOverTime.getRangeOverlapping(start, end); - } - - @Override - public long getPeriodicity() { - return periodicity; - } - - @Override - public void setPeriodicity(long period) { - periodicity = period; - } - - @Override public String toString() { StringBuilder sBuf = new StringBuilder(); sBuf.append(getReservationId()).append(" user:").append(getUser()) .append(" startTime: ").append(getStartTime()).append(" endTime: ") - .append(getEndTime()).append(" Periodiciy: ").append(periodicity) - .append(" alloc:\n[").append(resourcesOverTime.toString()).append("] "); + .append(getEndTime()).append(" alloc:\n[") + .append(resourcesOverTime.toString()).append("] "); return sBuf.toString(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3345e98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java index 49d4702..55f1d00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/NoOverCommitPolicy.java @@ -40,7 +40,7 @@ public class NoOverCommitPolicy implements SharingPolicy { RLESparseResourceAllocation available = plan.getAvailableResourceOverTime( reservation.getUser(), reservation.getReservationId(), - reservation.getStartTime(), reservation.getEndTime(), 0); + reservation.getStartTime(), reservation.getEndTime()); // test the reservation does not exceed what is available try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3345e98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java index 7bc44f5..8e3be8b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PeriodicRLESparseResourceAllocation.java @@ -18,94 +18,47 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; - import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; - /** - * This data structure stores a periodic {@link RLESparseResourceAllocation}. + * This data structure stores a periodic RLESparseResourceAllocation. * Default period is 1 day (86400000ms). */ -public class PeriodicRLESparseResourceAllocation - extends RLESparseResourceAllocation { +public class PeriodicRLESparseResourceAllocation extends + RLESparseResourceAllocation { // Log - private static final Logger LOG = - LoggerFactory.getLogger(PeriodicRLESparseResourceAllocation.class); + private static final Logger LOG = LoggerFactory + .getLogger(PeriodicRLESparseResourceAllocation.class); private long timePeriod; /** * Constructor. * - * @param resourceCalculator {@link ResourceCalculator} the resource - * calculator to use. + * @param rleVector {@link RLESparseResourceAllocation} with the run-length + encoded data. * @param timePeriod Time period in milliseconds. */ public PeriodicRLESparseResourceAllocation( - ResourceCalculator resourceCalculator, Long timePeriod) { - super(resourceCalculator); + RLESparseResourceAllocation rleVector, Long timePeriod) { + super(rleVector.getCumulative(), rleVector.getResourceCalculator()); this.timePeriod = timePeriod; } /** * Constructor. Default time period set to 1 day. 
* - * @param resourceCalculator {@link ResourceCalculator} the resource - * calculator to use.. - */ - public PeriodicRLESparseResourceAllocation( - ResourceCalculator resourceCalculator) { - this(resourceCalculator, - YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_MAX_PERIODICITY); - } - - /** - * Constructor. - * * @param rleVector {@link RLESparseResourceAllocation} with the run-length - * encoded data. - * @param timePeriod Time period in milliseconds. + encoded data. */ - @VisibleForTesting public PeriodicRLESparseResourceAllocation( - RLESparseResourceAllocation rleVector, Long timePeriod) { - super(rleVector.getCumulative(), rleVector.getResourceCalculator()); - this.timePeriod = timePeriod; - - // make sure the PeriodicRLE is zero-based, and handles wrap-around - long delta = (getEarliestStartTime() % timePeriod - getEarliestStartTime()); - shift(delta); - - List toRemove = new ArrayList<>(); - Map toAdd = new TreeMap<>(); - - for (Map.Entry entry : cumulativeCapacity.entrySet()) { - if (entry.getKey() > timePeriod) { - toRemove.add(entry.getKey()); - if (entry.getValue() != null) { - toAdd.put(timePeriod, entry.getValue()); - long prev = entry.getKey() % timePeriod; - toAdd.put(prev, this.getCapacityAtTime(prev)); - toAdd.put(0L, entry.getValue()); - } - } - } - for (Long l : toRemove) { - cumulativeCapacity.remove(l); - } - cumulativeCapacity.putAll(toAdd); + RLESparseResourceAllocation rleVector) { + this(rleVector, 86400000L); } /** @@ -125,25 +78,24 @@ public class PeriodicRLESparseResourceAllocation * The interval may include 0, but the end time must be strictly less than * timePeriod. * - * @param interval {@link ReservationInterval} to which the specified resource - * is to be added. + * @param interval {@link ReservationInterval} to which the specified + * resource is to be added. * @param resource {@link Resource} to be added to the interval specified. 
* @return true if addition is successful, false otherwise */ - public boolean addInterval(ReservationInterval interval, Resource resource) { + public boolean addInterval(ReservationInterval interval, + Resource resource) { long startTime = interval.getStartTime(); long endTime = interval.getEndTime(); - if (startTime >= 0 && endTime > startTime && endTime <= timePeriod) { return super.addInterval(interval, resource); } else { - LOG.info("Cannot set capacity beyond end time: " + timePeriod + " was (" - + interval.toString() + ")"); + LOG.info("Cannot set capacity beyond end time: " + timePeriod); return false; } } - /** + /** * Removes a resource for the specified interval. * * @param interval the {@link ReservationInterval} for which the resource is @@ -151,15 +103,14 @@ public class PeriodicRLESparseResourceAllocation * @param resource the {@link Resource} to be removed. * @return true if removal is successful, false otherwise */ - public boolean removeInterval(ReservationInterval interval, - Resource resource) { + public boolean removeInterval( + ReservationInterval interval, Resource resource) { long startTime = interval.getStartTime(); long endTime = interval.getEndTime(); // If the resource to be subtracted is less than the minimum resource in // the range, abort removal to avoid negative capacity. - // TODO revesit decrementing endTime - if (!Resources.fitsIn(resource, getMinimumCapacityInInterval( - new ReservationInterval(startTime, endTime - 1)))) { + if (!Resources.fitsIn( + resource, super.getMinimumCapacityInInterval(interval))) { LOG.info("Request to remove more resources than what is available"); return false; } @@ -174,16 +125,17 @@ public class PeriodicRLESparseResourceAllocation /** * Get maximum capacity at periodic offsets from the specified time. * - * @param tick UTC time base from which offsets are specified for finding the - * maximum capacity. - * @param period periodic offset at which capacities are evaluated. 
+ * @param tick UTC time base from which offsets are specified for finding + * the maximum capacity. + * @param period periodic offset at which capacities are evaluated. * @return the maximum {@link Resource} across the specified time instants. * @return true if removal is successful, false otherwise */ public Resource getMaximumPeriodicCapacity(long tick, long period) { Resource maxResource; if (period < timePeriod) { - maxResource = super.getMaximumPeriodicCapacity(tick % timePeriod, period); + maxResource = + super.getMaximumPeriodicCapacity(tick % timePeriod, period); } else { // if period is greater than the length of PeriodicRLESparseAllocation, // only a single value exists in this interval. @@ -212,30 +164,4 @@ public class PeriodicRLESparseResourceAllocation return ret.toString(); } - @Override - public RLESparseResourceAllocation getRangeOverlapping(long start, long end) { - NavigableMap unrolledMap = new TreeMap<>(); - readLock.lock(); - try { - long relativeStart = (start >= 0) ? start % timePeriod : 0; - NavigableMap cumulativeMap = this.getCumulative(); - Long previous = cumulativeMap.floorKey(relativeStart); - previous = (previous != null) ?
previous : 0; - for (long i = 0; i <= (end - start) / timePeriod; i++) { - for (Map.Entry e : cumulativeMap.entrySet()) { - long curKey = e.getKey() + (i * timePeriod); - if (curKey >= previous && (start + curKey - relativeStart) <= end) { - unrolledMap.put(curKey, e.getValue()); - } - } - } - RLESparseResourceAllocation rle = - new RLESparseResourceAllocation(unrolledMap, getResourceCalculator()); - rle.shift(start - relativeStart); - return rle; - } finally { - readLock.unlock(); - } - } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3345e98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java index 9afa324..504a250 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java @@ -28,58 +28,54 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.Plan public interface PlanEdit extends PlanContext, PlanView { /** - * Add a new {@link ReservationAllocation} to the plan. 
+ * Add a new {@link ReservationAllocation} to the plan * * @param reservation the {@link ReservationAllocation} to be added to the * plan * @param isRecovering flag to indicate if reservation is being added as part * of failover or not * @return true if addition is successful, false otherwise - * @throws PlanningException if addition is unsuccessful */ - boolean addReservation(ReservationAllocation reservation, + public boolean addReservation(ReservationAllocation reservation, boolean isRecovering) throws PlanningException; /** * Updates an existing {@link ReservationAllocation} in the plan. This is - * required for re-negotiation. + * required for re-negotiation * * @param reservation the {@link ReservationAllocation} to be updated the plan * @return true if update is successful, false otherwise - * @throws PlanningException if update is unsuccessful */ - boolean updateReservation(ReservationAllocation reservation) + public boolean updateReservation(ReservationAllocation reservation) throws PlanningException; /** * Delete an existing {@link ReservationAllocation} from the plan identified * uniquely by its {@link ReservationId}. This will generally be used for - * garbage collection. + * garbage collection * * @param reservationID the {@link ReservationAllocation} to be deleted from * the plan identified uniquely by its {@link ReservationId} * @return true if delete is successful, false otherwise - * @throws PlanningException if deletion is unsuccessful */ - boolean deleteReservation(ReservationId reservationID) + public boolean deleteReservation(ReservationId reservationID) throws PlanningException; /** * Method invoked to garbage collect old reservations. It cleans up expired - * reservations that have fallen out of the sliding archival window. 
+ * reservations that have fallen out of the sliding archival window * * @param tick the current time from which the archival window is computed - * @throws PlanningException if archival is unsuccessful */ - void archiveCompletedReservations(long tick) throws PlanningException; + public void archiveCompletedReservations(long tick) throws PlanningException; /** * Sets the overall capacity in terms of {@link Resource} assigned to this - * plan. + * plan * * @param capacity the overall capacity in terms of {@link Resource} assigned * to this plan */ - void setTotalCapacity(Resource capacity); + public void setTotalCapacity(Resource capacity); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/e3345e98/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java index 4035f68..2767993 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanView.java @@ -17,50 +17,50 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.reservation; -import java.util.Set; - import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; +import java.util.Set; + /** * This interface provides a read-only view on the allocations made in this * plan. This methods are used for example by {@code ReservationAgent}s to * determine the free resources in a certain point in time, and by * PlanFollowerPolicy to publish this plan to the scheduler. */ -interface PlanView extends PlanContext { +public interface PlanView extends PlanContext { /** * Return a set of {@link ReservationAllocation} identified by the user who * made the reservation. * * @param reservationID the unqiue id to identify the - * {@link ReservationAllocation} + * {@link ReservationAllocation} * @param interval the time interval used to retrieve the reservation - * allocations from. Only reservations with start time no greater - * than the interval end time, and end time no less than the interval - * start time will be selected. + * allocations from. Only reservations with start time no + * greater than the interval end time, and end time no less + * than the interval start time will be selected. * @param user the user to retrieve the reservation allocation from. * @return a set of {@link ReservationAllocation} identified by the user who - * made the reservation + * made the reservation */ - Set getReservations(ReservationId reservationID, - ReservationInterval interval, String user); + Set getReservations(ReservationId + reservationID, ReservationInterval interval, String user); /** * Return a set of {@link ReservationAllocation} identified by any user. * * @param reservationID the unqiue id to identify the - * {@link ReservationAllocation} + * {@link ReservationAllocation} * @param interval the time interval used to retrieve the reservation - * allocations from. Only reservations with start time no greater - * than the interval end time, and end time no less than the interval - * start time will be selected. + * allocations from. 
Only reservations with start time no + * greater than the interval end time, and end time no less + * than the interval start time will be selected. * @return a set of {@link ReservationAllocation} identified by any user */ Set getReservations(ReservationId reservationID, - ReservationInterval interval); + ReservationInterval interval); /** * Return a {@link ReservationAllocation} identified by its @@ -70,7 +70,7 @@ interface PlanView extends PlanContext { * {@link ReservationAllocation} * @return {@link ReservationAllocation} identified by the specified id */ - ReservationAllocation getReservationById(ReservationId reservationID); + public ReservationAllocation getReservationById(ReservationId reservationID); /** * Return a set of {@link ReservationAllocation} that belongs to a certain @@ -78,10 +78,11 @@ interface PlanView extends PlanContext { * * @param user the user being considered * @param t the instant in time being considered - * @return set of active {@link ReservationAllocation}s for this user at this - * time + * @return set of active {@link ReservationAllocation}s for this + * user at this time */ - Set getReservationByUserAtTime(String user, long t); + public Set getReservationByUserAtTime(String user, + long t); /** * Gets all the active reservations at the specified point of time @@ -90,14 +91,14 @@ interface PlanView extends PlanContext { * requested * @return set of active reservations at the specified time */ - Set getReservationsAtTime(long tick); + public Set getReservationsAtTime(long tick); /** * Gets all the reservations in the plan * * @return set of all reservations handled by this Plan */ - Set getAllReservations(); + public Set getAllReservations(); /** * Returns the total {@link Resource} reserved for all users at the specified @@ -125,68 +126,61 @@ interface PlanView extends PlanContext { * * @return the time (UTC in ms) at which the first reservation starts */ - long getEarliestStartTime(); + public long getEarliestStartTime(); /** * 
Returns the time (UTC in ms) at which the last reservation terminates * * @return the time (UTC in ms) at which the last reservation terminates */ - long getLastEndTime(); + public long getLastEndTime(); /** * This method returns the amount of resources available to a given user * (optionally if removing a certain reservation) over the start-end time - * range. If the request is periodic (period is non-zero) we return the - * minimum amount of resources available to periodic reservations (in all - * "period" windows within the system maxPeriod / LCM). + * range. * - * @param user the user being considered - * @param oldId the identifier of the existing reservation - * @param start start of the time interval. - * @param end end of the time interval. - * @param period the ms periodicty for this request (loop and pick min till - * maxPeriodicity) + * @param user + * @param oldId + * @param start + * @param end * @return a view of the plan as it is available to this user - * @throws PlanningException if operation is unsuccessful + * @throws PlanningException */ - RLESparseResourceAllocation getAvailableResourceOverTime(String user, - ReservationId oldId, long start, long end, long period) - throws PlanningException; + public RLESparseResourceAllocation getAvailableResourceOverTime(String user, + ReservationId oldId, long start, long end) throws PlanningException; /** * This method returns a RLE encoded view of the user reservation count * utilization between start and end time. * - * @param user the user being considered - * @param start start of the time interval. - * @param end end of the time interval. 
+ * @param user + * @param start + * @param end * @return RLE encoded view of reservation used over time */ - RLESparseResourceAllocation getReservationCountForUserOverTime(String user, - long start, long end); + public RLESparseResourceAllocation getReservationCountForUserOverTime( + String user, long start, long end); /** * This method returns a RLE encoded view of the user reservation utilization * between start and end time. * - * @param user the user being considered - * @param start start of the time interval. - * @param end end of the time interval. + * @param user + * @param start + * @param end * @return RLE encoded view of resources used over time */ - RLESparseResourceAllocation getConsumptionForUserOverTime(String user, + public RLESparseResourceAllocation getConsumptionForUserOverTime(String user, long start, long end); /** * Get the cumulative load over a time interval. * - * @param start start of the time interval. - * @param end end of the time interval. + * @param start Start of the time interval. + * @param end End of the time interval. * @return RLE sparse allocation. - * @throws PlanningException if operation is unsuccessful */ - RLESparseResourceAllocation getCumulativeLoadOverTime(long start, long end) - throws PlanningException; + RLESparseResourceAllocation getCumulativeLoadOverTime(long start, long end); } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org