Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5768A10763 for ; Fri, 19 Dec 2014 23:37:03 +0000 (UTC) Received: (qmail 10536 invoked by uid 500); 19 Dec 2014 23:37:03 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 10463 invoked by uid 500); 19 Dec 2014 23:37:03 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 10454 invoked by uid 99); 19 Dec 2014 23:37:03 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 19 Dec 2014 23:37:03 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id A90ACA30A07; Fri, 19 Dec 2014 23:37:02 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kasha@apache.org To: common-commits@hadoop.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler. (Anubhav Dhoot via kasha) Date: Fri, 19 Dec 2014 23:37:02 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/trunk 6f1e3667c -> a22ffc318 YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler. (Anubhav Dhoot via kasha) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a22ffc31 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a22ffc31 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a22ffc31 Branch: refs/heads/trunk Commit: a22ffc318801698e86cd0e316b4824015f2486ac Parents: 6f1e366 Author: Karthik Kambatla Authored: Fri Dec 19 14:23:43 2014 -0800 Committer: Karthik Kambatla Committed: Fri Dec 19 15:37:12 2014 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../reservation/AbstractReservationSystem.java | 4 +- .../reservation/FairReservationSystem.java | 90 +++++++++++ .../scheduler/fair/AllocationConfiguration.java | 67 +++++++- .../fair/AllocationFileLoaderService.java | 55 ++++++- .../scheduler/fair/FairScheduler.java | 12 ++ .../fair/ReservationQueueConfiguration.java | 105 +++++++++++++ .../webapp/dao/FairSchedulerQueueInfo.java | 9 +- .../reservation/ReservationSystemTestUtil.java | 58 +++++++ .../reservation/TestFairReservationSystem.java | 151 +++++++++++++++++++ .../fair/TestAllocationFileLoaderService.java | 81 ++++++++++ 11 files changed, 622 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/a22ffc31/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index d0d5c72..54ca3d2 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -67,6 +67,9 @@ Release 2.7.0 - UNRELEASED YARN-2203. [YARN-1492] Web UI for cache manager. (Chris Trezzo via kasha) + YARN-2738. [YARN-2574] Add FairReservationSystem for FairScheduler. + (Anubhav Dhoot via kasha) + IMPROVEMENTS YARN-2950. Change message to mandate, not suggest JS requirement on UI. http://git-wip-us.apache.org/repos/asf/hadoop/blob/a22ffc31/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java index 1539a6e..fa0835a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/AbstractReservationSystem.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -322,9 +323,10 @@ public abstract class AbstractReservationSystem extends AbstractService * @param scheduler the scheduler for which the reservation system is required */ public static String getDefaultReservationSystem(ResourceScheduler scheduler) { - // currently only capacity scheduler is supported if (scheduler instanceof CapacityScheduler) { return CapacityReservationSystem.class.getName(); + } else if (scheduler instanceof FairScheduler) { + return FairReservationSystem.class.getName(); } return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a22ffc31/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairReservationSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairReservationSystem.java new file mode 100644 index 0000000..9bf92c2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/FairReservationSystem.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + +public class FairReservationSystem extends AbstractReservationSystem { + + private FairScheduler fairScheduler; + + public FairReservationSystem() { + super(FairReservationSystem.class.getName()); + } + + @Override + public void reinitialize(Configuration conf, RMContext rmContext) + throws YarnException { + // Validate if the scheduler is fair scheduler + ResourceScheduler scheduler = rmContext.getScheduler(); + if (!(scheduler instanceof FairScheduler)) { + throw new YarnRuntimeException("Class " + + scheduler.getClass().getCanonicalName() + " not instance of " + + FairScheduler.class.getCanonicalName()); + } + fairScheduler = (FairScheduler) scheduler; + this.conf = conf; + super.reinitialize(conf, rmContext); + } + + @Override + protected ReservationSchedulerConfiguration + getReservationSchedulerConfiguration() { + return fairScheduler.getAllocationConfiguration(); + } + + @Override + protected ResourceCalculator getResourceCalculator() { + return fairScheduler.getResourceCalculator(); + } + + @Override + protected QueueMetrics getRootQueueMetrics() { + return fairScheduler.getRootQueueMetrics(); + } + + @Override + protected Resource getMinAllocation() { + return fairScheduler.getMinimumResourceCapability(); + } + + @Override + protected Resource getMaxAllocation() { + return fairScheduler.getMaximumResourceCapability(); + } + + @Override + protected String getPlanQueuePath(String planQueueName) { + return planQueueName; } + + @Override + protected Resource getPlanQueueCapacity(String planQueueName) { + return fairScheduler.getQueueManager().getParentQueue(planQueueName, false) + .getSteadyFairShare(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a22ffc31/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 70a6496..fd99d65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -27,12 +27,13 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; -public class AllocationConfiguration { +public class AllocationConfiguration extends ReservationSchedulerConfiguration { private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*"); private static final AccessControlList NOBODY_ACL = new AccessControlList(" "); @@ -76,6 +77,8 @@ public class AllocationConfiguration { // preempt other queues' tasks. private final Map fairSharePreemptionThresholds; + private final Set reservableQueues; + private final Map schedulingPolicies; private final SchedulingPolicy defaultSchedulingPolicy; @@ -87,7 +90,10 @@ public class AllocationConfiguration { //Configured queues in the alloc xml @VisibleForTesting Map> configuredQueues; - + + // Reservation system configuration + private ReservationQueueConfiguration globalReservationQueueConfig; + public AllocationConfiguration(Map minQueueResources, Map maxQueueResources, Map queueMaxApps, Map userMaxApps, @@ -101,7 +107,9 @@ public class AllocationConfiguration { Map fairSharePreemptionThresholds, Map> queueAcls, QueuePlacementPolicy placementPolicy, - Map> configuredQueues) { + Map> configuredQueues, + ReservationQueueConfiguration globalReservationQueueConfig, + Set reservableQueues) { this.minQueueResources = minQueueResources; this.maxQueueResources = maxQueueResources; this.queueMaxApps = queueMaxApps; @@ -117,6 +125,8 @@ public class AllocationConfiguration { this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts; this.fairSharePreemptionThresholds = fairSharePreemptionThresholds; this.queueAcls = queueAcls; + this.reservableQueues = reservableQueues; + this.globalReservationQueueConfig = globalReservationQueueConfig; this.placementPolicy = placementPolicy; this.configuredQueues = configuredQueues; } @@ -137,6 +147,7 @@ public class AllocationConfiguration { fairSharePreemptionThresholds = new HashMap(); schedulingPolicies = new HashMap(); defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY; + reservableQueues = new HashSet<>(); configuredQueues = new HashMap>(); for (FSQueueType queueType : FSQueueType.values()) { configuredQueues.put(queueType, new HashSet()); @@ -262,4 +273,54 @@ public class AllocationConfiguration { public QueuePlacementPolicy getPlacementPolicy() { return placementPolicy; } + + @Override + public boolean isReservable(String queue) { + return reservableQueues.contains(queue); + } + + @Override + public long getReservationWindow(String queue) { + return globalReservationQueueConfig.getReservationWindowMsec(); + } + + @Override + public float getAverageCapacity(String queue) { + return globalReservationQueueConfig.getAvgOverTimeMultiplier() * 100; + } + + @Override + public float getInstantaneousMaxCapacity(String queue) { + return globalReservationQueueConfig.getMaxOverTimeMultiplier() * 100; + } + + @Override + public String getReservationAdmissionPolicy(String queue) { + return globalReservationQueueConfig.getReservationAdmissionPolicy(); + } + + @Override + public String getReservationAgent(String queue) { + return globalReservationQueueConfig.getReservationAgent(); + } + + @Override + public boolean getShowReservationAsQueues(String queue) { + return globalReservationQueueConfig.shouldShowReservationAsQueues(); + } + + @Override + public String getReplanner(String queue) { + return globalReservationQueueConfig.getPlanner(); + } + + @Override + public boolean getMoveOnExpiry(String queue) { + return globalReservationQueueConfig.shouldMoveOnExpiry(); + } + + @Override + public long getEnforcementWindow(String queue) { + return globalReservationQueueConfig.getEnforcementWindowMsec(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/a22ffc31/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index e0e23e0..76fa588 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -86,7 +86,7 @@ public class AllocationFileLoaderService extends AbstractService { private Thread reloadThread; private volatile boolean running = true; - + public AllocationFileLoaderService() { this(new SystemClock()); } @@ -222,6 +222,7 @@ public class AllocationFileLoaderService extends AbstractService { new HashMap(); Map> queueAcls = new HashMap>(); + Set reservableQueues = new HashSet(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; float queueMaxAMShareDefault = 0.5f; @@ -230,6 +231,11 @@ public class AllocationFileLoaderService extends AbstractService { float defaultFairSharePreemptionThreshold = 0.5f; SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; + // Reservation global configuration knobs + String planner = null; + String reservationAgent = null; + String reservationAdmissionPolicy = null; + QueuePlacementPolicy newPlacementPolicy = null; // Remember all queue names so we can display them on web UI, etc. @@ -317,6 +323,15 @@ public class AllocationFileLoaderService extends AbstractService { defaultSchedPolicy = SchedulingPolicy.parse(text); } else if ("queuePlacementPolicy".equals(element.getTagName())) { placementPolicyElement = element; + } else if ("reservation-planner".equals(element.getTagName())) { + String text = ((Text) element.getFirstChild()).getData().trim(); + planner = text; + } else if ("reservation-agent".equals(element.getTagName())) { + String text = ((Text) element.getFirstChild()).getData().trim(); + reservationAgent = text; + } else if ("reservation-policy".equals(element.getTagName())) { + String text = ((Text) element.getFirstChild()).getData().trim(); + reservationAdmissionPolicy = text; } else { LOG.warn("Bad element in allocations file: " + element.getTagName()); } @@ -337,7 +352,8 @@ public class AllocationFileLoaderService extends AbstractService { loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, - fairSharePreemptionThresholds, queueAcls, configuredQueues); + fairSharePreemptionThresholds, queueAcls, configuredQueues, + reservableQueues); } // Load placement policy and pass it configured queues @@ -366,13 +382,27 @@ public class AllocationFileLoaderService extends AbstractService { defaultFairSharePreemptionThreshold); } + ReservationQueueConfiguration globalReservationQueueConfig = new + ReservationQueueConfiguration(); + if (planner != null) { + globalReservationQueueConfig.setPlanner(planner); + } + if (reservationAdmissionPolicy != null) { + globalReservationQueueConfig.setReservationAdmissionPolicy + (reservationAdmissionPolicy); + } + if (reservationAgent != null) { + globalReservationQueueConfig.setReservationAgent(reservationAgent); + } + AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, - newPlacementPolicy, configuredQueues); + newPlacementPolicy, configuredQueues, globalReservationQueueConfig, + reservableQueues); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; @@ -392,8 +422,9 @@ public class AllocationFileLoaderService extends AbstractService { Map minSharePreemptionTimeouts, Map fairSharePreemptionTimeouts, Map fairSharePreemptionThresholds, - Map> queueAcls, - Map> configuredQueues) + Map> queueAcls, + Map> configuredQueues, + Set reservableQueues) throws AllocationConfigurationException { String queueName = element.getAttribute("name"); @@ -460,14 +491,17 @@ public class AllocationFileLoaderService extends AbstractService { } else if ("aclAdministerApps".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData(); acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text)); + } else if ("reservation".equals(field.getTagName())) { + isLeaf = false; + reservableQueues.add(queueName); + configuredQueues.get(FSQueueType.PARENT).add(queueName); } else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, - queueAcls, configuredQueues); - configuredQueues.get(FSQueueType.PARENT).add(queueName); + queueAcls, configuredQueues, reservableQueues); isLeaf = false; } } @@ -479,6 +513,13 @@ public class AllocationFileLoaderService extends AbstractService { } else { configuredQueues.get(FSQueueType.LEAF).add(queueName); } + } else { + if ("parent".equals(element.getAttribute("type"))) { + throw new AllocationConfigurationException("Both and " + + "type=\"parent\" found for queue " + queueName + " which is " + + "unsupported"); + } + configuredQueues.get(FSQueueType.PARENT).add(queueName); } queueAcls.put(queueName, acls); if (maxQueueResources.containsKey(queueName) && http://git-wip-us.apache.org/repos/asf/hadoop/blob/a22ffc31/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index a687e71..4106285 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1568,4 +1568,16 @@ public class FairScheduler extends return EnumSet .of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU); } + + @Override + public Set getPlanQueues() throws YarnException { + Set planQueues = new HashSet(); + for (FSQueue fsQueue : queueMgr.getQueues()) { + String queueName = fsQueue.getName(); + if (allocConf.isReservable(queueName)) { + planQueues.add(queueName); + } + } + return planQueues; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a22ffc31/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java new file mode 100644 index 0000000..747a4c2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/ReservationQueueConfiguration.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; + +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class ReservationQueueConfiguration { + private long reservationWindow; + private long enforcementWindow; + private String reservationAdmissionPolicy; + private String reservationAgent; + private String planner; + private boolean showReservationAsQueues; + private boolean moveOnExpiry; + private float avgOverTimeMultiplier; + private float maxOverTimeMultiplier; + + public ReservationQueueConfiguration() { + this.reservationWindow = ReservationSchedulerConfiguration + .DEFAULT_RESERVATION_WINDOW; + this.enforcementWindow = ReservationSchedulerConfiguration + .DEFAULT_RESERVATION_ENFORCEMENT_WINDOW; + this.reservationAdmissionPolicy = ReservationSchedulerConfiguration + .DEFAULT_RESERVATION_ADMISSION_POLICY; + this.reservationAgent = ReservationSchedulerConfiguration + .DEFAULT_RESERVATION_AGENT_NAME; + this.planner = ReservationSchedulerConfiguration + .DEFAULT_RESERVATION_PLANNER_NAME; + this.showReservationAsQueues = ReservationSchedulerConfiguration + .DEFAULT_SHOW_RESERVATIONS_AS_QUEUES; + this.moveOnExpiry = ReservationSchedulerConfiguration + .DEFAULT_RESERVATION_MOVE_ON_EXPIRY; + this.avgOverTimeMultiplier = ReservationSchedulerConfiguration + .DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER; + this.maxOverTimeMultiplier = ReservationSchedulerConfiguration + .DEFAULT_CAPACITY_OVER_TIME_MULTIPLIER; + } + + public long getReservationWindowMsec() { + return reservationWindow; + } + + public long getEnforcementWindowMsec() { + return enforcementWindow; + } + + public boolean shouldShowReservationAsQueues() { + return showReservationAsQueues; + } + + public boolean shouldMoveOnExpiry() { + return moveOnExpiry; + } + + public String getReservationAdmissionPolicy() { + return reservationAdmissionPolicy; + } + + public String getReservationAgent() { + return reservationAgent; + } + + public String getPlanner() { + return planner; + } + + public float getAvgOverTimeMultiplier() { + return avgOverTimeMultiplier; + } + + public float getMaxOverTimeMultiplier() { + return maxOverTimeMultiplier; + } + + public void setPlanner(String planner) { + this.planner = planner; + } + + public void setReservationAdmissionPolicy(String reservationAdmissionPolicy) { + this.reservationAdmissionPolicy = reservationAdmissionPolicy; + } + + public void setReservationAgent(String reservationAgent) { + this.reservationAgent = reservationAgent; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a22ffc31/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java index c62aaf0..5fbfe51 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java @@ -94,9 +94,14 @@ public class FairSchedulerQueueInfo { fractionMemMaxShare = (float)maxResources.getMemory() / clusterResources.getMemory(); maxApps = allocConf.getQueueMaxApps(queueName); - - Collection children = queue.getChildQueues(); + childQueues = new ArrayList(); + if (allocConf.isReservable(queueName) && + !allocConf.getShowReservationAsQueues(queueName)) { + return; + } + + Collection children = queue.getChildQueues(); for (FSQueue child : children) { if (child instanceof FSLeafQueue) { childQueues.add(new FairSchedulerLeafQueueInfo((FSLeafQueue)child, scheduler)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a22ffc31/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java index 90e71bf..d93af38 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/ReservationSystemTestUtil.java @@ -23,7 +23,9 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.FileWriter; import java.io.IOException; +import java.io.PrintWriter; import java.util.Collections; import java.util.Map; import java.util.Random; @@ -102,6 +104,62 @@ public class ReservationSystemTestUtil { .assertTrue(newPlan.getSharingPolicy() instanceof CapacityOverTimePolicy); } + static void setupFSAllocationFile(String allocationFile) + throws IOException { + PrintWriter out = new PrintWriter(new FileWriter(allocationFile)); + out.println(""); + out.println(""); + out.println(""); + out.println("1"); + out.println(""); + out.println(""); + out.println("1"); + out.println(""); + out.println("3"); + out.println(""); + out.println(""); + out.println("7"); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("8"); + out.println(""); + out.println("drf"); + out.println(""); + out.close(); + } + + static void updateFSAllocationFile(String allocationFile) + throws IOException { + PrintWriter out = new PrintWriter(new FileWriter(allocationFile)); + out.println(""); + out.println(""); + out.println(""); + out.println("5"); + out.println(""); + out.println(""); + out.println("5"); + out.println(""); + out.println("3"); + out.println(""); + out.println(""); + out.println("7"); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("80"); + out.println(""); + out.println(""); + out.println(""); + out.println("10"); + out.println(""); + out.println("drf"); + out.println(""); + out.close(); + } + @SuppressWarnings("unchecked") public CapacityScheduler mockCapacityScheduler(int numContainers) throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a22ffc31/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java new file mode 100644 index 0000000..82ba731 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestFairReservationSystem.java @@ -0,0 +1,151 @@ +/******************************************************************************* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *******************************************************************************/ +package org.apache.hadoop.yarn.server.resourcemanager.reservation; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerTestBase; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +import static org.mockito.Mockito.when; + +public class TestFairReservationSystem extends FairSchedulerTestBase { + private final static String ALLOC_FILE = new File(TEST_DIR, + TestFairReservationSystem.class.getName() + ".xml").getAbsolutePath(); + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + return conf; + } + + @Before + public void setup() throws IOException { + conf = createConfiguration(); + } + + @After + public void teardown() { + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + conf = null; + } + + @Test + public void testFairReservationSystemInitialize() throws IOException { + ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE); + + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + + // Setup + RMContext mockRMContext = testUtil.createRMContext(conf); + setupFairScheduler(testUtil, mockRMContext); + + FairReservationSystem reservationSystem = new FairReservationSystem(); + reservationSystem.setRMContext(mockRMContext); + + try { + reservationSystem.reinitialize(scheduler.getConf(), mockRMContext); + } catch (YarnException e) { + Assert.fail(e.getMessage()); + } + + ReservationSystemTestUtil.validateReservationQueue(reservationSystem, + testUtil.getFullReservationQueueName()); + } + + @Test + public void testFairReservationSystemReinitialize() throws IOException { + ReservationSystemTestUtil.setupFSAllocationFile(ALLOC_FILE); + + ReservationSystemTestUtil testUtil = new ReservationSystemTestUtil(); + + // Setup + RMContext mockContext = testUtil.createRMContext(conf); + setupFairScheduler(testUtil, mockContext); + + FairReservationSystem reservationSystem = new FairReservationSystem(); + reservationSystem.setRMContext(mockContext); + + try { + reservationSystem.reinitialize(scheduler.getConf(), mockContext); + } catch (YarnException e) { + Assert.fail(e.getMessage()); + } + + // Assert queue in original config + final String planQNam = testUtil.getFullReservationQueueName(); + ReservationSystemTestUtil.validateReservationQueue(reservationSystem, + planQNam); + + // Dynamically add a plan + ReservationSystemTestUtil.updateFSAllocationFile(ALLOC_FILE); + scheduler.reinitialize(conf, mockContext); + + try { + reservationSystem.reinitialize(conf, mockContext); + } catch (YarnException e) { + Assert.fail(e.getMessage()); + } + + String newQueue = "root.reservation"; + ReservationSystemTestUtil.validateNewReservationQueue + (reservationSystem, newQueue); + } + + private void setupFairScheduler(ReservationSystemTestUtil testUtil, + RMContext rmContext) throws + IOException { + + scheduler = new FairScheduler(); + scheduler.setRMContext(rmContext); + + int numContainers = 10; + when(rmContext.getScheduler()).thenReturn(scheduler); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, rmContext); + + Resource resource = testUtil.calculateClusterResource(numContainers); + RMNode node1 = MockNodes.newNodeInfo(1, resource, 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/a22ffc31/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 9a66a94..3c166a5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -27,6 +27,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; @@ -550,6 +551,86 @@ public class TestAllocationFileLoaderService { allocLoader.reloadAllocations(); } + + @Test + public void testReservableQueue() throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println("DummyAgentName"); + out.println("AnyAdmissionPolicy"); + out.println(""); + out.close(); + + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); + allocLoader.reloadAllocations(); + + AllocationConfiguration allocConf = confHolder.allocConf; + String reservableQueueName = "root.reservable"; + String nonreservableQueueName = "root.other"; + assertFalse(allocConf.isReservable(nonreservableQueueName)); + assertTrue(allocConf.isReservable(reservableQueueName)); + + assertTrue(allocConf.getMoveOnExpiry(reservableQueueName)); + assertEquals(ReservationSchedulerConfiguration.DEFAULT_RESERVATION_WINDOW, + allocConf.getReservationWindow(reservableQueueName)); + assertEquals(100, allocConf.getInstantaneousMaxCapacity + (reservableQueueName), + 0.0001); + assertEquals( + "DummyAgentName", + allocConf.getReservationAgent(reservableQueueName)); + assertEquals(100, allocConf.getAverageCapacity(reservableQueueName), 0.001); + assertFalse(allocConf.getShowReservationAsQueues(reservableQueueName)); + assertEquals("AnyAdmissionPolicy", + allocConf.getReservationAdmissionPolicy(reservableQueueName)); + assertEquals(ReservationSchedulerConfiguration + .DEFAULT_RESERVATION_PLANNER_NAME, + allocConf.getReplanner(reservableQueueName)); + assertEquals(ReservationSchedulerConfiguration + .DEFAULT_RESERVATION_ENFORCEMENT_WINDOW, + allocConf.getEnforcementWindow(reservableQueueName)); + } + + /** + * Verify that you can't have dynamic user queue and reservable queue on + * the same queue + */ + @Test (expected = AllocationConfigurationException.class) + public void testReservableCannotBeCombinedWithDynamicUserQueue() + throws Exception { + Configuration conf = new Configuration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.close(); + + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); + allocLoader.init(conf); + ReloadListener confHolder = new ReloadListener(); + allocLoader.setReloadListener(confHolder); + allocLoader.reloadAllocations(); + } + private class ReloadListener implements AllocationFileLoaderService.Listener { public AllocationConfiguration allocConf;