From core-commits-return-5938-apmail-hadoop-core-commits-archive=hadoop.apache.org@hadoop.apache.org Thu Sep 11 18:49:58 2008 Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 4184 invoked from network); 11 Sep 2008 18:49:58 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 11 Sep 2008 18:49:58 -0000 Received: (qmail 31650 invoked by uid 500); 11 Sep 2008 18:49:55 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 31617 invoked by uid 500); 11 Sep 2008 18:49:55 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 31608 invoked by uid 99); 11 Sep 2008 18:49:55 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Sep 2008 11:49:55 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Sep 2008 18:48:56 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 90CDA2388A04; Thu, 11 Sep 2008 11:49:28 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r694415 [1/2] - in /hadoop/core/trunk: conf/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/ src/contrib/capacity-scheduler/src/java/ src/contrib/capacity-scheduler/src/java/org/ src/contrib/capacity-schedul... 
Date: Thu, 11 Sep 2008 18:49:26 -0000 To: core-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20080911184928.90CDA2388A04@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: omalley Date: Thu Sep 11 11:49:22 2008 New Revision: 694415 URL: http://svn.apache.org/viewvc?rev=694415&view=rev Log: HADOOP-3445. Add capacity scheduler that provides guaranteed capacities to queues as a percentage of the cluster. (Vivek Ratan via omalley) Added: hadoop/core/trunk/conf/capacity-scheduler.xml.template hadoop/core/trunk/src/contrib/capacity-scheduler/ hadoop/core/trunk/src/contrib/capacity-scheduler/README hadoop/core/trunk/src/contrib/capacity-scheduler/build.xml hadoop/core/trunk/src/contrib/capacity-scheduler/src/ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java 
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java Removed: hadoop/core/trunk/conf/resource-manager-conf.xml hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ResourceManagerConf.java hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestResourceManagerConf.java Modified: hadoop/core/trunk/conf/ (props changed) hadoop/core/trunk/src/contrib/build.xml hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Propchange: hadoop/core/trunk/conf/ ------------------------------------------------------------------------------ --- svn:ignore (original) +++ svn:ignore Thu Sep 11 11:49:22 2008 @@ -3,3 +3,4 @@ slaves masters hadoop-env.sh +capacity-scheduler.xml Added: hadoop/core/trunk/conf/capacity-scheduler.xml.template URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/capacity-scheduler.xml.template?rev=694415&view=auto ============================================================================== --- hadoop/core/trunk/conf/capacity-scheduler.xml.template (added) +++ hadoop/core/trunk/conf/capacity-scheduler.xml.template Thu Sep 11 11:49:22 2008 @@ -0,0 +1,50 @@ + + + + + + + + + + + mapred.capacity-scheduler.queue.default.guaranteed-capacity + 100 + Percentage of the number of slots in the cluster that are + guaranteed to be available for jobs in this queue. + + + + + mapred.capacity-scheduler.queue.default.reclaim-time-limit + 300 + The amount of time, in seconds, before which + resources distributed to other queues will be reclaimed. 
+ + + + mapred.capacity-scheduler.queue.default.supports-priority + false + If true, priorities of jobs will be taken into + account in scheduling decisions. + + + + + mapred.capacity-scheduler.queue.default.minimum-user-limit-percent + 100 + Each queue enforces a limit on the percentage of resources + allocated to a user at any given time, if there is competition for them. + This user limit can vary between a minimum and maximum value. The former + is set to this property value, and the latter depends on the number of + users who have submitted jobs. For example, suppose the value of this + property is 25. If two users have submitted jobs to a queue, no single + user can use more than 50% of the queue resources. If a third user submits + a job, no single user can use more than 33% of the queue resources. With 4 + or more users, no user can use more than 25% of the queue's resources. A + value of 100 implies no user limits are imposed. + + + + Modified: hadoop/core/trunk/src/contrib/build.xml URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/build.xml?rev=694415&r1=694414&r2=694415&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/build.xml (original) +++ hadoop/core/trunk/src/contrib/build.xml Thu Sep 11 11:49:22 2008 @@ -48,6 +48,7 @@ + Added: hadoop/core/trunk/src/contrib/capacity-scheduler/README URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/README?rev=694415&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/capacity-scheduler/README (added) +++ hadoop/core/trunk/src/contrib/capacity-scheduler/README Thu Sep 11 11:49:22 2008 @@ -0,0 +1,135 @@ +# Copyright 2008 The Apache Software Foundation Licensed under the +# Apache License, Version 2.0 (the "License"); you may not use this +# file except in compliance with the License. 
You may obtain a copy +# of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless +# required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. See the License for the specific language governing +# permissions and limitations under the License. + +This package implements a scheduler for Map-Reduce jobs, called Capacity +Task Scheduler (or just Capacity Scheduler), which provides a way to share +large clusters. The scheduler provides the following features (which are +described in detail in HADOOP-3421): + +* Support for queues, where a job is submitted to a queue. +* Queues are guaranteed a fraction of the capacity of the grid (their + 'guaranteed capacity') in the sense that a certain capacity of resources + will be at their disposal. All jobs submitted to the queues of an Org will + have access to the capacity guaranteed to the Org. +* Free resources can be allocated to any queue beyond its guaranteed capacity. + These excess allocated resources can be reclaimed and made available to + another queue in order to meet its capacity guarantee. +* The scheduler guarantees that excess resources taken from a queue will be + restored to it within N minutes of its need for them. +* Queues optionally support job priorities (disabled by default). +* Within a queue, jobs with higher priority will have access to the queue's + resources before jobs with lower priority. However, once a job is running, it + will not be preempted for a higher priority job. +* In order to prevent one or more users from monopolizing its resources, each + queue enforces a limit on the percentage of resources allocated to a user at + any given time, if there is competition for them. 
+* Support for memory-intensive jobs, wherein a job can optionally specify + higher memory-requirements than the default, and the tasks of the job will + only be run on TaskTrackers that have enough memory to spare. + +Whenever a TaskTracker is free, the Capacity Scheduler first picks a queue +that needs to reclaim any resources the earliest. If no such queue is found, +it then picks a queue which has most free space (whose ratio of # of running +slots to guaranteed capacity is the lowest). + +-------------------------------------------------------------------------------- + +BUILDING: + +In HADOOP_HOME, run ant package to build Hadoop and its contrib packages. + +-------------------------------------------------------------------------------- + +INSTALLING: + +To run the capacity scheduler in your Hadoop installation, you need to put it +on the CLASSPATH. The easiest way is to copy the +hadoop-*-capacity-scheduler.jar from +HADOOP_HOME/build/contrib/capacity-scheduler to HADOOP_HOME/lib. Alternatively +you can modify HADOOP_CLASSPATH to include this jar, in conf/hadoop-env.sh. + +You will also need to set the following property in the Hadoop config file +(conf/hadoop-site.xml) to have Hadoop use the capacity scheduler: + + + mapred.jobtracker.taskScheduler + org.apache.hadoop.mapred.CapacityTaskScheduler + + +-------------------------------------------------------------------------------- + +CONFIGURATION: + +The following properties can be set in hadoop-site.xml to configure the +scheduler: + +mapred.capacity-scheduler.reclaimCapacity.interval: + The capacity scheduler checks, every 'interval' seconds, whether any + capacity needs to be reclaimed. The default value is 5 seconds. + +The scheduling information for queues is maintained in a configuration file +called 'capacity-scheduler.xml'. Note that the queue names are set in +hadoop-site.xml. capacity-scheduler.xml sets the scheduling properties +for each queue. 
See that file for configuration details, but the following +are the configuration options for each queue: + +mapred.capacity-scheduler.queue.<queue-name>.guaranteed-capacity + Percentage of the number of slots in the cluster that are + guaranteed to be available for jobs in this queue. + The sum of guaranteed capacities for all queues should be less than or + equal to 100. + +mapred.capacity-scheduler.queue.<queue-name>.reclaim-time-limit + The amount of time, in seconds, before which resources distributed to other + queues will be reclaimed. + +mapred.capacity-scheduler.queue.<queue-name>.supports-priority + If true, priorities of jobs will be taken into account in scheduling + decisions. + +mapred.capacity-scheduler.queue.<queue-name>.minimum-user-limit-percent + Each queue enforces a limit on the percentage of resources + allocated to a user at any given time, if there is competition for them. + This user limit can vary between a minimum and maximum value. The former + is set to this property value, and the latter depends on the number of + users who have submitted jobs. For example, suppose the value of this + property is 25. If two users have submitted jobs to a queue, no single + user can use more than 50% of the queue resources. If a third user submits + a job, no single user can use more than 33% of the queue resources. With 4 + or more users, no user can use more than 25% of the queue's resources. A + value of 100 implies no user limits are imposed. + + +-------------------------------------------------------------------------------- + +IMPLEMENTATION: + +When a TaskTracker is free, the capacity scheduler does the following (note +that many of these steps can be, and will be, enhanced over time to provide +better algorithms): +1. Decide whether to give it a Map or Reduce task, depending on how many tasks +the TT is already running of that type, with respect to the maximum tasks it +can run. +2. The scheduler then picks a queue. Queues that need to reclaim capacity +sooner, come before queues that don't. 
For queues that don't, they're ordered +by a ratio of (# of running tasks)/Guaranteed capacity, which indicates how +much 'free space' the queue has, or how much it is over capacity. +3. A job is picked in the queue based on its state (running jobs are picked +first), its priority (if the queue supports priorities) or its submission +time, and whether the job's user is under or over limit. +4. A task is picked from the job in the same way it always has. + +Periodically, a thread checks each queue to see if it needs to reclaim any +capacity. Queues that are running below capacity and that have tasks waiting, +need to reclaim capacity within a certain period of time. If a queue hasn't +received enough tasks in a certain amount of time, tasks will be killed from +queues that are running over capacity. + +-------------------------------------------------------------------------------- Added: hadoop/core/trunk/src/contrib/capacity-scheduler/build.xml URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/build.xml?rev=694415&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/capacity-scheduler/build.xml (added) +++ hadoop/core/trunk/src/contrib/capacity-scheduler/build.xml Thu Sep 11 11:49:22 2008 @@ -0,0 +1,28 @@ + + + + + + + + + + Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=694415&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (added) +++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Thu Sep 11 11:49:22 2008 @@ -0,0 +1,222 @@ +/** 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +/** + * Class providing access to resource manager configuration. + * + * Resource manager configuration involves setting up queues, and defining + * various properties for the queues. These are typically read from a file + * called resource-manager-conf.xml that must be in the classpath of the + * application. The class provides APIs to get/set and reload the + * configuration for the queues. + */ +class CapacitySchedulerConf { + + /** Default file name from which the resource manager configuration is read. */ + public static final String SCHEDULER_CONF_FILE = "capacity-scheduler.xml"; + + /** Default value for guaranteed capacity of maps (as percentage). + * The default value is set to 100, to represent the entire queue. + */ + public static final float DEFAULT_GUARANTEED_CAPACITY = 100; + + /** Default value for reclaiming redistributed resources. + * The default value is set to 300. + */ + public static final int DEFAULT_RECLAIM_TIME_LIMIT = 300; + + /** Default value for minimum resource limit per user per queue, as a + * percentage. 
+ * The default value is set to 100, the idea + * being that the default is suitable for organizations that do not + * require setting up any queues. + */ + public static final int DEFAULT_MIN_USER_LIMIT_PERCENT = 100; + + private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = + "mapred.capacity-scheduler.queue."; + + private Configuration rmConf; + + /** + * Create a new ResourceManagerConf. + * This method reads from the default configuration file mentioned in + * {@link RM_CONF_FILE}, that must be present in the classpath of the + * application. + */ + public CapacitySchedulerConf() { + rmConf = new Configuration(false); + rmConf.addResource(SCHEDULER_CONF_FILE); + } + + /** + * Create a new ResourceManagerConf reading the specified configuration + * file. + * + * @param configFile {@link Path} to the configuration file containing + * the resource manager configuration. + */ + public CapacitySchedulerConf(Path configFile) { + rmConf = new Configuration(false); + rmConf.addResource(configFile); + } + + /** + * Get the guaranteed percentage of the cluster for the specified queue. + * + * This method defaults to {@link #DEFAULT_GUARANTEED_CAPACITY} if + * no value is specified in the configuration for this queue. If the queue + * name is unknown, this method throws a {@link IllegalArgumentException} + * @param queue name of the queue + * @return guaranteed percent of the cluster for the queue. + */ + public float getGuaranteedCapacity(String queue) { + checkQueue(queue); + float result = rmConf.getFloat(toFullPropertyName(queue, + "guaranteed-capacity"), + DEFAULT_GUARANTEED_CAPACITY); + if (result < 0.0 || result > 100.0) { + throw new IllegalArgumentException("Illegal capacity for queue " + queue + + " of " + result); + } + return result; + } + + /** + * Get the amount of time before which redistributed resources must be + * reclaimed for the specified queue. 
+ * + * The resource manager distributes spare capacity from a free queue + * to ones which are in need for more resources. However, if a job + * submitted to the first queue requires back the resources, they must + * be reclaimed within the specified configuration time limit. + * + * This method defaults to {@link #DEFAULT_RECLAIM_TIME_LIMIT} if + * no value is specified in the configuration for this queue. If the queue + * name is unknown, this method throws a {@link IllegalArgumentException} + * @param queue name of the queue + * @return reclaim time limit for this queue. + */ + public int getReclaimTimeLimit(String queue) { + checkQueue(queue); + return rmConf.getInt(toFullPropertyName(queue, "reclaim-time-limit"), + DEFAULT_RECLAIM_TIME_LIMIT); + } + + /** + * Set the amount of time before which redistributed resources must be + * reclaimed for the specified queue. + * @param queue Name of the queue + * @param value Amount of time before which the redistributed resources + * must be retained. + */ + public void setReclaimTimeLimit(String queue, int value) { + checkQueue(queue); + rmConf.setInt(toFullPropertyName(queue, "reclaim-time-limit"), value); + } + + /** + * Get whether priority is supported for this queue. + * + * If this value is false, then job priorities will be ignored in + * scheduling decisions. This method defaults to false if + * the property is not configured for this queue. If the queue name is + * unknown, this method throws a {@link IllegalArgumentException} + * @param queue name of the queue + * @return Whether this queue supports priority or not. + */ + public boolean isPrioritySupported(String queue) { + checkQueue(queue); + return rmConf.getBoolean(toFullPropertyName(queue, "supports-priority"), + false); + } + + /** + * Set whether priority is supported for this queue. 
+ * + * If the queue name is unknown, this method throws a + * {@link IllegalArgumentException} + * @param queue name of the queue + * @param value true, if the queue must support priorities, false otherwise. + */ + public void setPrioritySupported(String queue, boolean value) { + checkQueue(queue); + rmConf.setBoolean(toFullPropertyName(queue, "supports-priority"), value); + } + + /** + * Get the minimum limit of resources for any user submitting jobs in + * this queue, in percentage. + * + * This method defaults to {@link #DEFAULT_MIN_USER_LIMIT_PERCENT} if + * no value is specified in the configuration for this queue. If the queue + * name is unknown, this method throws a {@link IllegalArgumentException} + * @param queue name of the queue + * @return minimum limit of resources, in percentage, that will be + * available for a user. + * + */ + public int getMinimumUserLimitPercent(String queue) { + checkQueue(queue); + return rmConf.getInt(toFullPropertyName(queue, + "minimum-user-limit-percent"), + DEFAULT_MIN_USER_LIMIT_PERCENT); + } + + /** + * Set the minimum limit of resources for any user submitting jobs in + * this queue, in percentage. + * + * If the queue name is unknown, this method throws a + * {@link IllegalArgumentException} + * @param queue name of the queue + * @param value minimum limit of resources for any user submitting jobs + * in this queue + */ + public void setMinimumUserLimitPercent(String queue, int value) { + checkQueue(queue); + rmConf.setInt(toFullPropertyName(queue, "minimum-user-limit-percent"), + value); + } + + /** + * Reload configuration by clearing the information read from the + * underlying configuration file. 
+ */ + public synchronized void reloadConfiguration() { + rmConf.reloadConfiguration(); + } + + private synchronized void checkQueue(String queue) { + /*if (queues == null) { + queues = getQueues(); + } + if (!queues.contains(queue)) { + throw new IllegalArgumentException("Queue " + queue + " is undefined."); + }*/ + } + + private static final String toFullPropertyName(String queue, + String property) { + return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property; + } +} Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=694415&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (added) +++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Thu Sep 11 11:49:22 2008 @@ -0,0 +1,1068 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobTracker.IllegalStateException; +import org.apache.hadoop.util.StringUtils; + +/** + * A {@link TaskScheduler} that implements the requirements in HADOOP-3421 + * and provides a HOD-less way to share large clusters. This scheduler + * provides the following features: + * * support for queues, where a job is submitted to a queue. + * * Queues are guaranteed a fraction of the capacity of the grid (their + * 'guaranteed capacity') in the sense that a certain capacity of resources + * will be at their disposal. All jobs submitted to the queues of an Org + * will have access to the capacity guaranteed to the Org. + * * Free resources can be allocated to any queue beyond its guaranteed + * capacity. These excess allocated resources can be reclaimed and made + * available to another queue in order to meet its capacity guarantee. + * * The scheduler guarantees that excess resources taken from a queue will + * be restored to it within N minutes of its need for them. + * * Queues optionally support job priorities (disabled by default). + * * Within a queue, jobs with higher priority will have access to the + * queue's resources before jobs with lower priority. However, once a job + * is running, it will not be preempted for a higher priority job. 
+ * * In order to prevent one or more users from monopolizing its resources, + * each queue enforces a limit on the percentage of resources allocated to a + * user at any given time, if there is competition for them. + * + */ +class CapacityTaskScheduler extends TaskScheduler { + + /** + * For keeping track of reclaimed capacity. + * Whenever slots need to be reclaimed, we create one of these objects. + * As the queue gets slots, the amount to reclaim gets decremented. if + * we haven't reclaimed enough within a certain time, we need to kill + * tasks. This object 'expires' either if all resources are reclaimed + * before the deadline, or the deadline passes . + */ + private static class ReclaimedResource { + // how much resource to reclaim + public int originalAmount; + // how much is to be reclaimed currently + public int currentAmount; + // the time, in millisecs, when this object expires. + // This time is equal to the time when the object was created, plus + // the reclaim-time SLA for the queue. + public long whenToExpire; + // we also keep track of when to kill tasks, im millisecs. This is a + // fraction of 'whenToExpire', but we store it here so we don't + // recompute it every time. + public long whenToKill; + // whether tasks have been killed for this resource + boolean tasksKilled; + + public ReclaimedResource(int amount, long expiryTime, + long whenToKill) { + this.originalAmount = amount; + this.currentAmount = amount; + this.whenToExpire = expiryTime; + this.whenToKill = whenToKill; + this.tasksKilled = false; + } + } + + /** + * This class keeps track of scheduling info for each queue for either + * Map or Reduce tasks. . + * This scheduling information is used by the JT to decide how to allocate + * tasks, redistribute capacity, etc. 
+ */ + private static class QueueSchedulingInfo { + String queueName; + + /** guaranteed capacity(%) is set at config time */ + float guaranteedCapacityPercent = 0; + /** + * the actual gc, which depends on how many slots are available + * in the cluster at any given time. + */ + int guaranteedCapacity = 0; + + /** + * we also keep track of how many tasks are running for all jobs in + * the queue, and how many overall tasks there are. This info is + * available for each job, but keeping a sum makes our algos faster. + */ + // number of running tasks + int numRunningTasks = 0; + // number of pending tasks + int numPendingTasks = 0; + + /** + * to handle user limits, we need to know how many users have jobs in + * the queue. + */ + Map numJobsByUser = new HashMap(); + /** for each user, we need to keep track of number of running tasks */ + Map numRunningTasksByUser = + new HashMap(); + + /** min value of user limit (same for all users) */ + int ulMin; + + /** + * We need to keep track of resources to reclaim. + * Whenever a queue is under capacity and has tasks pending, we offer it + * an SLA that gives it free slots equal to or greater than the gap in + * its capacity, within a period of time (reclaimTime). + * To do this, we periodically check if queues need to reclaim capacity. + * If they do, we create a ResourceReclaim object. We also periodically + * check if a queue has received enough free slots within, say, 80% of + * its reclaimTime. If not, we kill enough tasks to make up the + * difference. + * We keep two queues of ResourceReclaim objects. when an object is + * created, it is placed in one queue. Once we kill tasks to recover + * resources for that object, it is placed in an expiry queue. we need + * to do this to prevent creating spurious ResourceReclaim objects. We + * keep a count of total resources that are being reclaimed. Thsi count + * is decremented when an object expires. + */ + + /** + * reclaim time limit (in msec). 
This time represents the SLA we offer + * a queue - a queue gets back any lost capacity withing this period + * of time. + */ + long reclaimTime; + /** + * the list of resources to reclaim. This list is always sorted so that + * resources that need to be reclaimed sooner occur earlier in the list. + */ + LinkedList reclaimList = + new LinkedList(); + /** + * the list of resources to expire. This list is always sorted so that + * resources that need to be expired sooner occur earlier in the list. + */ + LinkedList reclaimExpireList = + new LinkedList(); + /** + * sum of all resources that are being reclaimed. + * We keep this to prevent unnecessary ReclaimResource objects from being + * created. + */ + int numReclaimedResources = 0; + + public QueueSchedulingInfo(String queueName, float guaranteedCapacity, + int ulMin, long reclaimTime) { + this.queueName = new String(queueName); + this.guaranteedCapacityPercent = guaranteedCapacity; + this.ulMin = ulMin; + this.reclaimTime = reclaimTime; + } + } + + /** + * This class handles the scheduling algorithms. + * The algos are the same for both Map and Reduce tasks. + * There may be slight variations later, in which case we can make this + * an abstract base class and have derived classes for Map and Reduce. 
+ */ + private static abstract class TaskSchedulingMgr { + + /** quick way to get qsi object given a queue name */ + private Map queueInfoMap = + new HashMap(); + /** we keep track of the number of map or reduce slots we saw last */ + private int numSlots = 0; + /** our enclosing TaskScheduler object */ + protected CapacityTaskScheduler scheduler; + // for debugging + protected String type = null; + + abstract Task obtainNewTask(TaskTrackerStatus taskTracker, + JobInProgress job) throws IOException; + abstract int getClusterCapacity(); + abstract int getRunningTasks(JobInProgress job); + abstract int getPendingTasks(JobInProgress job); + abstract int killTasksFromJob(JobInProgress job, int tasksToKill); + + /** + * List of QSIs for assigning tasks. + * This list is ordered such that queues that need to reclaim capacity + * sooner, come before queues that don't. For queues that don't, they're + * ordered by a ratio of (# of running tasks)/Guaranteed capacity, which + * indicates how much 'free space' the queue has, or how much it is over + * capacity. This ordered list is iterated over, when assigning tasks. + */ + private List qsiForAssigningTasks = + new ArrayList(); + /** comparator to sort queues */ + private static final class QueueComparator + implements Comparator { + public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) { + // if one queue needs to reclaim something and the other one doesn't, + // the former is first + if ((0 == q1.reclaimList.size()) && (0 != q2.reclaimList.size())) { + return 1; + } + else if ((0 != q1.reclaimList.size()) && (0 == q2.reclaimList.size())){ + return -1; + } + else if ((0 == q1.reclaimList.size()) && (0 == q2.reclaimList.size())){ + // neither needs to reclaim. 
look at how much capacity they've filled + double r1 = (double)q1.numRunningTasks/(double)q1.guaranteedCapacity; + double r2 = (double)q2.numRunningTasks/(double)q2.guaranteedCapacity; + if (r1r2) return 1; + else return 0; + } + else { + // both have to reclaim. Look at which one needs to reclaim earlier + long t1 = q1.reclaimList.get(0).whenToKill; + long t2 = q2.reclaimList.get(0).whenToKill; + if (t1t2) return 1; + else return 0; + } + } + } + private final static QueueComparator queueComparator = new QueueComparator(); + + + TaskSchedulingMgr(CapacityTaskScheduler sched) { + scheduler = sched; + } + + private void add(QueueSchedulingInfo qsi) { + queueInfoMap.put(qsi.queueName, qsi); + qsiForAssigningTasks.add(qsi); + } + private int getNumQueues() { + return queueInfoMap.size(); + } + private boolean isQueuePresent(String queueName) { + return queueInfoMap.containsKey(queueName); + } + + /** + * Periodically, we walk through our queues to do the following: + * a. Check if a queue needs to reclaim any resources within a period + * of time (because it's running below capacity and more tasks are + * waiting) + * b. Check if a queue hasn't received enough of the resources it needed + * to be reclaimed and thus tasks need to be killed. + */ + private synchronized void reclaimCapacity() { + int tasksToKill = 0; + // with only one queue, there's nothing to do + if (queueInfoMap.size() < 2) { + return; + } + QueueSchedulingInfo lastQsi = + qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1); + long currentTime = scheduler.clock.getTime(); + for (QueueSchedulingInfo qsi: queueInfoMap.values()) { + // is there any resource that needs to be reclaimed? 
+ if ((!qsi.reclaimList.isEmpty()) && + (qsi.reclaimList.getFirst().whenToKill < + currentTime + CapacityTaskScheduler.RECLAIM_CAPACITY_INTERVAL)) { + // make a note of how many tasks to kill to claim resources + tasksToKill += qsi.reclaimList.getFirst().currentAmount; + // move this to expiry list + ReclaimedResource r = qsi.reclaimList.remove(); + qsi.reclaimExpireList.add(r); + } + // is there any resource that needs to be expired? + if ((!qsi.reclaimExpireList.isEmpty()) && + (qsi.reclaimExpireList.getFirst().whenToExpire <= currentTime)) { + ReclaimedResource r = qsi.reclaimExpireList.remove(); + qsi.numReclaimedResources -= r.originalAmount; + } + // do we need to reclaim a resource later? + // if no queue is over capacity, there's nothing to reclaim + if (lastQsi.numRunningTasks <= lastQsi.guaranteedCapacity) { + continue; + } + if (qsi.numRunningTasks < qsi.guaranteedCapacity) { + // usedCap is how much capacity is currently accounted for + int usedCap = qsi.numRunningTasks + qsi.numReclaimedResources; + // see if we have remaining capacity and if we have enough pending + // tasks to use up remaining capacity + if ((usedCap < qsi.guaranteedCapacity) && + ((qsi.numPendingTasks - qsi.numReclaimedResources)>0)) { + // create a request for resources to be reclaimed + int amt = Math.min((qsi.guaranteedCapacity-usedCap), + (qsi.numPendingTasks - qsi.numReclaimedResources)); + // create a rsource object that needs to be reclaimed some time + // in the future + long whenToKill = qsi.reclaimTime - + (CapacityTaskScheduler.HEARTBEATS_LEFT_BEFORE_KILLING * + scheduler.taskTrackerManager.getNextHeartbeatInterval()); + if (whenToKill < 0) whenToKill = 0; + qsi.reclaimList.add(new ReclaimedResource(amt, + currentTime + qsi.reclaimTime, + currentTime + whenToKill)); + qsi.numReclaimedResources += amt; + LOG.debug("Queue " + qsi.queueName + " needs to reclaim " + + amt + " resources"); + } + } + } + // kill tasks to reclaim capacity + if (0 != tasksToKill) { + 
killTasks(tasksToKill); + } + } + + // kill 'tasksToKill' tasks + private void killTasks(int tasksToKill) + { + /* + * There are a number of fair ways in which one can figure out how + * many tasks to kill from which queue, so that the total number of + * tasks killed is equal to 'tasksToKill'. + * Maybe the best way is to keep a global ordering of running tasks + * and kill the ones that ran last, irrespective of what queue or + * job they belong to. + * What we do here is look at how many tasks is each queue running + * over capacity, and use that as a weight to decide how many tasks + * to kill from that queue. + */ + + // first, find out all queues over capacity + int loc; + for (loc=0; loc qsi.guaranteedCapacity) { + // all queues from here onwards are running over cap + break; + } + } + // if some queue needs to reclaim cap, there must be at least one queue + // over cap. But check, just in case. + if (loc == qsiForAssigningTasks.size()) { + LOG.warn("In Capacity scheduler, we need to kill " + tasksToKill + + " tasks but there is no queue over capacity."); + return; + } + // calculate how many total tasks are over cap + int tasksOverCap = 0; + for (int i=loc; i=0; i--) { + if (jobs[i].getStatus().getRunState() != JobStatus.RUNNING) { + continue; + } + tasksKilled += killTasksFromJob(jobs[i], tasksToKill-tasksKilled); + if (tasksKilled >= tasksToKill) break; + } + } + + // return the TaskAttemptID of the running task, if any, that has made + // the least progress. 
+ TaskAttemptID getRunningTaskWithLeastProgress(TaskInProgress tip) { + double leastProgress = 1; + TaskAttemptID tID = null; + for (Iterator it = + tip.getActiveTasks().keySet().iterator(); it.hasNext();) { + TaskAttemptID taskid = it.next(); + TaskStatus status = tip.getTaskStatus(taskid); + if (status.getRunState() == TaskStatus.State.RUNNING) { + if (status.getProgress() < leastProgress) { + leastProgress = status.getProgress(); + tID = taskid; + } + } + } + return tID; + } + + + /** + * Update individual QSI objects. + * We don't need exact information for all variables, just enough for us + * to make scheduling decisions. For example, we don't need an exact count + * of numRunningTasks. Once we count upto the grid capacity (gcSum), any + * number beyond that will make no difference. + * */ + private synchronized void updateQSIObjects() { + // if # of slots have changed since last time, update. + // First, compute whether the total number of TT slots have changed + int slotsDiff = getClusterCapacity()- numSlots; + numSlots += slotsDiff; + for (QueueSchedulingInfo qsi: queueInfoMap.values()) { + // compute new GCs and ACs, if TT slots have changed + if (slotsDiff != 0) { + qsi.guaranteedCapacity += + (qsi.guaranteedCapacityPercent*slotsDiff/100); + } + qsi.numRunningTasks = 0; + qsi.numPendingTasks = 0; + for (String s: qsi.numRunningTasksByUser.keySet()) { + qsi.numRunningTasksByUser.put(s, 0); + } + // update stats on running jobs + for (JobInProgress j: + scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) { + if (j.getStatus().getRunState() != JobStatus.RUNNING) { + continue; + } + qsi.numRunningTasks += getRunningTasks(j); + Integer i = qsi.numRunningTasksByUser.get(j.getProfile().getUser()); + qsi.numRunningTasksByUser.put(j.getProfile().getUser(), + i+getRunningTasks(j)); + qsi.numPendingTasks += getPendingTasks(j); + LOG.debug("updateQSI: job " + j.toString() + ": run(m) = " + + j.runningMaps() + ", run(r) = " + j.runningReduces() + + ", 
finished(m) = " + j.finishedMaps() + ", finished(r)= " + + j.finishedReduces() + ", failed(m) = " + j.failedMapTasks + + ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " + + j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks + + ", total(m) = " + j.numMapTasks + ", total(r) = " + + j.numReduceTasks); + /* + * it's fine walking down the entire list of running jobs - there + * probably will not be many, plus, we may need to go through the + * list to compute numRunningTasksByUser. If this is expensive, we + * can keep a list of running jobs per user. Then we only need to + * consider the first few jobs per user. + */ + } + // update stats on waiting jobs + for (JobInProgress j: + scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) { + // pending tasks + if (qsi.numPendingTasks > getClusterCapacity()) { + // that's plenty. no need for more computation + break; + } + qsi.numPendingTasks += getPendingTasks(j); + } + } + } + + + void jobAdded(JobInProgress job) { + // update qsi + QueueSchedulingInfo qsi = + queueInfoMap.get(job.getProfile().getQueueName()); + // qsi shouldn't be null + + // update user-specific info + Integer i = qsi.numJobsByUser.get(job.getProfile().getUser()); + if (null == i) { + qsi.numJobsByUser.put(job.getProfile().getUser(), 1); + qsi.numRunningTasksByUser.put(job.getProfile().getUser(), 0); + } + else { + i++; + } + } + void jobRemoved(JobInProgress job) { + // update qsi + QueueSchedulingInfo qsi = + queueInfoMap.get(job.getProfile().getQueueName()); + // qsi shouldn't be null + + // update numJobsByUser + LOG.debug("JOb to be removed for user " + job.getProfile().getUser()); + Integer i = qsi.numJobsByUser.get(job.getProfile().getUser()); + i--; + if (0 == i.intValue()) { + qsi.numJobsByUser.remove(job.getProfile().getUser()); + qsi.numRunningTasksByUser.remove(job.getProfile().getUser()); + LOG.debug("No more jobs for user, number of users = " + qsi.numJobsByUser.size()); + } + else { + LOG.debug("User still 
has jobs, number of users = " + qsi.numJobsByUser.size()); + } + } + + // called when a task is allocated to queue represented by qsi. + // update our info about reclaimed resources + private synchronized void updateReclaimedResources(QueueSchedulingInfo qsi) { + // if we needed to reclaim resources, we have reclaimed one + if (qsi.reclaimList.isEmpty()) { + return; + } + ReclaimedResource res = qsi.reclaimList.getFirst(); + res.currentAmount--; + if (0 == res.currentAmount) { + // move this resource to the expiry list + ReclaimedResource r = qsi.reclaimList.remove(); + qsi.reclaimExpireList.add(r); + } + } + + private synchronized void updateCollectionOfQSIs() { + Collections.sort(qsiForAssigningTasks, queueComparator); + } + + + private boolean isUserOverLimit(JobInProgress j, QueueSchedulingInfo qsi) { + // what is our current capacity? It's GC if we're running below GC. + // If we're running over GC, then its #running plus 1 (which is the + // extra slot we're getting). + int currentCapacity; + if (qsi.numRunningTasks < qsi.guaranteedCapacity) { + currentCapacity = qsi.guaranteedCapacity; + } + else { + currentCapacity = qsi.numRunningTasks+1; + } + int limit = Math.max((int)(Math.ceil((double)currentCapacity/ + (double)qsi.numJobsByUser.size())), + (int)(Math.ceil((double)(qsi.ulMin*currentCapacity)/100.0))); + if (qsi.numRunningTasksByUser.get( + j.getProfile().getUser()) >= limit) { + LOG.debug("User " + j.getProfile().getUser() + + " is over limit, num running tasks = " + + qsi.numRunningTasksByUser.get(j.getProfile().getUser()) + + ", limit = " + limit); + return true; + } + else { + return false; + } + } + + private Task getTaskFromQueue(TaskTrackerStatus taskTracker, + QueueSchedulingInfo qsi) throws IOException { + Task t = null; + // keep track of users over limit + Set usersOverLimit = new HashSet(); + // look at running jobs first + for (JobInProgress j: + scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) { + // some jobs may be in the 
running queue but may have completed + // and not yet have been removed from the running queue + if (j.getStatus().getRunState() != JobStatus.RUNNING) { + continue; + } + // is this job's user over limit? + if (isUserOverLimit(j, qsi)) { + // user over limit. + usersOverLimit.add(j.getProfile().getUser()); + continue; + } + // We found a suitable job. Get task from it. + t = obtainNewTask(taskTracker, j); + if (t != null) { + LOG.debug("Got task from job " + + j.getJobID().toStringWOPrefix() + " in queue " + qsi.queueName); + return t; + } + } + + // if we're here, we found nothing in the running jobs. Time to + // look at waiting jobs. Get first job of a user that is not over limit + for (JobInProgress j: + scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) { + // is this job's user over limit? + if (usersOverLimit.contains(j.getProfile().getUser())) { + // user over limit. + continue; + } + // this job is a candidate for running. Initialize it, move it + // to run queue + j.initTasks(); + scheduler.jobQueuesManager.jobUpdated(j); + // We found a suitable job. Get task from it. + t = obtainNewTask(taskTracker, j); + if (t != null) { + LOG.debug("Getting task from job " + + j.getJobID().toStringWOPrefix() + " in queue " + qsi.queueName); + return t; + } + } + + // if we're here, we haven't found anything. This could be because + // there is nothing to run, or that the user limit for some user is + // too strict, i.e., there's at least one user who doesn't have + // enough tasks to satisfy his limit. 
If it's the later case, look at + // jobs without considering user limits, and get task from first + // eligible job + if (usersOverLimit.size() > 0) { + for (JobInProgress j: + scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) { + if ((j.getStatus().getRunState() == JobStatus.RUNNING) && + (usersOverLimit.contains(j.getProfile().getUser()))) { + t = obtainNewTask(taskTracker, j); + if (t != null) { + LOG.debug("Getting task from job " + + j.getJobID().toStringWOPrefix() + " in queue " + qsi.queueName); + return t; + } + } + } + // look at waiting jobs the same way + for (JobInProgress j: + scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName)) { + if (usersOverLimit.contains(j.getProfile().getUser())) { + j.initTasks(); + scheduler.jobQueuesManager.jobUpdated(j); + t = obtainNewTask(taskTracker, j); + if (t != null) { + LOG.debug("Getting task from job " + + j.getJobID().toStringWOPrefix() + " in queue " + qsi.queueName); + return t; + } + } + } + } + + return null; + } + + private List assignTasks(TaskTrackerStatus taskTracker) throws IOException { + Task t = null; + + /* + * update all our QSI objects. + * This involves updating each qsi structure. This operation depends + * on the number of running jobs in a queue, and some waiting jobs. If it + * becomes expensive, do it once every few hearbeats only. + */ + updateQSIObjects(); + LOG.debug("After updating QSI objects:"); + printQSIs(); + /* + * sort list of qeues first, as we want queues that need the most to + * get first access. If this is expensive, sort every few heartbeats. + * We're only sorting a collection of queues - there shouldn't be many. + */ + updateCollectionOfQSIs(); + for (QueueSchedulingInfo qsi: qsiForAssigningTasks) { + t = getTaskFromQueue(taskTracker, qsi); + if (t!= null) { + // we have a task. 
Update reclaimed resource info + updateReclaimedResources(qsi); + return Collections.singletonList(t); + } + } + + // nothing to give + return null; + } + + private void printQSIs() { + StringBuffer s = new StringBuffer(); + for (QueueSchedulingInfo qsi: qsiForAssigningTasks) { + Collection runJobs = + scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName); + Collection waitJobs = + scheduler.jobQueuesManager.getWaitingJobQueue(qsi.queueName); + s.append(" Queue '" + qsi.queueName + "'(" + this.type + "): run=" + + qsi.numRunningTasks + ", gc=" + qsi.guaranteedCapacity + + ", wait=" + qsi.numPendingTasks + ", run jobs="+ runJobs.size() + + ", wait jobs=" + waitJobs.size() + "*** "); + } + LOG.debug(s); + } + + } + + /** + * The scheduling algorithms for map tasks. + */ + private static class MapSchedulingMgr extends TaskSchedulingMgr { + MapSchedulingMgr(CapacityTaskScheduler dad) { + super(dad); + type = new String("map"); + } + Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) + throws IOException { + ClusterStatus clusterStatus = + scheduler.taskTrackerManager.getClusterStatus(); + int numTaskTrackers = clusterStatus.getTaskTrackers(); + return job.obtainNewMapTask(taskTracker, numTaskTrackers, + scheduler.taskTrackerManager.getNumberOfUniqueHosts()); + } + int getClusterCapacity() { + return scheduler.taskTrackerManager.getClusterStatus().getMaxMapTasks(); + } + int getRunningTasks(JobInProgress job) { + return job.runningMaps(); + } + int getPendingTasks(JobInProgress job) { + return job.pendingMaps(); + } + int killTasksFromJob(JobInProgress job, int tasksToKill) { + /* + * We'd like to kill tasks that ran the last, or that have made the + * least progress. + * Ideally, each job would have a list of tasks, sorted by start + * time or progress. That's a lot of state to keep, however. + * For now, we do something a little different. We first try and kill + * non-local tasks, as these can be run anywhere. 
For each TIP, we + * kill the task that has made the least progress, if the TIP has + * more than one active task. + * We then look at tasks in runningMapCache. + */ + int tasksKilled = 0; + + /* + * For non-local running maps, we 'cheat' a bit. We know that the set + * of non-local running maps has an insertion order such that tasks + * that ran last are at the end. So we iterate through the set in + * reverse. This is OK because even if the implementation changes, + * we're still using generic set iteration and are no worse of. + */ + TaskInProgress[] tips = + job.getNonLocalRunningMaps().toArray(new TaskInProgress[0]); + for (int i=tips.length-1; i>=0; i--) { + // pick the tast attempt that has progressed least + TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]); + if (null != tid) { + if (tips[i].killTask(tid, false)) { + if (++tasksKilled >= tasksToKill) { + return tasksKilled; + } + } + } + } + // now look at other running tasks + for (Set s: job.getRunningMapCache().values()) { + for (TaskInProgress tip: s) { + TaskAttemptID tid = getRunningTaskWithLeastProgress(tip); + if (null != tid) { + if (tip.killTask(tid, false)) { + if (++tasksKilled >= tasksToKill) { + return tasksKilled; + } + } + } + } + } + return tasksKilled; + } + + } + + /** + * The scheduling algorithms for reduce tasks. 
+ */ + private static class ReduceSchedulingMgr extends TaskSchedulingMgr { + ReduceSchedulingMgr(CapacityTaskScheduler dad) { + super(dad); + type = new String("reduce"); + } + Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) + throws IOException { + ClusterStatus clusterStatus = + scheduler.taskTrackerManager.getClusterStatus(); + int numTaskTrackers = clusterStatus.getTaskTrackers(); + return job.obtainNewReduceTask(taskTracker, numTaskTrackers, + scheduler.taskTrackerManager.getNumberOfUniqueHosts()); + } + int getClusterCapacity() { + return scheduler.taskTrackerManager.getClusterStatus().getMaxReduceTasks(); + } + int getRunningTasks(JobInProgress job) { + return job.runningReduces(); + } + int getPendingTasks(JobInProgress job) { + return job.pendingReduces(); + } + int killTasksFromJob(JobInProgress job, int tasksToKill) { + /* + * For reduces, we 'cheat' a bit. We know that the set + * of running reduces has an insertion order such that tasks + * that ran last are at the end. So we iterate through the set in + * reverse. This is OK because even if the implementation changes, + * we're still using generic set iteration and are no worse of. + */ + int tasksKilled = 0; + TaskInProgress[] tips = + job.getRunningReduces().toArray(new TaskInProgress[0]); + for (int i=tips.length-1; i>=0; i--) { + // pick the tast attempt that has progressed least + TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]); + if (null != tid) { + if (tips[i].killTask(tid, false)) { + if (++tasksKilled >= tasksToKill) { + return tasksKilled; + } + } + } + } + return tasksKilled; + } + } + + /** the scheduling mgrs for Map and Reduce tasks */ + protected TaskSchedulingMgr mapScheduler = new MapSchedulingMgr(this); + protected TaskSchedulingMgr reduceScheduler = new ReduceSchedulingMgr(this); + + /** name of the default queue. 
*/ + static final String DEFAULT_QUEUE_NAME = "default"; + + /** how often does redistribution thread run (in msecs)*/ + private static long RECLAIM_CAPACITY_INTERVAL; + /** we start killing tasks to reclaim capacity when we have so many + * heartbeats left. */ + private static final int HEARTBEATS_LEFT_BEFORE_KILLING = 3; + + private static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class); + protected JobQueuesManager jobQueuesManager; + protected CapacitySchedulerConf rmConf; + /** whether scheduler has started or not */ + private boolean started = false; + + /** + * Used to distribute/reclaim excess capacity among queues + */ + class ReclaimCapacity implements Runnable { + public ReclaimCapacity() { + } + public void run() { + while (true) { + try { + Thread.sleep(RECLAIM_CAPACITY_INTERVAL); + if (stopReclaim) { + break; + } + reclaimCapacity(); + } catch (InterruptedException t) { + break; + } catch (Throwable t) { + LOG.error("Error in redistributing capacity:\n" + + StringUtils.stringifyException(t)); + } + } + } + } + private Thread reclaimCapacityThread = null; + /** variable to indicate that thread should stop */ + private boolean stopReclaim = false; + + /** + * A clock class - can be mocked out for testing. 
+ */ + static class Clock { + long getTime() { + return System.currentTimeMillis(); + } + } + private Clock clock; + + + public CapacityTaskScheduler() { + this(new Clock()); + } + + // for testing + public CapacityTaskScheduler(Clock clock) { + this.jobQueuesManager = new JobQueuesManager(this); + this.clock = clock; + } + + /** mostly for testing purposes */ + public void setResourceManagerConf(CapacitySchedulerConf conf) { + this.rmConf = conf; + } + + @Override + public synchronized void start() throws IOException { + if (started) return; + super.start(); + RECLAIM_CAPACITY_INTERVAL = + conf.getLong("mapred.capacity-scheduler.reclaimCapacity.interval", 5); + RECLAIM_CAPACITY_INTERVAL *= 1000; + // initialize our queues from the config settings + if (null == rmConf) { + rmConf = new CapacitySchedulerConf(); + } + // read queue info from config file + Set queues = taskTrackerManager.getQueueManager().getQueues(); + float totalCapacity = 0.0f; + for (String queueName: queues) { + float gc = rmConf.getGuaranteedCapacity(queueName); + totalCapacity += gc; + int ulMin = rmConf.getMinimumUserLimitPercent(queueName); + long reclaimTimeLimit = rmConf.getReclaimTimeLimit(queueName); + // reclaimTimeLimit is the time(in millisec) within which we need to + // reclaim capacity. + // create queue scheduling objects for Map and Reduce + mapScheduler.add(new QueueSchedulingInfo(queueName, gc, + ulMin, reclaimTimeLimit)); + reduceScheduler.add(new QueueSchedulingInfo(queueName, gc, + ulMin, reclaimTimeLimit)); + + // create the queues of job objects + boolean supportsPrio = rmConf.isPrioritySupported(queueName); + jobQueuesManager.createQueue(queueName, supportsPrio); + } + if (totalCapacity > 100.0) { + throw new IllegalArgumentException("Sum of queue capacities over 100% at " + + totalCapacity); + } + + // Sanity check: there should be at least one queue. 
+ if (0 == mapScheduler.getNumQueues()) { + throw new IllegalStateException("System has no queue configured"); + } + + // check if there's a queue with the default name. If not, we quit. + if (!mapScheduler.isQueuePresent(DEFAULT_QUEUE_NAME)) { + throw new IllegalStateException("System has no default queue configured"); + } + + // listen to job changes + taskTrackerManager.addJobInProgressListener(jobQueuesManager); + + // start thread for redistributing capacity + this.reclaimCapacityThread = + new Thread(new ReclaimCapacity(),"reclaimCapacity"); + this.reclaimCapacityThread.start(); + started = true; + LOG.info("Capacity scheduler initialized " + queues.size() + " queues"); + } + + @Override + public synchronized void terminate() throws IOException { + if (!started) return; + if (jobQueuesManager != null) { + taskTrackerManager.removeJobInProgressListener( + jobQueuesManager); + } + // tell the reclaim thread to stop + stopReclaim = true; + started = false; + super.terminate(); + } + + @Override + public synchronized void setConf(Configuration conf) { + super.setConf(conf); + } + + void reclaimCapacity() { + mapScheduler.reclaimCapacity(); + reduceScheduler.reclaimCapacity(); + } + + /** + * provided for the test classes + * lets you update the QSI objects and sorted collection + */ + void updateQSIInfo() { + mapScheduler.updateQSIObjects(); + mapScheduler.updateCollectionOfQSIs(); + reduceScheduler.updateQSIObjects(); + reduceScheduler.updateCollectionOfQSIs(); + } + + /* + * The grand plan for assigning a task. + * First, decide whether a Map or Reduce task should be given to a TT + * (if the TT can accept either). + * Next, pick a queue. We only look at queues that need a slot. Among + * these, we first look at queues whose ac is less than gc (queues that + * gave up capacity in the past). Next, we look at any other queue that + * needs a slot. + * Next, pick a job in a queue. 
we pick the job at the front of the queue + * unless its user is over the user limit. + * Finally, given a job, pick a task from the job. + * + */ + @Override + public synchronized List assignTasks(TaskTrackerStatus taskTracker) + throws IOException { + + List tasks = null; + /* + * If TT has Map and Reduce slot free, we need to figure out whether to + * give it a Map or Reduce task. + * Number of ways to do this. For now, base decision on how much is needed + * versus how much is used (default to Map, if equal). + */ + LOG.debug("TT asking for task, max maps=" + taskTracker.getMaxMapTasks() + + ", run maps=" + taskTracker.countMapTasks() + ", max reds=" + + taskTracker.getMaxReduceTasks() + ", run reds=" + + taskTracker.countReduceTasks() + ", map cap=" + + mapScheduler.getClusterCapacity() + ", red cap = " + + reduceScheduler.getClusterCapacity()); + int maxMapTasks = taskTracker.getMaxMapTasks(); + int currentMapTasks = taskTracker.countMapTasks(); + int maxReduceTasks = taskTracker.getMaxReduceTasks(); + int currentReduceTasks = taskTracker.countReduceTasks(); + if ((maxReduceTasks - currentReduceTasks) > + (maxMapTasks - currentMapTasks)) { + tasks = reduceScheduler.assignTasks(taskTracker); + // if we didn't get any, look at map tasks, if TT has space + if ((null == tasks) && (maxMapTasks > currentMapTasks)) { + tasks = mapScheduler.assignTasks(taskTracker); + } + } + else { + tasks = mapScheduler.assignTasks(taskTracker); + // if we didn't get any, look at red tasks, if TT has space + if ((null == tasks) && (maxReduceTasks > currentReduceTasks)) { + tasks = reduceScheduler.assignTasks(taskTracker); + } + } + return tasks; + } + + // called when a job is added + synchronized void jobAdded(JobInProgress job) { + // let our map and reduce schedulers know this, so they can update + // user-specific info + mapScheduler.jobAdded(job); + reduceScheduler.jobAdded(job); + } + + // called when a job is removed + synchronized void jobRemoved(JobInProgress job) { + // 
let our map and reduce schedulers know this, so they can update + // user-specific info + mapScheduler.jobRemoved(job); + reduceScheduler.jobRemoved(job); + } + +} + Added: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=694415&view=auto ============================================================================== --- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (added) +++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Thu Sep 11 11:49:22 2008 @@ -0,0 +1,216 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred; + +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * A {@link JobInProgressListener} that maintains the jobs being managed in + * one or more queues. + */ +class JobQueuesManager extends JobInProgressListener { + + /* + * If a queue supports priorities, waiting jobs must be + * sorted on priorities, and then on their start times (technically, + * their insertion time. + * If a queue doesn't support priorities, waiting jobs are + * sorted based on their start time. + * Running jobs are not sorted. A job that started running earlier + * is ahead in the queue, so insertion should be at the tail. + */ + + // comparator for jobs in queues that support priorities + private static final Comparator PRIORITY_JOB_COMPARATOR + = new Comparator() { + public int compare(JobInProgress o1, JobInProgress o2) { + // Look at priority. + int res = o1.getPriority().compareTo(o2.getPriority()); + if (res == 0) { + // the job that started earlier wins + if (o1.getStartTime() < o2.getStartTime()) { + res = -1; + } else { + res = (o1.getStartTime() == o2.getStartTime() ? 0 : 1); + } + } + if (res == 0) { + res = o1.getJobID().compareTo(o2.getJobID()); + } + return res; + } + }; + // comparator for jobs in queues that don't support priorities + private static final Comparator STARTTIME_JOB_COMPARATOR + = new Comparator() { + public int compare(JobInProgress o1, JobInProgress o2) { + // the job that started earlier wins + if (o1.getStartTime() < o2.getStartTime()) { + return -1; + } else { + return (o1.getStartTime() == o2.getStartTime() ? 
0 : 1); + } + } + }; + + // class to store queue info + private static class QueueInfo { + + // whether the queue supports priorities + boolean supportsPriorities; + // maintain separate collections of running & waiting jobs. This we do + // mainly because when a new job is added, it cannot superceede a running + // job, even though the latter may be a lower priority. If this is ever + // changed, we may get by with one collection. + Collection waitingJobs; + Collection runningJobs; + + QueueInfo(boolean prio) { + this.supportsPriorities = prio; + if (supportsPriorities) { + this.waitingJobs = new TreeSet(PRIORITY_JOB_COMPARATOR); + } + else { + this.waitingJobs = new TreeSet(STARTTIME_JOB_COMPARATOR); + } + this.runningJobs = new LinkedList(); + } + + /** + * we need to delete an object from our TreeSet based on referential + * equality, rather than value equality that the TreeSet uses. + * Another way to do this is to extend the TreeSet and override remove(). + */ + static private boolean removeOb(Collection c, Object o) { + Iterator i = c.iterator(); + while (i.hasNext()) { + if (i.next() == o) { + i.remove(); + return true; + } + } + return false; + } + + } + + // we maintain a hashmap of queue-names to queue info + private Map jobQueues = + new HashMap(); + private static final Log LOG = LogFactory.getLog(JobQueuesManager.class); + private CapacityTaskScheduler scheduler; + + + JobQueuesManager(CapacityTaskScheduler s) { + this.scheduler = s; + } + + /** + * create an empty queue with the default comparator + * @param queueName The name of the queue + * @param supportsPriotities whether the queue supports priorities + */ + public void createQueue(String queueName, boolean supportsPriotities) { + jobQueues.put(queueName, new QueueInfo(supportsPriotities)); + } + + /** + * Returns the queue of running jobs associated with the name + */ + public Collection getRunningJobQueue(String queueName) { + return jobQueues.get(queueName).runningJobs; + } + + /** + * 
Returns the queue of waiting jobs associated with the name + */ + public Collection getWaitingJobQueue(String queueName) { + return jobQueues.get(queueName).waitingJobs; + } + + @Override + public void jobAdded(JobInProgress job) { + LOG.info("Job submitted to queue " + job.getProfile().getQueueName()); + // add job to the right queue + QueueInfo qi = jobQueues.get(job.getProfile().getQueueName()); + if (null == qi) { + // job was submitted to a queue we're not aware of + LOG.warn("Invalid queue " + job.getProfile().getQueueName() + + " specified for job" + job.getProfile().getJobID() + + ". Ignoring job."); + return; + } + // add job to waiting queue. It will end up in the right place, + // based on priority. + // We use our own version of removing objects based on referential + // equality, since the 'job' object has already been changed. + qi.waitingJobs.add(job); + // let scheduler know. + scheduler.jobAdded(job); + } + + @Override + public void jobRemoved(JobInProgress job) { + QueueInfo qi = jobQueues.get(job.getProfile().getQueueName()); + if (null == qi) { + // can't find queue for job. Shouldn't happen. + LOG.warn("Could not find queue " + job.getProfile().getQueueName() + + " when removing job " + job.getProfile().getJobID()); + return; + } + // job could be in running or waiting queue + if (!qi.runningJobs.remove(job)) { + QueueInfo.removeOb(qi.waitingJobs, job); + } + // let scheduler know + scheduler.jobRemoved(job); + } + + @Override + public void jobUpdated(JobInProgress job) { + QueueInfo qi = jobQueues.get(job.getProfile().getQueueName()); + if (null == qi) { + // can't find queue for job. Shouldn't happen. + LOG.warn("Could not find queue " + job.getProfile().getQueueName() + + " when updating job " + job.getProfile().getJobID()); + return; + } + // this is called when a job's priority or state is changed. 
+ // since we don't know the job's previous state, we need to + // find out in which queue it was earlier, and then place it in the + // right queue. + Collection dest = (job.getStatus().getRunState() == + JobStatus.PREP)? qi.waitingJobs: qi.runningJobs; + // We use our own version of removing objects based on referential + // equality, since the 'job' object has already been changed. + if (!QueueInfo.removeOb(qi.waitingJobs, job)) { + qi.runningJobs.remove(job); + } + dest.add(job); + } + +}