Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 21EAE17F69 for ; Tue, 27 Oct 2015 21:47:14 +0000 (UTC) Received: (qmail 92205 invoked by uid 500); 27 Oct 2015 21:47:12 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 91980 invoked by uid 500); 27 Oct 2015 21:47:12 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 91867 invoked by uid 99); 27 Oct 2015 21:47:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 27 Oct 2015 21:47:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 2259BE05D9; Tue, 27 Oct 2015 21:47:12 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jing9@apache.org To: common-commits@hadoop.apache.org Date: Tue, 27 Oct 2015 21:47:16 -0000 Message-Id: <184b8b0442304e21bd88a08b529f3904@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [05/50] hadoop git commit: YARN-3739. Add reservation system recovery to RM recovery process. Contributed by Subru Krishnan. YARN-3739. Add reservation system recovery to RM recovery process. Contributed by Subru Krishnan. 
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2798723a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2798723a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2798723a Branch: refs/heads/HDFS-8966 Commit: 2798723a5443d04455b9d79c48d61f435ab52267 Parents: 381610d Author: Anubhav Dhoot Authored: Thu Oct 22 06:36:58 2015 -0700 Committer: Anubhav Dhoot Committed: Thu Oct 22 06:51:00 2015 -0700 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../server/resourcemanager/ClientRMService.java | 2 +- .../server/resourcemanager/ResourceManager.java | 4 + .../reservation/AbstractReservationSystem.java | 91 ++++- .../AbstractSchedulerPlanFollower.java | 76 ++-- .../CapacitySchedulerPlanFollower.java | 16 - .../reservation/FairSchedulerPlanFollower.java | 15 - .../reservation/InMemoryPlan.java | 35 +- .../resourcemanager/reservation/PlanEdit.java | 6 +- .../reservation/PlanFollower.java | 4 +- .../reservation/ReservationSystem.java | 7 +- .../reservation/planning/PlanningAlgorithm.java | 2 +- .../TestReservationSystemWithRMHA.java | 360 +++++++++++++++++-- .../reservation/ReservationSystemTestUtil.java | 1 - .../reservation/TestCapacityOverTimePolicy.java | 29 +- .../reservation/TestInMemoryPlan.java | 16 +- .../reservation/TestNoOverCommitPolicy.java | 19 +- .../TestSchedulerPlanFollowerBase.java | 17 +- .../planning/TestAlignedPlanner.java | 7 +- .../planning/TestGreedyReservationAgent.java | 17 +- .../planning/TestSimpleCapacityReplanner.java | 14 +- 21 files changed, 556 insertions(+), 185 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 
ae26386..79df1ce 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -238,6 +238,9 @@ Release 2.8.0 - UNRELEASED YARN-4262. Allow whitelisted users to run privileged docker containers. (Sidharta Seethana via vvasudev) + YARN-3739. Add reservation system recovery to RM recovery process. + (Subru Krishnan via adhoot) + IMPROVEMENTS YARN-644. Basic null check is not performed on passed in arguments before http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 4a02580..812267d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -1363,7 +1363,7 @@ public class ClientRMService extends AbstractService implements .format( "Reservation {0} is within threshold so attempting to create synchronously.", reservationId)); - reservationSystem.synchronizePlan(planName); + reservationSystem.synchronizePlan(planName, true); LOG.info(MessageFormat.format("Created reservation {0} synchronously.", reservationId)); } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index b38f188..88fb1cf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -1186,6 +1186,10 @@ public class ResourceManager extends CompositeService implements Recoverable { // recover AMRMTokenSecretManager rmContext.getAMRMTokenSecretManager().recover(state); + // recover reservations + if (reservationSystem != null) { + reservationSystem.recover(state); + } // recover applications rmAppManager.recover(state); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index cf57dbe..56423e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -18,16 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; @@ -38,8 +28,11 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -52,6 +45,17 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + /** * This is the implementation of {@link ReservationSystem} based on the * {@link ResourceScheduler} @@ -94,6 +98,8 @@ public abstract class AbstractReservationSystem extends AbstractService private PlanFollower planFollower; + private boolean isRecoveryEnabled = false; + /** * Construct the service. 
* @@ -149,6 +155,49 @@ public abstract class AbstractReservationSystem extends AbstractService Plan plan = initializePlan(planQueueName); plans.put(planQueueName, plan); } + isRecoveryEnabled = conf.getBoolean( + YarnConfiguration.RECOVERY_ENABLED, + YarnConfiguration.DEFAULT_RM_RECOVERY_ENABLED); + } + + private void loadPlan(String planName, + Map<ReservationId, ReservationAllocationStateProto> reservations) + throws PlanningException { + Plan plan = plans.get(planName); + Resource minAllocation = getMinAllocation(); + ResourceCalculator rescCalculator = getResourceCalculator(); + for (Entry<ReservationId, ReservationAllocationStateProto> currentReservation : reservations + .entrySet()) { + plan.addReservation(ReservationSystemUtil.toInMemoryAllocation(planName, + currentReservation.getKey(), currentReservation.getValue(), + minAllocation, rescCalculator), true); + resQMap.put(currentReservation.getKey(), planName); + } + LOG.info("Recovered reservations for Plan: {}", planName); + } + + @Override + public void recover(RMState state) throws Exception { + LOG.info("Recovering Reservation system"); + writeLock.lock(); + try { + Map<String, Map<ReservationId, ReservationAllocationStateProto>> reservationSystemState = + state.getReservationState(); + if (planFollower != null) { + for (String plan : plans.keySet()) { + // recover reservations if any from state store + if (reservationSystemState.containsKey(plan)) { + loadPlan(plan, reservationSystemState.get(plan)); + } + synchronizePlan(plan, false); + } + startPlanFollower(conf.getLong( + YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS, + YarnConfiguration.DEFAULT_RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS)); + } + } finally { + writeLock.unlock(); + } } private void initializeNewPlans(Configuration conf) { @@ -162,7 +211,7 @@ public abstract class AbstractReservationSystem extends AbstractService Plan plan = initializePlan(planQueueName); plans.put(planQueueName, plan); } else { - LOG.warn("Plan based on reservation queue {0} already exists.", + LOG.warn("Plan based on reservation queue {} already exists.", planQueueName); } } @@ -236,18 +285,26 
@@ public abstract class AbstractReservationSystem extends AbstractService } @Override - public void synchronizePlan(String planName) { + public void synchronizePlan(String planName, boolean shouldReplan) { writeLock.lock(); try { Plan plan = plans.get(planName); if (plan != null) { - planFollower.synchronizePlan(plan); + planFollower.synchronizePlan(plan, shouldReplan); } } finally { writeLock.unlock(); } } + private void startPlanFollower(long initialDelay) { + if (planFollower != null) { + scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + scheduledExecutorService.scheduleWithFixedDelay(planFollower, + initialDelay, planStepSize, TimeUnit.MILLISECONDS); + } + } + @Override public void serviceInit(Configuration conf) throws Exception { Configuration configuration = new Configuration(conf); @@ -262,10 +319,8 @@ public abstract class AbstractReservationSystem extends AbstractService @Override public void serviceStart() throws Exception { - if (planFollower != null) { - scheduledExecutorService = new ScheduledThreadPoolExecutor(1); - scheduledExecutorService.scheduleWithFixedDelay(planFollower, 0L, - planStepSize, TimeUnit.MILLISECONDS); + if (!isRecoveryEnabled) { + startPlanFollower(planStepSize); } super.serviceStart(); } @@ -350,7 +405,7 @@ public abstract class AbstractReservationSystem extends AbstractService minAllocation, maxAllocation, planQueueName, getReplanner(planQueuePath), getReservationSchedulerConfiguration() .getMoveOnExpiry(planQueuePath), rmContext); - LOG.info("Intialized plan {0} based on reservable queue {1}", + LOG.info("Intialized plan {} based on reservable queue {}", plan.toString(), planQueueName); return plan; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java index ea7f27d..eaf2902 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractSchedulerPlanFollower.java @@ -27,8 +27,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; - import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +43,7 @@ import java.util.Set; public abstract class AbstractSchedulerPlanFollower implements PlanFollower { private static final Logger LOG = LoggerFactory - .getLogger(CapacitySchedulerPlanFollower.class); + .getLogger(AbstractSchedulerPlanFollower.class); protected Collection plans = new ArrayList(); protected YarnScheduler scheduler; @@ -59,7 +59,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { @Override public synchronized void run() { for (Plan plan : plans) { - synchronizePlan(plan); + 
synchronizePlan(plan, true); } } @@ -70,7 +70,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { } @Override - public synchronized void synchronizePlan(Plan plan) { + public synchronized void synchronizePlan(Plan plan, boolean shouldReplan) { String planQueueName = plan.getQueueName(); if (LOG.isDebugEnabled()) { LOG.debug("Running plan follower edit policy for plan: " + planQueueName); @@ -88,14 +88,12 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { Resource clusterResources = scheduler.getClusterResource(); Resource planResources = getPlanResources(plan, planQueue, clusterResources); - Set currentReservations = plan.getReservationsAtTime(now); Set curReservationNames = new HashSet(); Resource reservedResources = Resource.newInstance(0, 0); int numRes = getReservedResources(now, currentReservations, curReservationNames, reservedResources); - // create the default reservation queue if it doesnt exist String defReservationId = getReservationIdFromQueueName(planQueueName) + ReservationConstants.DEFAULT_QUEUE_SUFFIX; @@ -104,14 +102,18 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { createDefaultReservationQueue(planQueueName, planQueue, defReservationId); curReservationNames.add(defReservationId); - // if the resources dedicated to this plan has shrunk invoke replanner - if (arePlanResourcesLessThanReservations(clusterResources, planResources, - reservedResources)) { - try { - plan.getReplanner().plan(plan, null); - } catch (PlanningException e) { - LOG.warn("Exception while trying to replan: {}", planQueueName, e); + boolean shouldResize = false; + if (arePlanResourcesLessThanReservations(plan.getResourceCalculator(), + clusterResources, planResources, reservedResources)) { + if (shouldReplan) { + try { + plan.getReplanner().plan(plan, null); + } catch (PlanningException e) { + LOG.warn("Exception while trying to replan: {}", planQueueName, e); + } + } else { + 
shouldResize = true; } } // identify the reservations that have expired and new reservations that @@ -133,7 +135,6 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { // garbage collect expired reservations cleanupExpiredQueues(planQueueName, plan.getMoveOnExpiry(), expired, defReservationQueue); - // Add new reservations and update existing ones float totalAssignedCapacity = 0f; if (currentReservations != null) { @@ -146,9 +147,8 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { planQueueName, e); } // sort allocations from the one giving up the most resources, to the - // one asking for the most - // avoid order-of-operation errors that temporarily violate 100% - // capacity bound + // one asking for the most avoid order-of-operation errors that + // temporarily violate 100% capacity bound List sortedAllocations = sortByDelta( new ArrayList(currentReservations), now, @@ -162,10 +162,15 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { float targetCapacity = 0f; if (planResources.getMemory() > 0 && planResources.getVirtualCores() > 0) { + if (shouldResize) { + capToAssign = + calculateReservationToPlanProportion( + plan.getResourceCalculator(), planResources, + reservedResources, capToAssign); + } targetCapacity = - calculateReservationToPlanRatio(clusterResources, - planResources, - capToAssign); + calculateReservationToPlanRatio(plan.getResourceCalculator(), + clusterResources, planResources, capToAssign); } if (LOG.isDebugEnabled()) { LOG.debug( @@ -211,7 +216,6 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { } LOG.info("Finished iteration of plan follower edit policy for plan: " + planQueueName); - // Extension: update plan with app states, // useful to support smart replanning } @@ -324,18 +328,34 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { protected abstract Queue getPlanQueue(String planQueueName); 
/** + * Resizes reservations based on currently available resources + */ + private Resource calculateReservationToPlanProportion( + ResourceCalculator rescCalculator, Resource availablePlanResources, + Resource totalReservationResources, Resource reservationResources) { + return Resources.multiply(availablePlanResources, Resources.ratio( + rescCalculator, reservationResources, totalReservationResources)); + } + + /** * Calculates ratio of reservationResources to planResources */ - protected abstract float calculateReservationToPlanRatio( - Resource clusterResources, Resource planResources, - Resource reservationResources); + private float calculateReservationToPlanRatio( + ResourceCalculator rescCalculator, Resource clusterResources, + Resource planResources, Resource reservationResources) { + return Resources.divide(rescCalculator, clusterResources, + reservationResources, planResources); + } /** * Check if plan resources are less than expected reservation resources */ - protected abstract boolean arePlanResourcesLessThanReservations( - Resource clusterResources, Resource planResources, - Resource reservedResources); + private boolean arePlanResourcesLessThanReservations( + ResourceCalculator rescCalculator, Resource clusterResources, + Resource planResources, Resource reservedResources) { + return Resources.greaterThan(rescCalculator, clusterResources, + reservedResources, planResources); + } /** * Get a list of reservation queues for this planQueue @@ -363,7 +383,7 @@ public abstract class AbstractSchedulerPlanFollower implements PlanFollower { Plan plan, Queue queue, Resource clusterResources); /** - * Get reservation queue resources if it exists otherwise return null + * Get reservation queue resources if it exists otherwise return null. 
*/ protected abstract Resource getReservationQueueResourceIfExists(Plan plan, ReservationId reservationId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java index 61772c9..551f075 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java @@ -81,22 +81,6 @@ public class CapacitySchedulerPlanFollower extends AbstractSchedulerPlanFollower } @Override - protected float calculateReservationToPlanRatio( - Resource clusterResources, Resource planResources, - Resource reservationResources) { - return Resources.divide(cs.getResourceCalculator(), - clusterResources, reservationResources, planResources); - } - - @Override - protected boolean arePlanResourcesLessThanReservations( - Resource clusterResources, Resource planResources, - Resource reservedResources) { - return Resources.greaterThan(cs.getResourceCalculator(), - clusterResources, reservedResources, planResources); - } - - @Override protected List getChildReservationQueues(Queue queue) { PlanQueue planQueue = 
(PlanQueue)queue; List childQueues = planQueue.getChildQueues(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java index 7ca03c5..2a57b22 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairSchedulerPlanFollower.java @@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueu import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,20 +59,6 @@ public class FairSchedulerPlanFollower extends AbstractSchedulerPlanFollower { } @Override - protected float calculateReservationToPlanRatio(Resource clusterResources, - Resource planResources, Resource capToAssign) { - return Resources.divide(fs.getResourceCalculator(), - clusterResources, capToAssign, planResources); - } - - @Override - protected boolean 
arePlanResourcesLessThanReservations(Resource - clusterResources, Resource planResources, Resource reservedResources) { - return Resources.greaterThan(fs.getResourceCalculator(), - clusterResources, reservedResources, planResources); - } - - @Override protected List getChildReservationQueues(Queue queue) { FSQueue planQueue = (FSQueue)queue; List childQueues = planQueue.getChildQueues(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java index a887e24..7e2567b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/InMemoryPlan.java @@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException; import org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.Planner; import 
org.apache.hadoop.yarn.server.resourcemanager.reservation.planning.ReservationAgent; @@ -54,7 +55,7 @@ public class InMemoryPlan implements Plan { private static final Logger LOG = LoggerFactory.getLogger(InMemoryPlan.class); private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0); - private final RMContext rmContext; + private final RMStateStore rmStateStore; private TreeMap> currentReservations = new TreeMap>(); @@ -112,7 +113,7 @@ public class InMemoryPlan implements Plan { this.replanner = replanner; this.getMoveOnExpiry = getMoveOnExpiry; this.clock = clock; - this.rmContext = rmContext; + this.rmStateStore = rmContext.getStateStore(); } @Override @@ -174,8 +175,8 @@ public class InMemoryPlan implements Plan { } @Override - public boolean addReservation(ReservationAllocation reservation) - throws PlanningException { + public boolean addReservation(ReservationAllocation reservation, + boolean isRecovering) throws PlanningException { // Verify the allocation is memory based otherwise it is not supported InMemoryReservationAllocation inMemReservation = (InMemoryReservationAllocation) reservation; @@ -198,9 +199,16 @@ public class InMemoryPlan implements Plan { } // Validate if we can accept this reservation, throws exception if // validation fails - policy.validate(this, inMemReservation); - // we record here the time in which the allocation has been accepted - reservation.setAcceptanceTimestamp(clock.getTime()); + if (!isRecovering) { + policy.validate(this, inMemReservation); + // we record here the time in which the allocation has been accepted + reservation.setAcceptanceTimestamp(clock.getTime()); + if (rmStateStore != null) { + rmStateStore.storeNewReservation( + ReservationSystemUtil.buildStateProto(inMemReservation), + getQueueName(), inMemReservation.getReservationId().toString()); + } + } ReservationInterval searchInterval = new ReservationInterval(inMemReservation.getStartTime(), inMemReservation.getEndTime()); @@ -217,9 +225,6 @@ 
public class InMemoryPlan implements Plan { currentReservations.put(searchInterval, reservations); reservationTable.put(inMemReservation.getReservationId(), inMemReservation); - rmContext.getStateStore().storeNewReservation( - ReservationSystemUtil.buildStateProto(inMemReservation), - getQueueName(), inMemReservation.getReservationId().toString()); incrementAllocation(inMemReservation); LOG.info("Sucessfully added reservation: {} to plan.", inMemReservation.getReservationId()); @@ -253,7 +258,7 @@ public class InMemoryPlan implements Plan { return result; } try { - result = addReservation(reservation); + result = addReservation(reservation, false); } catch (PlanningException e) { LOG.error("Unable to update reservation: {} from plan due to {}.", reservation.getReservationId(), e.getMessage()); @@ -264,7 +269,7 @@ public class InMemoryPlan implements Plan { return result; } else { // rollback delete - addReservation(currReservation); + addReservation(currReservation, false); LOG.info("Rollbacked update reservation: {} from plan.", reservation.getReservationId()); return result; @@ -282,6 +287,10 @@ public class InMemoryPlan implements Plan { Set reservations = currentReservations.get(searchInterval); if (reservations != null) { + if (rmStateStore != null) { + rmStateStore.removeReservation(getQueueName(), + reservation.getReservationId().toString()); + } if (!reservations.remove(reservation)) { LOG.error("Unable to remove reservation: {} from plan.", reservation.getReservationId()); @@ -298,8 +307,6 @@ public class InMemoryPlan implements Plan { throw new IllegalArgumentException(errMsg); } reservationTable.remove(reservation.getReservationId()); - rmContext.getStateStore().removeReservation( - getQueueName(), reservation.getReservationId().toString()); decrementAllocation(reservation); LOG.info("Sucessfully deleted reservation: {} in plan.", reservation.getReservationId()); 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java index 60e201b..504a250 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanEdit.java @@ -32,10 +32,12 @@ public interface PlanEdit extends PlanContext, PlanView { * * @param reservation the {@link ReservationAllocation} to be added to the * plan + * @param isRecovering flag to indicate if reservation is being added as part + * of failover or not * @return true if addition is successful, false otherwise */ - public boolean addReservation(ReservationAllocation reservation) - throws PlanningException; + public boolean addReservation(ReservationAllocation reservation, + boolean isRecovering) throws PlanningException; /** * Updates an existing {@link ReservationAllocation} in the plan. 
This is http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java index 6635314..a5c8ee1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/PlanFollower.java @@ -71,8 +71,10 @@ public interface PlanFollower extends Runnable { * start time is imminent. * * @param plan the Plan to synchronize + * @param shouldReplan replan on reduction of plan capacity if true or + * proportionally scale down reservations if false */ - public void synchronizePlan(Plan plan); + public void synchronizePlan(Plan plan, boolean shouldReplan); /** * Setter for the list of plans. 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java index 7785885..56a08ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystem.java @@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -40,7 +41,7 @@ import java.util.Map; */ @LimitedPrivate("yarn") @Unstable -public interface ReservationSystem { +public interface ReservationSystem extends Recoverable { /** * Set RMContext for {@link ReservationSystem}. 
This method should be called @@ -82,8 +83,10 @@ public interface ReservationSystem { * the {@link ResourceScheduler} * * @param planName the name of the {@link Plan} to be synchronized + * @param shouldReplan replan on reduction of plan capacity if true or + * proportionally scale down reservations if false */ - void synchronizePlan(String planName); + void synchronizePlan(String planName, boolean shouldReplan); /** * Return the time step (ms) at which the {@link PlanFollower} is invoked http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java index 9a0a0f0..8b72b9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/PlanningAlgorithm.java @@ -94,7 +94,7 @@ public abstract class PlanningAlgorithm implements ReservationAgent { if (oldReservation != null) { return plan.updateReservation(capReservation); } else { - return plan.addReservation(capReservation); + return plan.addReservation(capReservation, false); } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java index 6f74130..48a4d97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestReservationSystemWithRMHA.java @@ -18,15 +18,20 @@ package org.apache.hadoop.yarn.server.resourcemanager; import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest; import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse; import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest; +import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse; +import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.ReservationDefinition; import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.DrainDispatcher; import 
org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ReservationAllocationStateProto; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.Plan; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationAllocation; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; @@ -38,7 +43,7 @@ import org.junit.Test; import java.util.Map; -public class TestReservationSystemWithRMHA extends RMHATestBase{ +public class TestReservationSystemWithRMHA extends RMHATestBase { @Override public void setup() throws Exception { @@ -56,7 +61,7 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{ public void testSubmitReservationAndCheckAfterFailover() throws Exception { startRMs(); - addNodeCapacityToPlan(); + addNodeCapacityToPlan(rm1, 102400, 100); ClientRMService clientService = rm1.getClientRMService(); @@ -72,8 +77,6 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{ ReservationId reservationID = response.getReservationId(); Assert.assertNotNull(reservationID); LOG.info("Submit reservation response: " + reservationID); - ReservationDefinition reservationDefinition = request - .getReservationDefinition(); // Do the failover explicitFailover(); @@ -87,12 +90,11 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{ Assert.assertNotNull(reservationStateMap.get(reservationID)); } - @Test public void testUpdateReservationAndCheckAfterFailover() throws Exception { startRMs(); - addNodeCapacityToPlan(); + addNodeCapacityToPlan(rm1, 102400, 100); ClientRMService clientService = rm1.getClientRMService(); @@ -108,17 +110,15 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{ ReservationId 
reservationID = response.getReservationId(); Assert.assertNotNull(reservationID); LOG.info("Submit reservation response: " + reservationID); - ReservationDefinition reservationDefinition = request - .getReservationDefinition(); - + ReservationDefinition reservationDefinition = + request.getReservationDefinition(); // Change any field long newDeadline = reservationDefinition.getDeadline() + 100; reservationDefinition.setDeadline(newDeadline); - ReservationUpdateRequest updateRequest = - ReservationUpdateRequest.newInstance( - reservationDefinition, reservationID); + ReservationUpdateRequest updateRequest = ReservationUpdateRequest + .newInstance(reservationDefinition, reservationID); rm1.updateReservationState(updateRequest); // Do the failover @@ -140,7 +140,7 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{ public void testDeleteReservationAndCheckAfterFailover() throws Exception { startRMs(); - addNodeCapacityToPlan(); + addNodeCapacityToPlan(rm1, 102400, 100); ClientRMService clientService = rm1.getClientRMService(); @@ -156,7 +156,6 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{ ReservationId reservationID = response.getReservationId(); Assert.assertNotNull(reservationID); - // Delete the reservation ReservationDeleteRequest deleteRequest = ReservationDeleteRequest.newInstance(reservationID); @@ -168,32 +167,31 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{ rm2.registerNode("127.0.0.1:1", 102400, 100); RMState state = rm2.getRMContext().getStateStore().loadState(); - Assert.assertNull(state.getReservationState().get( - ReservationSystemTestUtil.reservationQ)); + Assert.assertNull(state.getReservationState() + .get(ReservationSystemTestUtil.reservationQ)); } - private void addNodeCapacityToPlan() { + private void addNodeCapacityToPlan(MockRM rm, int memory, int vCores) { try { - rm1.registerNode("127.0.0.1:1", 102400, 100); + rm.registerNode("127.0.0.1:1", memory, vCores); int attempts = 10; do { 
DrainDispatcher dispatcher = (DrainDispatcher) rm1.getRMContext().getDispatcher(); dispatcher.await(); - rm1.getRMContext().getReservationSystem().synchronizePlan( - ReservationSystemTestUtil.reservationQ); - if (rm1.getRMContext().getReservationSystem().getPlan - (ReservationSystemTestUtil.reservationQ).getTotalCapacity() + rm.getRMContext().getReservationSystem() + .synchronizePlan(ReservationSystemTestUtil.reservationQ, false); + if (rm.getRMContext().getReservationSystem() + .getPlan(ReservationSystemTestUtil.reservationQ).getTotalCapacity() .getMemory() > 0) { break; } LOG.info("Waiting for node capacity to be added to plan"); Thread.sleep(100); - } - while (attempts-- > 0); + } while (attempts-- > 0); if (attempts <= 0) { - Assert.fail("Exhausted attempts in checking if node capacity was " + - "added to the plan"); + Assert.fail("Exhausted attempts in checking if node capacity was " + + "added to the plan"); } } catch (Exception e) { @@ -205,8 +203,316 @@ public class TestReservationSystemWithRMHA extends RMHATestBase{ Clock clock = new UTCClock(); long arrival = clock.getTime(); long duration = 60000; - long deadline = (long) (arrival + 1.05 * duration); + long deadline = (long) (arrival + duration + 1500); return ReservationSystemTestUtil.createSimpleReservationRequest(4, arrival, deadline, duration); } + + private void validateReservation(Plan plan, ReservationId resId, + ReservationDefinition rDef) { + ReservationAllocation reservation = plan.getReservationById(resId); + Assert.assertNotNull(reservation); + Assert.assertEquals(rDef.getDeadline(), + reservation.getReservationDefinition().getDeadline()); + } + + @Test + public void testSubmitReservationFailoverAndDelete() throws Exception { + startRMs(); + + addNodeCapacityToPlan(rm1, 102400, 100); + + ClientRMService clientService = rm1.getClientRMService(); + + // create a reservation + ReservationSubmissionRequest request = createReservationSubmissionRequest(); + ReservationSubmissionResponse response = 
null; + try { + response = clientService.submitReservation(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(response); + ReservationId reservationID = response.getReservationId(); + Assert.assertNotNull(reservationID); + LOG.info("Submit reservation response: " + reservationID); + ReservationDefinition reservationDefinition = + request.getReservationDefinition(); + + // Do the failover + explicitFailover(); + + addNodeCapacityToPlan(rm2, 102400, 100); + + // check if reservation exists after failover + Plan plan = rm2.getRMContext().getReservationSystem() + .getPlan(ReservationSystemTestUtil.reservationQ); + validateReservation(plan, reservationID, reservationDefinition); + + // delete the reservation + ReservationDeleteRequest deleteRequest = + ReservationDeleteRequest.newInstance(reservationID); + ReservationDeleteResponse deleteResponse = null; + clientService = rm2.getClientRMService(); + try { + deleteResponse = clientService.deleteReservation(deleteRequest); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(deleteResponse); + Assert.assertNull(plan.getReservationById(reservationID)); + } + + @Test + public void testFailoverAndSubmitReservation() throws Exception { + startRMs(); + + addNodeCapacityToPlan(rm1, 102400, 100); + + // Do the failover + explicitFailover(); + + addNodeCapacityToPlan(rm2, 102400, 100); + + // create a reservation + ClientRMService clientService = rm2.getClientRMService(); + ReservationSubmissionRequest request = createReservationSubmissionRequest(); + ReservationSubmissionResponse response = null; + try { + response = clientService.submitReservation(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(response); + ReservationId reservationID = response.getReservationId(); + Assert.assertNotNull(reservationID); + LOG.info("Submit reservation response: " + reservationID); + ReservationDefinition reservationDefinition = 
+ request.getReservationDefinition(); + + // check if reservation is submitted successfully + Plan plan = rm2.getRMContext().getReservationSystem() + .getPlan(ReservationSystemTestUtil.reservationQ); + validateReservation(plan, reservationID, reservationDefinition); + } + + @Test + public void testSubmitReservationFailoverAndUpdate() throws Exception { + startRMs(); + + addNodeCapacityToPlan(rm1, 102400, 100); + + ClientRMService clientService = rm1.getClientRMService(); + + // create a reservation + ReservationSubmissionRequest request = createReservationSubmissionRequest(); + ReservationSubmissionResponse response = null; + try { + response = clientService.submitReservation(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(response); + ReservationId reservationID = response.getReservationId(); + Assert.assertNotNull(reservationID); + LOG.info("Submit reservation response: " + reservationID); + ReservationDefinition reservationDefinition = + request.getReservationDefinition(); + + // Do the failover + explicitFailover(); + + addNodeCapacityToPlan(rm2, 102400, 100); + + // check if reservation exists after failover + Plan plan = rm2.getRMContext().getReservationSystem() + .getPlan(ReservationSystemTestUtil.reservationQ); + validateReservation(plan, reservationID, reservationDefinition); + + // update the reservation + long newDeadline = reservationDefinition.getDeadline() + 100; + reservationDefinition.setDeadline(newDeadline); + ReservationUpdateRequest updateRequest = ReservationUpdateRequest + .newInstance(reservationDefinition, reservationID); + ReservationUpdateResponse updateResponse = null; + clientService = rm2.getClientRMService(); + try { + updateResponse = clientService.updateReservation(updateRequest); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(updateResponse); + validateReservation(plan, reservationID, reservationDefinition); + } + + @Test + public void 
testSubmitUpdateReservationFailoverAndDelete() throws Exception { + startRMs(); + + addNodeCapacityToPlan(rm1, 102400, 100); + + ClientRMService clientService = rm1.getClientRMService(); + + // create a reservation + ReservationSubmissionRequest request = createReservationSubmissionRequest(); + ReservationSubmissionResponse response = null; + try { + response = clientService.submitReservation(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(response); + ReservationId reservationID = response.getReservationId(); + Assert.assertNotNull(reservationID); + LOG.info("Submit reservation response: " + reservationID); + ReservationDefinition reservationDefinition = + request.getReservationDefinition(); + + // check if reservation is submitted successfully + Plan plan = rm1.getRMContext().getReservationSystem() + .getPlan(ReservationSystemTestUtil.reservationQ); + validateReservation(plan, reservationID, reservationDefinition); + + // update the reservation + long newDeadline = reservationDefinition.getDeadline() + 100; + reservationDefinition.setDeadline(newDeadline); + ReservationUpdateRequest updateRequest = ReservationUpdateRequest + .newInstance(reservationDefinition, reservationID); + ReservationUpdateResponse updateResponse = null; + try { + updateResponse = clientService.updateReservation(updateRequest); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(updateResponse); + validateReservation(plan, reservationID, reservationDefinition); + + // Do the failover + explicitFailover(); + + addNodeCapacityToPlan(rm2, 102400, 100); + + // check if reservation exists after failover + plan = rm2.getRMContext().getReservationSystem() + .getPlan(ReservationSystemTestUtil.reservationQ); + validateReservation(plan, reservationID, reservationDefinition); + + // delete the reservation + ReservationDeleteRequest deleteRequest = + ReservationDeleteRequest.newInstance(reservationID); + 
ReservationDeleteResponse deleteResponse = null; + clientService = rm2.getClientRMService(); + try { + deleteResponse = clientService.deleteReservation(deleteRequest); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(deleteResponse); + Assert.assertNull(plan.getReservationById(reservationID)); + } + + @Test + public void testReservationResizeAfterFailover() throws Exception { + startRMs(); + + addNodeCapacityToPlan(rm1, 102400, 100); + + ClientRMService clientService = rm1.getClientRMService(); + + // create 3 reservations + ReservationSubmissionRequest request = createReservationSubmissionRequest(); + ReservationDefinition reservationDefinition = + request.getReservationDefinition(); + ReservationSubmissionResponse response = null; + try { + response = clientService.submitReservation(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(response); + ReservationId resID1 = response.getReservationId(); + Assert.assertNotNull(resID1); + LOG.info("Submit reservation response: " + resID1); + try { + response = clientService.submitReservation(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(response); + ReservationId resID2 = response.getReservationId(); + Assert.assertNotNull(resID2); + LOG.info("Submit reservation response: " + resID2); + try { + response = clientService.submitReservation(request); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + Assert.assertNotNull(response); + ReservationId resID3 = response.getReservationId(); + Assert.assertNotNull(resID3); + LOG.info("Submit reservation response: " + resID3); + + // allow the reservations to become active + waitForReservationActivation(rm1, resID1, + ReservationSystemTestUtil.reservationQ); + + // validate reservations before failover + Plan plan = rm1.getRMContext().getReservationSystem() + .getPlan(ReservationSystemTestUtil.reservationQ); + validateReservation(plan, resID1, 
reservationDefinition); + validateReservation(plan, resID2, reservationDefinition); + validateReservation(plan, resID3, reservationDefinition); + ResourceScheduler scheduler = rm1.getResourceScheduler(); + QueueInfo resQ1 = scheduler.getQueueInfo(resID1.toString(), false, false); + Assert.assertEquals(0.05, resQ1.getCapacity(), 0.001f); + QueueInfo resQ2 = scheduler.getQueueInfo(resID2.toString(), false, false); + Assert.assertEquals(0.05, resQ2.getCapacity(), 0.001f); + QueueInfo resQ3 = scheduler.getQueueInfo(resID3.toString(), false, false); + Assert.assertEquals(0.05, resQ3.getCapacity(), 0.001f); + + // Do the failover + explicitFailover(); + + addNodeCapacityToPlan(rm2, 5120, 5); + + // check if reservations exists after failover + plan = rm2.getRMContext().getReservationSystem() + .getPlan(ReservationSystemTestUtil.reservationQ); + validateReservation(plan, resID1, reservationDefinition); + validateReservation(plan, resID3, reservationDefinition); + + // verify if the reservations have been resized + scheduler = rm2.getResourceScheduler(); + resQ1 = scheduler.getQueueInfo(resID1.toString(), false, false); + Assert.assertEquals(1f / 3f, resQ1.getCapacity(), 0.001f); + resQ2 = scheduler.getQueueInfo(resID2.toString(), false, false); + Assert.assertEquals(1f / 3f, resQ2.getCapacity(), 0.001f); + resQ3 = scheduler.getQueueInfo(resID3.toString(), false, false); + Assert.assertEquals(1f / 3f, resQ3.getCapacity(), 0.001f); + } + + private void waitForReservationActivation(MockRM rm, + ReservationId reservationId, String planName) { + try { + int attempts = 20; + do { + rm.getRMContext().getReservationSystem().synchronizePlan(planName, + false); + if (rm.getResourceScheduler() + .getQueueInfo(reservationId.toString(), false, false) + .getCapacity() > 0f) { + break; + } + LOG.info("Waiting for reservation to be active"); + Thread.sleep(100); + } while (attempts-- > 0); + if (attempts <= 0) { + Assert + .fail("Exceeded attempts in waiting for reservation to be 
active"); + } + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index 7729172..05933f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -215,7 +215,6 @@ public class ReservationSystemTestUtil { return context; } - @SuppressWarnings("unchecked") public CapacityScheduler mockCapacityScheduler(int numContainers) throws IOException { // stolen from TestCapacityScheduler http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java index 22ce6aa..4aed064 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacityOverTimePolicy.java @@ -75,10 +75,11 @@ public class TestCapacityOverTimePolicy { maxAlloc = Resource.newInstance(1024 * 8, 8); mAgent = mock(ReservationAgent.class); - ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); QueueMetrics rootQueueMetrics = mock(QueueMetrics.class); - String reservationQ = testUtil.getFullReservationQueueName(); - Resource clusterResource = testUtil.calculateClusterResource(totCont); + String reservationQ = + ReservationSystemTestUtil.getFullReservationQueueName(); + Resource clusterResource = + ReservationSystemTestUtil.calculateClusterResource(totCont); ReservationSchedulerConfiguration conf = ReservationSystemTestUtil.createConf(reservationQ, timeWindow, instConstraint, avgConstraint); @@ -113,7 +114,7 @@ public class TestCapacityOverTimePolicy { ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc))); + res, minAlloc), false)); } @Test @@ -130,7 +131,7 @@ public class TestCapacityOverTimePolicy { ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc))); + res, minAlloc), false)); } @Test @@ -146,7 +147,7 @@ public class TestCapacityOverTimePolicy { 
ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i, "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc))); + res, minAlloc), false)); } } @@ -163,7 +164,7 @@ public class TestCapacityOverTimePolicy { ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i, "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc))); + res, minAlloc), false)); } } @@ -179,7 +180,7 @@ public class TestCapacityOverTimePolicy { ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc))); + res, minAlloc), false)); Assert.fail("should not have accepted this"); } @@ -195,20 +196,20 @@ public class TestCapacityOverTimePolicy { ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc))); + res, minAlloc), false)); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc))); + res, minAlloc), false)); try { assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc))); + res, minAlloc), false)); Assert.fail(); } catch (PlanningQuotaException p) { // expected @@ -232,7 +233,7 @@ public class TestCapacityOverTimePolicy { assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", - 
"dedicated", initTime, initTime + win, req, res, minAlloc))); + "dedicated", initTime, initTime + win, req, res, minAlloc), false)); } @Test @@ -251,13 +252,13 @@ public class TestCapacityOverTimePolicy { assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", - "dedicated", initTime, initTime + win, req, res, minAlloc))); + "dedicated", initTime, initTime + win, req, res, minAlloc), false)); try { assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( ReservationSystemTestUtil.getNewReservationId(), null, "u1", - "dedicated", initTime, initTime + win, req, res, minAlloc))); + "dedicated", initTime, initTime + win, req, res, minAlloc), false)); Assert.fail("should not have accepted this"); } catch (PlanningQuotaException e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java index c661271..2e262a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestInMemoryPlan.java @@ -113,7 +113,7 @@ public class TestInMemoryPlan { start, 
start + alloc.length, allocs, resCalc, minAlloc); Assert.assertNull(plan.getReservationById(reservationID)); try { - plan.addReservation(rAllocation); + plan.addReservation(rAllocation, false); } catch (PlanningException e) { Assert.fail(e.getMessage()); } @@ -147,7 +147,7 @@ public class TestInMemoryPlan { start, start + alloc.length, allocs, resCalc, minAlloc); Assert.assertNull(plan.getReservationById(reservationID)); try { - plan.addReservation(rAllocation); + plan.addReservation(rAllocation, false); } catch (PlanningException e) { Assert.fail(e.getMessage()); } @@ -175,7 +175,7 @@ public class TestInMemoryPlan { start, start + alloc.length, allocs, resCalc, minAlloc); Assert.assertNull(plan.getReservationById(reservationID)); try { - plan.addReservation(rAllocation); + plan.addReservation(rAllocation, false); } catch (PlanningException e) { Assert.fail(e.getMessage()); } @@ -189,7 +189,7 @@ public class TestInMemoryPlan { // Try to add it again try { - plan.addReservation(rAllocation); + plan.addReservation(rAllocation, false); Assert.fail("Add should fail as it already exists"); } catch (IllegalArgumentException e) { Assert.assertTrue(e.getMessage().endsWith("already exists")); @@ -221,7 +221,7 @@ public class TestInMemoryPlan { start, start + alloc.length, allocs, resCalc, minAlloc); Assert.assertNull(plan.getReservationById(reservationID)); try { - plan.addReservation(rAllocation); + plan.addReservation(rAllocation, false); } catch (PlanningException e) { Assert.fail(e.getMessage()); } @@ -316,7 +316,7 @@ public class TestInMemoryPlan { start, start + alloc.length, allocs, resCalc, minAlloc); Assert.assertNull(plan.getReservationById(reservationID)); try { - plan.addReservation(rAllocation); + plan.addReservation(rAllocation, false); } catch (PlanningException e) { Assert.fail(e.getMessage()); } @@ -388,7 +388,7 @@ public class TestInMemoryPlan { minAlloc); Assert.assertNull(plan.getReservationById(reservationID1)); try { - plan.addReservation(rAllocation); 
+ plan.addReservation(rAllocation, false); } catch (PlanningException e) { Assert.fail(e.getMessage()); } @@ -419,7 +419,7 @@ public class TestInMemoryPlan { minAlloc); Assert.assertNull(plan.getReservationById(reservationID2)); try { - plan.addReservation(rAllocation); + plan.addReservation(rAllocation, false); } catch (PlanningException e) { Assert.fail(e.getMessage()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java index a5f3fb4df..28dd62e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestNoOverCommitPolicy.java @@ -61,10 +61,11 @@ public class TestNoOverCommitPolicy { maxAlloc = Resource.newInstance(1024 * 8, 8); mAgent = mock(ReservationAgent.class); - ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); - String reservationQ = testUtil.getFullReservationQueueName(); + String reservationQ = + ReservationSystemTestUtil.getFullReservationQueueName(); QueueMetrics rootQueueMetrics = mock(QueueMetrics.class); - Resource clusterResource = testUtil.calculateClusterResource(totCont); + 
Resource clusterResource = + ReservationSystemTestUtil.calculateClusterResource(totCont); ReservationSchedulerConfiguration conf = mock (ReservationSchedulerConfiguration.class); NoOverCommitPolicy policy = new NoOverCommitPolicy(); @@ -97,7 +98,7 @@ public class TestNoOverCommitPolicy { ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc))); + res, minAlloc), false)); } @Test @@ -113,7 +114,7 @@ public class TestNoOverCommitPolicy { ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc))); + res, minAlloc), false)); } @Test(expected = ResourceOverCommitException.class) @@ -123,7 +124,7 @@ public class TestNoOverCommitPolicy { plan.addReservation(new InMemoryReservationAllocation( ReservationSystemTestUtil.getNewReservationId(), null, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil - .generateAllocation(initTime, step, f), res, minAlloc)); + .generateAllocation(initTime, step, f), res, minAlloc), false); } @Test(expected = MismatchedUserException.class) @@ -137,7 +138,7 @@ public class TestNoOverCommitPolicy { plan.addReservation(new InMemoryReservationAllocation(rid, rDef, "u1", "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil - .generateAllocation(initTime, step, f), res, minAlloc)); + .generateAllocation(initTime, step, f), res, minAlloc), false); // trying to update a reservation with a mismatching user plan.updateReservation(new InMemoryReservationAllocation(rid, rDef, "u2", @@ -158,7 +159,7 @@ public class TestNoOverCommitPolicy { ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i, "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc))); + res, minAlloc), false)); } } 
@@ -175,7 +176,7 @@ public class TestNoOverCommitPolicy { ReservationSystemTestUtil.getNewReservationId(), rDef, "u" + i, "dedicated", initTime, initTime + f.length, ReservationSystemTestUtil.generateAllocation(initTime, step, f), - res, minAlloc))); + res, minAlloc), false)); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java index 9689fce..b604799 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestSchedulerPlanFollowerBase.java @@ -18,6 +18,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -38,12 +43,6 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import 
org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.junit.Assert; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public abstract class TestSchedulerPlanFollowerBase { final static int GB = 1024; protected Clock mClock = null; @@ -75,20 +74,20 @@ public abstract class TestSchedulerPlanFollowerBase { assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation(r1, rDef, "u3", "dedicated", 0, 0 + f1.length, ReservationSystemTestUtil - .generateAllocation(0L, 1L, f1), res, minAlloc))); + .generateAllocation(0L, 1L, f1), res, minAlloc), false)); ReservationId r2 = ReservationId.newInstance(ts, 2); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation(r2, rDef, "u3", "dedicated", 3, 3 + f1.length, ReservationSystemTestUtil - .generateAllocation(3L, 1L, f1), res, minAlloc))); + .generateAllocation(3L, 1L, f1), res, minAlloc), false)); ReservationId r3 = ReservationId.newInstance(ts, 3); int[] f2 = { 0, 10, 20, 10, 0 }; assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation(r3, rDef, "u4", "dedicated", 10, 10 + f2.length, ReservationSystemTestUtil - .generateAllocation(10L, 1L, f2), res, minAlloc))); + .generateAllocation(10L, 1L, f2), res, minAlloc), false)); AbstractSchedulerPlanFollower planFollower = createPlanFollower(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java index 01b7976..ec305a2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestAlignedPlanner.java @@ -711,9 +711,8 @@ public class TestAlignedPlanner { Resource clusterCapacity = Resource.newInstance(capacityMem, capacityCores); - // Set configuration - ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); - String reservationQ = testUtil.getFullReservationQueueName(); + String reservationQ = + ReservationSystemTestUtil.getFullReservationQueueName(); float instConstraint = 100; float avgConstraint = 100; @@ -792,7 +791,7 @@ public class TestAlignedPlanner { ReservationSystemTestUtil.getNewReservationId(), rDef, "user_fixed", "dedicated", start, start + f.length * step, ReservationSystemTestUtil.generateAllocation(start, step, f), res, - minAlloc))); + minAlloc), false)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java index f51cc75..cb4eaeb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestGreedyReservationAgent.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; - import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -76,8 +75,8 @@ public class TestGreedyReservationAgent { long timeWindow = 1000000L; Resource clusterCapacity = Resource.newInstance(100 * 1024, 100); step = 1000L; - ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); - String reservationQ = testUtil.getFullReservationQueueName(); + String reservationQ = + ReservationSystemTestUtil.getFullReservationQueueName(); float instConstraint = 100; float avgConstraint = 100; @@ -151,7 +150,7 @@ public class TestGreedyReservationAgent { ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", 30 * step, 30 * step + f.length * step, ReservationSystemTestUtil.generateAllocation(30 * step, step, f), - res, minAlloc))); + res, minAlloc), false)); // create a chain of 4 RR, mixing gang and non-gang ReservationDefinition rr = new ReservationDefinitionPBImpl(); @@ -208,7 +207,7 @@ public class 
TestGreedyReservationAgent { ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", 30 * step, 30 * step + f.length * step, ReservationSystemTestUtil.generateAllocation(30 * step, step, f), - res, minAlloc))); + res, minAlloc), false)); // create a chain of 4 RR, mixing gang and non-gang ReservationDefinition rr = new ReservationDefinitionPBImpl(); @@ -529,7 +528,7 @@ public class TestGreedyReservationAgent { plan.addReservation(new InMemoryReservationAllocation( ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", "dedicated", 0L, 0L + f.length * step, ReservationSystemTestUtil - .generateAllocation(0, step, f), res, minAlloc))); + .generateAllocation(0, step, f), res, minAlloc), false)); int[] f2 = { 5, 5, 5, 5, 5, 5, 5 }; Map alloc = @@ -537,7 +536,8 @@ public class TestGreedyReservationAgent { assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation( ReservationSystemTestUtil.getNewReservationId(), rDef, "u1", - "dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc))); + "dedicated", 5000, 5000 + f2.length * step, alloc, res, minAlloc), + false)); System.out.println("--------BEFORE AGENT----------"); System.out.println(plan.toString()); @@ -563,7 +563,8 @@ public class TestGreedyReservationAgent { step = 1000L; ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); CapacityScheduler scheduler = testUtil.mockCapacityScheduler(500 * 100); - String reservationQ = testUtil.getFullReservationQueueName(); + String reservationQ = + ReservationSystemTestUtil.getFullReservationQueueName(); float instConstraint = 100; float avgConstraint = 100; ReservationSchedulerConfiguration conf = http://git-wip-us.apache.org/repos/asf/hadoop/blob/2798723a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java index b6e6667..eb0b0e2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/planning/TestSimpleCapacityReplanner.java @@ -93,44 +93,44 @@ public class TestSimpleCapacityReplanner { assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation(r1, rDef, "u3", "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res, - minAlloc))); + minAlloc), false)); when(clock.getTime()).thenReturn(1L); ReservationId r2 = ReservationId.newInstance(ts, 2); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation(r2, rDef, "u4", "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res, - minAlloc))); + minAlloc), false)); when(clock.getTime()).thenReturn(2L); ReservationId r3 = ReservationId.newInstance(ts, 3); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation(r3, rDef, "u5", "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res, - minAlloc))); + minAlloc), false)); when(clock.getTime()).thenReturn(3L); ReservationId r4 = ReservationId.newInstance(ts, 4); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation(r4, rDef, "u6", "dedicated", 0, 0 + f5.length, 
generateAllocation(0, f5), res, - minAlloc))); + minAlloc), false)); when(clock.getTime()).thenReturn(4L); ReservationId r5 = ReservationId.newInstance(ts, 5); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation(r5, rDef, "u7", "dedicated", 0, 0 + f5.length, generateAllocation(0, f5), res, - minAlloc))); + minAlloc), false)); int[] f6 = { 50, 50, 50, 50, 50 }; ReservationId r6 = ReservationId.newInstance(ts, 6); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation(r6, rDef, "u3", "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res, - minAlloc))); + minAlloc), false)); when(clock.getTime()).thenReturn(6L); ReservationId r7 = ReservationId.newInstance(ts, 7); assertTrue(plan.toString(), plan.addReservation(new InMemoryReservationAllocation(r7, rDef, "u4", "dedicated", 10, 10 + f6.length, generateAllocation(10, f6), res, - minAlloc))); + minAlloc), false)); // remove some of the resources (requires replanning) plan.setTotalCapacity(Resource.newInstance(70 * 1024, 70));