From core-commits-return-8626-apmail-hadoop-core-commits-archive=hadoop.apache.org@hadoop.apache.org Tue May 05 07:30:54 2009 Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 2886 invoked from network); 5 May 2009 07:30:54 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 5 May 2009 07:30:54 -0000 Received: (qmail 94469 invoked by uid 500); 5 May 2009 07:30:53 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 94397 invoked by uid 500); 5 May 2009 07:30:53 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 94388 invoked by uid 99); 5 May 2009 07:30:53 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 May 2009 07:30:53 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 May 2009 07:30:42 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 247B323888EB; Tue, 5 May 2009 07:30:21 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r771607 [1/2] - in /hadoop/core/trunk: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ Date: Tue, 05 May 2009 07:30:20 -0000 To: core-commits@hadoop.apache.org From: yhemanth@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090505073021.247B323888EB@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: yhemanth Date: Tue May 5 07:30:20 2009 New Revision: 771607 
URL: http://svn.apache.org/viewvc?rev=771607&view=rev Log: HADOOP-5726. Remove pre-emption from capacity scheduler code base. Contributed by Rahul Kumar Singh. Modified: hadoop/core/trunk/CHANGES.txt hadoop/core/trunk/conf/capacity-scheduler.xml.template hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java Modified: hadoop/core/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=771607&r1=771606&r2=771607&view=diff ============================================================================== --- hadoop/core/trunk/CHANGES.txt (original) +++ hadoop/core/trunk/CHANGES.txt Tue May 5 07:30:20 2009 @@ -527,6 +527,9 @@ INCOMPATIBLE CHANGES + HADOOP-5726. Remove pre-emption from capacity scheduler code base. + (Rahul Kumar Singh via yhemanth) + NEW FEATURES IMPROVEMENTS Modified: hadoop/core/trunk/conf/capacity-scheduler.xml.template URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/capacity-scheduler.xml.template?rev=771607&r1=771606&r2=771607&view=diff ============================================================================== --- hadoop/core/trunk/conf/capacity-scheduler.xml.template (original) +++ hadoop/core/trunk/conf/capacity-scheduler.xml.template Tue May 5 07:30:20 2009 @@ -8,22 +8,14 @@ - mapred.capacity-scheduler.queue.default.guaranteed-capacity + mapred.capacity-scheduler.queue.default.capacity 100 Percentage of the number of slots in the cluster that are - guaranteed to be available for jobs in this queue. + to be available for jobs in this queue. 
- mapred.capacity-scheduler.queue.default.reclaim-time-limit - 300 - The amount of time, in seconds, before which - resources distributed to other queues will be reclaimed. - - - - mapred.capacity-scheduler.queue.default.supports-priority false If true, priorities of jobs will be taken into @@ -54,29 +46,10 @@ - - - mapred.capacity-scheduler.reclaimCapacity.interval - 5 - The time interval, in seconds, between which the scheduler - periodically determines whether capacity needs to be reclaimed for - any queue. - - - - mapred.capacity-scheduler.default-reclaim-time-limit - 300 - The amount of time, in seconds, before which - resources distributed to other queues will be reclaimed by default - in a job queue. - - - - mapred.capacity-scheduler.default-supports-priority false If true, priorities of jobs will be taken into Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=771607&r1=771606&r2=771607&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original) +++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Tue May 5 07:30:20 2009 @@ -34,8 +34,6 @@ /** Default file name from which the resource manager configuration is read. */ public static final String SCHEDULER_CONF_FILE = "capacity-scheduler.xml"; - private int defaultReclaimTime; - private int defaultUlimitMinimum; private boolean defaultSupportPriority; @@ -117,8 +115,6 @@ * which is used by the Capacity Scheduler. 
*/ private void initializeDefaults() { - defaultReclaimTime = rmConf.getInt( - "mapred.capacity-scheduler.default-reclaim-time-limit",300); defaultUlimitMinimum = rmConf.getInt( "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100); defaultSupportPriority = rmConf.getBoolean( @@ -129,33 +125,33 @@ } /** - * Get the guaranteed percentage of the cluster for the specified queue. + * Get the percentage of the cluster for the specified queue. * - * This method defaults to configured default Guaranteed Capacity if + * This method defaults to configured default Capacity if * no value is specified in the configuration for this queue. * If the configured capacity is negative value or greater than 100 an * {@link IllegalArgumentException} is thrown. * - * If default Guaranteed capacity is not configured for a queue, then + * If default capacity is not configured for a queue, then * system allocates capacity based on what is free at the time of * capacity scheduler start * * * @param queue name of the queue - * @return guaranteed percent of the cluster for the queue. + * @return percent of the cluster for the queue. */ - public float getGuaranteedCapacity(String queue) { - //Check done in order to return default GC which can be negative - //In case of both GC and default GC not configured. + public float getCapacity(String queue) { + //Check done in order to return default capacity which can be negative + //In case of both capacity and default capacity not configured. 
//Last check is if the configuration is specified and is marked as //negative we throw exception String raw = rmConf.getRaw(toFullPropertyName(queue, - "guaranteed-capacity")); + "capacity")); if(raw == null) { return -1; } float result = rmConf.getFloat(toFullPropertyName(queue, - "guaranteed-capacity"), + "capacity"), -1); if (result < 0.0 || result > 100.0) { throw new IllegalArgumentException("Illegal capacity for queue " + queue + @@ -165,53 +161,13 @@ } /** - * Sets the Guaranteed capacity of the given queue. - * - * @param queue name of the queue - * @param gc guaranteed percent of the cluster for the queue. - */ - public void setGuaranteedCapacity(String queue,float gc) { - rmConf.setFloat(toFullPropertyName(queue, "guaranteed-capacity"),gc); - } - - - /** - * Get the amount of time before which redistributed resources must be - * reclaimed for the specified queue. - * - * The resource manager distributes spare capacity from a free queue - * to ones which are in need for more resources. However, if a job - * submitted to the first queue requires back the resources, they must - * be reclaimed within the specified configuration time limit. - * - * This method defaults to configured default reclaim time limit if - * no value is specified in the configuration for this queue. - * - * Throws an {@link IllegalArgumentException} when invalid value is - * configured. + * Sets the capacity of the given queue. * * @param queue name of the queue - * @return reclaim time limit for this queue. - */ - public int getReclaimTimeLimit(String queue) { - int reclaimTimeLimit = rmConf.getInt(toFullPropertyName(queue, "reclaim-time-limit"), - defaultReclaimTime); - if(reclaimTimeLimit <= 0) { - throw new IllegalArgumentException("Invalid reclaim time limit : " - + reclaimTimeLimit + " for queue : " + queue); - } - return reclaimTimeLimit; - } - - /** - * Set the amount of time before which redistributed resources must be - * reclaimed for the specified queue. 
- * @param queue Name of the queue - * @param value Amount of time before which the redistributed resources - * must be retained. + * @param gc percent of the cluster for the queue. */ - public void setReclaimTimeLimit(String queue, int value) { - rmConf.setInt(toFullPropertyName(queue, "reclaim-time-limit"), value); + public void setCapacity(String queue,float gc) { + rmConf.setFloat(toFullPropertyName(queue, "capacity"),gc); } /** @@ -435,30 +391,4 @@ public void setDefaultPercentOfPmemInVmem(float value) { rmConf.setFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY, value); } - - /** - * Gets the reclaim capacity thread interval. - * - * @return reclaim capacity interval - */ - - public long getReclaimCapacityInterval() { - long reclaimCapacityInterval = - rmConf.getLong("mapred.capacity-scheduler.reclaimCapacity.interval", 5); - - if(reclaimCapacityInterval <= 0) { - throw new IllegalArgumentException("Invalid reclaim capacity " + - "interval, should be greater than zero"); - } - return reclaimCapacityInterval; - } - /** - * Sets the reclaim capacity thread interval. - * - * @param value - */ - public void setReclaimCapacityInterval(long value) { - rmConf.setLong("mapred.capacity-scheduler.reclaimCapacity.interval", - value); - } } Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=771607&r1=771606&r2=771607&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original) +++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue May 5 07:30:20 2009 @@ -42,15 +42,12 @@ * and provides a HOD-less way to share large clusters. 
This scheduler * provides the following features: * * support for queues, where a job is submitted to a queue. - * * Queues are guaranteed a fraction of the capacity of the grid (their - * 'guaranteed capacity') in the sense that a certain capacity of resources + * * Queues are assigned a fraction of the capacity of the grid (their + * 'capacity') in the sense that a certain capacity of resources * will be at their disposal. All jobs submitted to the queues of an Org - * will have access to the capacity guaranteed to the Org. - * * Free resources can be allocated to any queue beyond its guaranteed - * capacity. These excess allocated resources can be reclaimed and made - * available to another queue in order to meet its capacity guarantee. - * * The scheduler guarantees that excess resources taken from a queue will - * be restored to it within N minutes of its need for them. + * will have access to the capacity to the Org. + * * Free resources can be allocated to any queue beyond its + * capacity. * * Queues optionally support job priorities (disabled by default). * * Within a queue, jobs with higher priority will have access to the * queue's resources before jobs with lower priority. However, once a job @@ -62,42 +59,11 @@ */ class CapacityTaskScheduler extends TaskScheduler { - /** - * For keeping track of reclaimed capacity. - * Whenever slots need to be reclaimed, we create one of these objects. - * As the queue gets slots, the amount to reclaim gets decremented. if - * we haven't reclaimed enough within a certain time, we need to kill - * tasks. This object 'expires' either if all resources are reclaimed - * before the deadline, or the deadline passes . - */ - private static class ReclaimedResource { - // how much resource to reclaim - public int originalAmount; - // how much is to be reclaimed currently - public int currentAmount; - // the time, in millisecs, when this object expires. 
- // This time is equal to the time when the object was created, plus - // the reclaim-time SLA for the queue. - public long whenToExpire; - // we also keep track of when to kill tasks, in millisecs. This is a - // fraction of 'whenToExpire', but we store it here so we don't - // recompute it every time. - public long whenToKill; - - public ReclaimedResource(int amount, long expiryTime, - long whenToKill) { - this.originalAmount = amount; - this.currentAmount = amount; - this.whenToExpire = expiryTime; - this.whenToKill = whenToKill; - } - } - /*********************************************************************** * Keeping track of scheduling information for queues * * We need to maintain scheduling information relevant to a queue (its - * name, guaranteed capacity, etc), along with information specific to + * name, capacity, etc), along with information specific to * each kind of task, Map or Reduce (num of running tasks, pending * tasks etc). * @@ -115,58 +81,18 @@ * the actual gc, which depends on how many slots are available * in the cluster at any given time. */ - int guaranteedCapacity = 0; + int capacity = 0; // number of running tasks int numRunningTasks = 0; - // number of pending tasks - int numPendingTasks = 0; /** for each user, we need to keep track of number of running tasks */ Map numRunningTasksByUser = new HashMap(); /** - * We need to keep track of resources to reclaim. - * Whenever a queue is under capacity and has tasks pending, we offer it - * an SLA that gives it free slots equal to or greater than the gap in - * its capacity, within a period of time (reclaimTime). - * To do this, we periodically check if queues need to reclaim capacity. - * If they do, we create a ResourceReclaim object. We also periodically - * check if a queue has received enough free slots within, say, 80% of - * its reclaimTime. If not, we kill enough tasks to make up the - * difference. - * We keep two queues of ResourceReclaim objects. 
when an object is - * created, it is placed in one queue. Once we kill tasks to recover - * resources for that object, it is placed in an expiry queue. we need - * to do this to prevent creating spurious ResourceReclaim objects. We - * keep a count of total resources that are being reclaimed. This count - * is decremented when an object expires. - */ - - /** - * the list of resources to reclaim. This list is always sorted so that - * resources that need to be reclaimed sooner occur earlier in the list. - */ - LinkedList reclaimList = - new LinkedList(); - /** - * the list of resources to expire. This list is always sorted so that - * resources that need to be expired sooner occur earlier in the list. - */ - LinkedList reclaimExpireList = - new LinkedList(); - /** - * sum of all resources that are being reclaimed. - * We keep this to prevent unnecessary ReclaimResource objects from being - * created. - */ - int numReclaimedResources = 0; - - /** * reset the variables associated with tasks */ void resetTaskVars() { numRunningTasks = 0; - numPendingTasks = 0; for (String s: numRunningTasksByUser.keySet()) { numRunningTasksByUser.put(s, 0); } @@ -176,11 +102,11 @@ * return information about the tasks */ public String toString(){ - float runningTasksAsPercent = guaranteedCapacity!= 0 ? - ((float)numRunningTasks * 100/guaranteedCapacity):0; + float runningTasksAsPercent = capacity!= 0 ? 
+ ((float)numRunningTasks * 100/capacity):0; StringBuffer sb = new StringBuffer(); - sb.append("Guaranteed Capacity: " + guaranteedCapacity + "\n"); - sb.append(String.format("Running tasks: %.1f%% of Guaranteed Capacity\n", + sb.append("Capacity: " + capacity + "\n"); + sb.append(String.format("Running tasks: %.1f%% of Capacity\n", runningTasksAsPercent)); // include info on active users if (numRunningTasks != 0) { @@ -202,8 +128,8 @@ private static class QueueSchedulingInfo { String queueName; - /** guaranteed capacity(%) is set in the config */ - float guaranteedCapacityPercent = 0; + /** capacity(%) is set in the config */ + float capacityPercent = 0; /** * to handle user limits, we need to know how many users have jobs in @@ -215,13 +141,6 @@ int ulMin; /** - * reclaim time limit (in msec). This time represents the SLA we offer - * a queue - a queue gets back any lost capacity withing this period - * of time. - */ - long reclaimTime; - - /** * We keep track of the JobQueuesManager only for reporting purposes * (in toString()). 
*/ @@ -234,11 +153,10 @@ TaskSchedulingInfo reduceTSI; public QueueSchedulingInfo(String queueName, float gcPercent, - int ulMin, long reclaimTime, JobQueuesManager jobQueuesManager) { + int ulMin, JobQueuesManager jobQueuesManager) { this.queueName = new String(queueName); - this.guaranteedCapacityPercent = gcPercent; + this.capacityPercent = gcPercent; this.ulMin = ulMin; - this.reclaimTime = reclaimTime; this.jobQueuesManager = jobQueuesManager; this.mapTSI = new TaskSchedulingInfo(); this.reduceTSI = new TaskSchedulingInfo(); @@ -253,12 +171,10 @@ StringBuffer sb = new StringBuffer(); sb.append("Queue configuration\n"); //sb.append("Name: " + queueName + "\n"); - sb.append("Guaranteed Capacity Percentage: "); - sb.append(guaranteedCapacityPercent); + sb.append("Capacity Percentage: "); + sb.append(capacityPercent); sb.append("%\n"); sb.append(String.format("User Limit: %d%s\n",ulMin, "%")); - sb.append(String.format("Reclaim Time limit: %s\n", - StringUtils.formatTime(reclaimTime))); sb.append(String.format("Priority Supported: %s\n", (jobQueuesManager.doesQueueSupportPriorities(queueName))? "YES":"NO")); @@ -300,9 +216,8 @@ public String toString(){ // note that we do not call updateQSIObjects() here for performance // reasons. This means that the data we print out may be slightly - // stale. This data is updated whenever assignTasks() is called, or - // whenever the reclaim capacity thread runs, which should be fairly - // often. If neither of these happen, the data gets stale. If we see + // stale. This data is updated whenever assignTasks() is called + // If this doesn't happen, the data gets stale. If we see // this often, we may need to detect this situation and call // updateQSIObjects(), or just call it each time. 
return scheduler.getDisplayInfo(queueName); @@ -372,14 +287,11 @@ abstract Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) throws IOException; abstract int getPendingTasks(JobInProgress job); - abstract int killTasksFromJob(JobInProgress job, int tasksToKill); abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi); /** * List of QSIs for assigning tasks. - * This list is ordered such that queues that need to reclaim capacity - * sooner, come before queues that don't. For queues that don't, they're - * ordered by a ratio of (# of running tasks)/Guaranteed capacity, which + * Queues are ordered by a ratio of (# of running tasks)/capacity, which * indicates how much 'free space' the queue has, or how much it is over * capacity. This ordered list is iterated over, when assigning tasks. */ @@ -396,34 +308,15 @@ public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) { TaskSchedulingInfo t1 = getTSI(q1); TaskSchedulingInfo t2 = getTSI(q2); - // if one queue needs to reclaim something and the other one doesn't, - // the former is first - if ((0 == t1.reclaimList.size()) && (0 != t2.reclaimList.size())) { - return 1; - } - else if ((0 != t1.reclaimList.size()) && (0 == t2.reclaimList.size())){ - return -1; - } - else if ((0 == t1.reclaimList.size()) && (0 == t2.reclaimList.size())){ - // neither needs to reclaim. - // look at how much capacity they've filled. Treat a queue with gc=0 - // equivalent to a queue running at capacity - double r1 = (0 == t1.guaranteedCapacity)? 1.0f: - (double)t1.numRunningTasks/(double)t1.guaranteedCapacity; - double r2 = (0 == t2.guaranteedCapacity)? 1.0f: - (double)t2.numRunningTasks/(double)t2.guaranteedCapacity; - if (r1<r2) return -1; - else if (r1>r2) return 1; - else return 0; - } - else { - // both have to reclaim. 
Look at which one needs to reclaim earlier - long tm1 = t1.reclaimList.get(0).whenToKill; - long tm2 = t2.reclaimList.get(0).whenToKill; - if (tm1<tm2) return -1; - else if (tm1>tm2) return 1; - else return 0; - } + // look at how much capacity they've filled. Treat a queue with gc=0 + // equivalent to a queue running at capacity + double r1 = (0 == t1.capacity)? 1.0f: + (double)t1.numRunningTasks/(double)t1.capacity; + double r2 = (0 == t2.capacity)? 1.0f: + (double)t2.numRunningTasks/(double)t2.capacity; + if (r1<r2) return -1; + else if (r1>r2) return 1; + else return 0; } } // subclass for map and reduce comparators @@ -454,197 +347,19 @@ Collections.sort(qsiForAssigningTasks, queueComparator); } - /** - * Periodically, we walk through our queues to do the following: - * a. Check if a queue needs to reclaim any resources within a period - * of time (because it's running below capacity and more tasks are - * waiting) - * b. Check if a queue hasn't received enough of the resources it needed - * to be reclaimed and thus tasks need to be killed. - * The caller is responsible for ensuring that the QSI objects and the - * collections are up-to-date. - * - * Make sure that we do not make any calls to scheduler.taskTrackerManager - * as this can result in a deadlock (see HADOOP-4977). - */ - private synchronized void reclaimCapacity(int nextHeartbeatInterval) { - int tasksToKill = 0; - - QueueSchedulingInfo lastQsi = - qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1); - TaskSchedulingInfo lastTsi = getTSI(lastQsi); - long currentTime = scheduler.clock.getTime(); - for (QueueSchedulingInfo qsi: qsiForAssigningTasks) { - TaskSchedulingInfo tsi = getTSI(qsi); - if (tsi.guaranteedCapacity <= 0) { - // no capacity, hence nothing can be reclaimed. - continue; - } - // is there any resource that needs to be reclaimed? 
- if ((!tsi.reclaimList.isEmpty()) && - (tsi.reclaimList.getFirst().whenToKill < - currentTime + CapacityTaskScheduler.RECLAIM_CAPACITY_INTERVAL)) { - // make a note of how many tasks to kill to claim resources - tasksToKill += tsi.reclaimList.getFirst().currentAmount; - // move this to expiry list - ReclaimedResource r = tsi.reclaimList.remove(); - tsi.reclaimExpireList.add(r); - } - // is there any resource that needs to be expired? - if ((!tsi.reclaimExpireList.isEmpty()) && - (tsi.reclaimExpireList.getFirst().whenToExpire <= currentTime)) { - ReclaimedResource r = tsi.reclaimExpireList.remove(); - tsi.numReclaimedResources -= r.originalAmount; - } - // do we need to reclaim a resource later? - // if no queue is over capacity, there's nothing to reclaim - if (lastTsi.numRunningTasks <= lastTsi.guaranteedCapacity) { - continue; - } - if (tsi.numRunningTasks < tsi.guaranteedCapacity) { - // usedCap is how much capacity is currently accounted for - int usedCap = tsi.numRunningTasks + tsi.numReclaimedResources; - // see if we have remaining capacity and if we have enough pending - // tasks to use up remaining capacity - if ((usedCap < tsi.guaranteedCapacity) && - ((tsi.numPendingTasks - tsi.numReclaimedResources)>0)) { - // create a request for resources to be reclaimed - int amt = Math.min((tsi.guaranteedCapacity-usedCap), - (tsi.numPendingTasks - tsi.numReclaimedResources)); - // create a resource object that needs to be reclaimed some time - // in the future - long whenToKill = qsi.reclaimTime - - (CapacityTaskScheduler.HEARTBEATS_LEFT_BEFORE_KILLING * - nextHeartbeatInterval); - if (whenToKill < 0) whenToKill = 0; - tsi.reclaimList.add(new ReclaimedResource(amt, - currentTime + qsi.reclaimTime, - currentTime + whenToKill)); - tsi.numReclaimedResources += amt; - LOG.debug("Queue " + qsi.queueName + " needs to reclaim " + - amt + " resources"); - } - } - } - // kill tasks to reclaim capacity - if (0 != tasksToKill) { - killTasks(tasksToKill); - } - } - - // kill 
'tasksToKill' tasks - private void killTasks(int tasksToKill) - { - /* - * There are a number of fair ways in which one can figure out how - * many tasks to kill from which queue, so that the total number of - * tasks killed is equal to 'tasksToKill'. - * Maybe the best way is to keep a global ordering of running tasks - * and kill the ones that ran last, irrespective of what queue or - * job they belong to. - * What we do here is look at how many tasks is each queue running - * over capacity, and use that as a weight to decide how many tasks - * to kill from that queue. - */ - - // first, find out all queues over capacity - int loc; - for (loc=0; loc getTSI(qsi).guaranteedCapacity) { - // all queues from here onwards are running over cap - break; - } - } - // if some queue needs to reclaim cap, there must be at least one queue - // over cap. But check, just in case. - if (loc == qsiForAssigningTasks.size()) { - LOG.warn("In Capacity scheduler, we need to kill " + tasksToKill + - " tasks but there is no queue over capacity."); - return; - } - // calculate how many total tasks are over cap - int tasksOverCap = 0; - for (int i=loc; i=0; i--) { - if (jobs[i].getStatus().getRunState() != JobStatus.RUNNING) { - continue; - } - tasksKilled += killTasksFromJob(jobs[i], tasksToKill-tasksKilled); - if (tasksKilled >= tasksToKill) break; - } - } - - // return the TaskAttemptID of the running task, if any, that has made - // the least progress. 
- TaskAttemptID getRunningTaskWithLeastProgress(TaskInProgress tip) { - double leastProgress = 1; - TaskAttemptID tID = null; - for (Iterator it = - tip.getActiveTasks().keySet().iterator(); it.hasNext();) { - TaskAttemptID taskid = it.next(); - TaskStatus status = tip.getTaskStatus(taskid); - if (status.getRunState() == TaskStatus.State.RUNNING) { - if (status.getProgress() < leastProgress) { - leastProgress = status.getProgress(); - tID = taskid; - } - } - } - return tID; - } - - // called when a task is allocated to queue represented by qsi. - // update our info about reclaimed resources - private synchronized void updateReclaimedResources(QueueSchedulingInfo qsi) { - TaskSchedulingInfo tsi = getTSI(qsi); - // if we needed to reclaim resources, we have reclaimed one - if (tsi.reclaimList.isEmpty()) { - return; - } - ReclaimedResource res = tsi.reclaimList.getFirst(); - res.currentAmount--; - if (0 == res.currentAmount) { - // move this resource to the expiry list - ReclaimedResource r = tsi.reclaimList.remove(); - tsi.reclaimExpireList.add(r); - } - } - private synchronized void updateCollectionOfQSIs() { Collections.sort(qsiForAssigningTasks, queueComparator); } private boolean isUserOverLimit(String user, QueueSchedulingInfo qsi) { - // what is our current capacity? It's GC if we're running below GC. - // If we're running over GC, then its #running plus 1 (which is the + // what is our current capacity? It's capacity if we're running below capacity. + // If we're running over capacity, then its #running plus 1 (which is the // extra slot we're getting). int currentCapacity; TaskSchedulingInfo tsi = getTSI(qsi); - if (tsi.numRunningTasks < tsi.guaranteedCapacity) { - currentCapacity = tsi.guaranteedCapacity; + if (tsi.numRunningTasks < tsi.capacity) { + currentCapacity = tsi.capacity; } else { currentCapacity = tsi.numRunningTasks+1; @@ -760,7 +475,7 @@ for (QueueSchedulingInfo qsi : qsiForAssigningTasks) { // we may have queues with gc=0. 
We shouldn't look at jobs from // these queues - if (0 == getTSI(qsi).guaranteedCapacity) { + if (0 == getTSI(qsi).capacity) { continue; } TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi); @@ -772,8 +487,6 @@ // if we find a task, return if (lookUpStatus == TaskLookupResult.LookUpStatus.TASK_FOUND) { - // we have a task. Update reclaimed resource info - updateReclaimedResources(qsi); return tlr; } // if there was a memory mismatch, return @@ -795,8 +508,8 @@ Collection runJobs = scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName); s.append(" Queue '" + qsi.queueName + "'(" + this.type + "): run=" + - tsi.numRunningTasks + ", gc=" + tsi.guaranteedCapacity + - ", wait=" + tsi.numPendingTasks + ", run jobs="+ runJobs.size() + + tsi.numRunningTasks + ", gc=" + tsi.capacity + + ", run jobs="+ runJobs.size() + "*** "); } LOG.debug(s); @@ -830,55 +543,7 @@ int getPendingTasks(JobInProgress job) { return job.pendingMaps(); } - int killTasksFromJob(JobInProgress job, int tasksToKill) { - /* - * We'd like to kill tasks that ran the last, or that have made the - * least progress. - * Ideally, each job would have a list of tasks, sorted by start - * time or progress. That's a lot of state to keep, however. - * For now, we do something a little different. We first try and kill - * non-local tasks, as these can be run anywhere. For each TIP, we - * kill the task that has made the least progress, if the TIP has - * more than one active task. - * We then look at tasks in runningMapCache. - */ - int tasksKilled = 0; - - /* - * For non-local running maps, we 'cheat' a bit. We know that the set - * of non-local running maps has an insertion order such that tasks - * that ran last are at the end. So we iterate through the set in - * reverse. This is OK because even if the implementation changes, - * we're still using generic set iteration and are no worse of. 
- */ - TaskInProgress[] tips = - job.getNonLocalRunningMaps().toArray(new TaskInProgress[0]); - for (int i=tips.length-1; i>=0; i--) { - // pick the tast attempt that has progressed least - TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]); - if (null != tid) { - if (tips[i].killTask(tid, false)) { - if (++tasksKilled >= tasksToKill) { - return tasksKilled; - } - } - } - } - // now look at other running tasks - for (Set s: job.getRunningMapCache().values()) { - for (TaskInProgress tip: s) { - TaskAttemptID tid = getRunningTaskWithLeastProgress(tip); - if (null != tid) { - if (tip.killTask(tid, false)) { - if (++tasksKilled >= tasksToKill) { - return tasksKilled; - } - } - } - } - } - return tasksKilled; - } + TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) { return qsi.mapTSI; } @@ -911,30 +576,7 @@ int getPendingTasks(JobInProgress job) { return job.pendingReduces(); } - int killTasksFromJob(JobInProgress job, int tasksToKill) { - /* - * For reduces, we 'cheat' a bit. We know that the set - * of running reduces has an insertion order such that tasks - * that ran last are at the end. So we iterate through the set in - * reverse. This is OK because even if the implementation changes, - * we're still using generic set iteration and are no worse of. - */ - int tasksKilled = 0; - TaskInProgress[] tips = - job.getRunningReduces().toArray(new TaskInProgress[0]); - for (int i=tips.length-1; i>=0; i--) { - // pick the tast attempt that has progressed least - TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]); - if (null != tid) { - if (tips[i].killTask(tid, false)) { - if (++tasksKilled >= tasksToKill) { - return tasksKilled; - } - } - } - } - return tasksKilled; - } + TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) { return qsi.reduceTSI; } @@ -953,12 +595,6 @@ /** name of the default queue. 
*/ static final String DEFAULT_QUEUE_NAME = "default"; - /** how often does redistribution thread run (in msecs)*/ - private static long RECLAIM_CAPACITY_INTERVAL; - /** we start killing tasks to reclaim capacity when we have so many - * heartbeats left. */ - private static final int HEARTBEATS_LEFT_BEFORE_KILLING = 3; - static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class); protected JobQueuesManager jobQueuesManager; protected CapacitySchedulerConf schedConf; @@ -966,33 +602,6 @@ private boolean started = false; /** - * Used to distribute/reclaim excess capacity among queues - */ - class ReclaimCapacity implements Runnable { - public ReclaimCapacity() { - } - public void run() { - while (true) { - try { - Thread.sleep(RECLAIM_CAPACITY_INTERVAL); - if (stopReclaim) { - break; - } - reclaimCapacity(); - } catch (InterruptedException t) { - break; - } catch (Throwable t) { - LOG.error("Error in redistributing capacity:\n" + - StringUtils.stringifyException(t)); - } - } - } - } - private Thread reclaimCapacityThread = null; - /** variable to indicate that thread should stop */ - private boolean stopReclaim = false; - - /** * A clock class - can be mocked out for testing. 
*/ static class Clock { @@ -1067,9 +676,6 @@ initializeMemoryRelatedConf(); - RECLAIM_CAPACITY_INTERVAL = schedConf.getReclaimCapacityInterval(); - RECLAIM_CAPACITY_INTERVAL *= 1000; - // read queue info from config file QueueManager queueManager = taskTrackerManager.getQueueManager(); Set queues = queueManager.getQueues(); @@ -1078,20 +684,19 @@ throw new IllegalStateException("System has no queue configured"); } - Set queuesWithoutConfiguredGC = new HashSet(); + Set queuesWithoutConfiguredCapacity = new HashSet(); float totalCapacity = 0.0f; for (String queueName: queues) { - float gc = schedConf.getGuaranteedCapacity(queueName); + float gc = schedConf.getCapacity(queueName); if(gc == -1.0) { - queuesWithoutConfiguredGC.add(queueName); + queuesWithoutConfiguredCapacity.add(queueName); }else { totalCapacity += gc; } int ulMin = schedConf.getMinimumUserLimitPercent(queueName); - long reclaimTimeLimit = schedConf.getReclaimTimeLimit(queueName) * 1000; // create our QSI and add to our hashmap QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, gc, - ulMin, reclaimTimeLimit, jobQueuesManager); + ulMin, jobQueuesManager); queueInfoMap.put(queueName, qsi); // create the queues of job objects @@ -1105,11 +710,11 @@ } float remainingQuantityToAllocate = 100 - totalCapacity; float quantityToAllocate = - remainingQuantityToAllocate/queuesWithoutConfiguredGC.size(); - for(String queue: queuesWithoutConfiguredGC) { + remainingQuantityToAllocate/queuesWithoutConfiguredCapacity.size(); + for(String queue: queuesWithoutConfiguredCapacity) { QueueSchedulingInfo qsi = queueInfoMap.get(queue); - qsi.guaranteedCapacityPercent = quantityToAllocate; - schedConf.setGuaranteedCapacity(queue, quantityToAllocate); + qsi.capacityPercent = quantityToAllocate; + schedConf.setCapacity(queue, quantityToAllocate); } // check if there's a queue with the default name. If not, we quit. 
@@ -1137,19 +742,9 @@ initializationPoller.setDaemon(true); initializationPoller.start(); - // start thread for redistributing capacity if we have more than - // one queue - if (queueInfoMap.size() > 1) { - this.reclaimCapacityThread = - new Thread(new ReclaimCapacity(),"reclaimCapacity"); - this.reclaimCapacityThread.start(); - } - else { - LOG.info("Only one queue present. Reclaim capacity thread not started."); - } - started = true; - LOG.info("Capacity scheduler initialized " + queues.size() + " queues"); } + LOG.info("Capacity scheduler initialized " + queues.size() + " queues"); + } /** mostly for testing purposes */ void setInitializationPoller(JobInitializationPoller p) { @@ -1163,8 +758,6 @@ taskTrackerManager.removeJobInProgressListener( jobQueuesManager); } - // tell the reclaim thread to stop - stopReclaim = true; started = false; initializationPoller.terminate(); super.terminate(); @@ -1176,27 +769,6 @@ } /** - * Reclaim capacity for both map & reduce tasks. - * Do not make this synchronized, since we call taskTrackerManager - * (see HADOOP-4977). - */ - void reclaimCapacity() { - // get the cluster capacity - ClusterStatus c = taskTrackerManager.getClusterStatus(); - int mapClusterCapacity = c.getMaxMapTasks(); - int reduceClusterCapacity = c.getMaxReduceTasks(); - int nextHeartbeatInterval = taskTrackerManager.getNextHeartbeatInterval(); - // update the QSI objects - updateQSIObjects(mapClusterCapacity, reduceClusterCapacity); - // update the qsi collections, since we depend on their ordering - mapScheduler.updateCollectionOfQSIs(); - reduceScheduler.updateCollectionOfQSIs(); - // now, reclaim - mapScheduler.reclaimCapacity(nextHeartbeatInterval); - reduceScheduler.reclaimCapacity(nextHeartbeatInterval); - } - - /** * provided for the test classes * lets you update the QSI objects and sorted collections */ @@ -1216,31 +788,27 @@ * to make scheduling decisions. For example, we don't need an exact count * of numRunningTasks. 
Once we count upto the grid capacity, any * number beyond that will make no difference. - * - * The pending task count is only required in reclaim capacity. So - * if the computation becomes expensive, we can add a boolean to - * denote if pending task computation is required or not. - * + * **/ - private synchronized void updateQSIObjects(int mapClusterCapacity, + private synchronized void updateQSIObjects(int mapClusterCapacity, int reduceClusterCapacity) { - // if # of slots have changed since last time, update. + // if # of slots have changed since last time, update. // First, compute whether the total number of TT slots have changed for (QueueSchedulingInfo qsi: queueInfoMap.values()) { - // compute new GCs, if TT slots have changed + // compute new capacities, if TT slots have changed if (mapClusterCapacity != prevMapClusterCapacity) { - qsi.mapTSI.guaranteedCapacity = - (int)(qsi.guaranteedCapacityPercent*mapClusterCapacity/100); + qsi.mapTSI.capacity = + (int)(qsi.capacityPercent*mapClusterCapacity/100); } if (reduceClusterCapacity != prevReduceClusterCapacity) { - qsi.reduceTSI.guaranteedCapacity = - (int)(qsi.guaranteedCapacityPercent*reduceClusterCapacity/100); + qsi.reduceTSI.capacity = + (int)(qsi.capacityPercent*reduceClusterCapacity/100); } // reset running/pending tasks, tasks per user qsi.mapTSI.resetTaskVars(); qsi.reduceTSI.resetTaskVars(); // update stats on running jobs - for (JobInProgress j: + for (JobInProgress j: jobQueuesManager.getRunningJobQueue(qsi.queueName)) { if (j.getStatus().getRunState() != JobStatus.RUNNING) { continue; @@ -1249,62 +817,36 @@ int runningReduces = j.runningReduces(); qsi.mapTSI.numRunningTasks += runningMaps; qsi.reduceTSI.numRunningTasks += runningReduces; - Integer i = + Integer i = qsi.mapTSI.numRunningTasksByUser.get(j.getProfile().getUser()); - qsi.mapTSI.numRunningTasksByUser.put(j.getProfile().getUser(), + qsi.mapTSI.numRunningTasksByUser.put(j.getProfile().getUser(), i+runningMaps); i = 
qsi.reduceTSI.numRunningTasksByUser.get(j.getProfile().getUser()); - qsi.reduceTSI.numRunningTasksByUser.put(j.getProfile().getUser(), + qsi.reduceTSI.numRunningTasksByUser.put(j.getProfile().getUser(), i+runningReduces); - qsi.mapTSI.numPendingTasks += j.pendingMaps(); - qsi.reduceTSI.numPendingTasks += j.pendingReduces(); LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) = " + - j.runningMaps() + ", run(r) = " + j.runningReduces() + - ", finished(m) = " + j.finishedMaps() + ", finished(r)= " + - j.finishedReduces() + ", failed(m) = " + j.failedMapTasks + - ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " + - j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks - + ", total(m) = " + j.numMapTasks + ", total(r) = " + + j.runningMaps() + ", run(r) = " + j.runningReduces() + + ", finished(m) = " + j.finishedMaps() + ", finished(r)= " + + j.finishedReduces() + ", failed(m) = " + j.failedMapTasks + + ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " + + j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks + + ", total(m) = " + j.numMapTasks + ", total(r) = " + j.numReduceTasks); - /* + /* * it's fine walking down the entire list of running jobs - there * probably will not be many, plus, we may need to go through the * list to compute numRunningTasksByUser. If this is expensive, we * can keep a list of running jobs per user. Then we only need to * consider the first few jobs per user. - */ - } - - //update stats on waiting jobs - for(JobInProgress j: jobQueuesManager.getWaitingJobs(qsi.queueName)) { - // pending tasks - if ((qsi.mapTSI.numPendingTasks > mapClusterCapacity) && - (qsi.reduceTSI.numPendingTasks > reduceClusterCapacity)) { - // that's plenty. no need for more computation - break; - } - /* - * Consider only the waiting jobs in the job queue. Job queue can - * contain: - * 1. 
Jobs which are in running state but not scheduled - * (these would also be present in running queue), the pending - * task count of these jobs is computed when scheduler walks - * through running job queue. - * 2. Jobs which are killed by user, but waiting job initialization - * poller to walk through the job queue to clean up killed jobs. */ - if (j.getStatus().getRunState() == JobStatus.PREP) { - qsi.mapTSI.numPendingTasks += j.pendingMaps(); - qsi.reduceTSI.numPendingTasks += j.pendingReduces(); - } } } - + prevMapClusterCapacity = mapClusterCapacity; prevReduceClusterCapacity = reduceClusterCapacity; } - /* + /* * The grand plan for assigning a task. * First, decide whether a Map or Reduce task should be given to a TT * (if the TT can accept either).