Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 8406 invoked from network); 4 Mar 2011 04:30:58 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Mar 2011 04:30:58 -0000 Received: (qmail 28831 invoked by uid 500); 4 Mar 2011 04:30:57 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 28793 invoked by uid 500); 4 Mar 2011 04:30:57 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 28785 invoked by uid 99); 4 Mar 2011 04:30:57 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 04:30:57 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 04:30:55 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id EF32E2388C0F; Fri, 4 Mar 2011 04:30:34 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1077573 [1/3] - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapred/ Date: Fri, 04 Mar 2011 04:30:34 -0000 To: common-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110304043034.EF32E2388C0F@eris.apache.org> Author: omalley Date: Fri Mar 4 04:30:34 2011 New Revision: 1077573 URL: http://svn.apache.org/viewvc?rev=1077573&view=rev Log: commit 
2845b2fe4208c72af24a511e2bcd98b82e195c58 Author: Arun C Murthy Date: Tue Jul 20 23:08:02 2010 -0700 MAPREDUCE-1872. Hardened CapacityScheduler to have comprehensive, coherent limits on tasks/jobs for jobs/users/queues. Also, added the ability to refresh queue definitions without the need to restart the JobTracker. +++ b/YAHOO-CHANGES.txt + MAPREDUCE-1872. Hardened CapacityScheduler to have comprehensive, coherent + limits on tasks/jobs for jobs/users/queues. Also, added the ability to + refresh queue definitions without the need to restart the JobTracker. + (acmurthy) + Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerServlet.java hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java 
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/QueueManager.java hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=1077573&r1=1077572&r2=1077573&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Fri Mar 4 04:30:34 2011 @@ -36,6 +36,8 @@ class CapacitySchedulerConf { private int defaultUlimitMinimum; + private float defaultUserLimitFactor; + private boolean defaultSupportPriority; private static final String QUEUE_CONF_PROPERTY_NAME_PREFIX = @@ -98,7 +100,14 @@ class CapacitySchedulerConf { private Configuration rmConf; - private int defaultMaxJobsPerUsersToInitialize; + private int defaultInitToAcceptJobsFactor; + private int defaultMaxActiveTasksPerUserToInitialize; + private int defaultMaxActiveTasksPerQueueToInitialize; + + static final String MAX_SYSTEM_JOBS_KEY = + "mapred.capacity-scheduler.maximum-system-jobs"; + + static final int DEFAULT_MAX_SYSTEM_JOBS = 5000; /** * Create a new Capacity scheduler conf. @@ -130,13 +139,25 @@ class CapacitySchedulerConf { * which is used by the Capacity Scheduler. 
*/ private void initializeDefaults() { - defaultUlimitMinimum = rmConf.getInt( - "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100); + defaultUlimitMinimum = + rmConf.getInt( + "mapred.capacity-scheduler.default-minimum-user-limit-percent", 100); + defaultUserLimitFactor = + rmConf.getFloat("mapred.capacity-scheduler.default-user-limit-factor", + 1.0f); defaultSupportPriority = rmConf.getBoolean( "mapred.capacity-scheduler.default-supports-priority", false); - defaultMaxJobsPerUsersToInitialize = rmConf.getInt( - "mapred.capacity-scheduler.default-maximum-initialized-jobs-per-user", - 2); + defaultMaxActiveTasksPerQueueToInitialize = + rmConf.getInt( + "mapred.capacity-scheduler.default-maximum-active-tasks-per-queue", + 200000); + defaultMaxActiveTasksPerUserToInitialize = + rmConf.getInt( + "mapred.capacity-scheduler.default-maximum-active-tasks-per-user", + 100000); + defaultInitToAcceptJobsFactor = + rmConf.getInt("mapred.capacity-scheduler.default-init-accept-jobs-factor", + 10); } /** @@ -294,6 +315,32 @@ class CapacitySchedulerConf { } /** + * Get the factor of queue capacity above which a single user in a queue + * can consume resources. + * + * @param queue queue name + * @return factor of queue capacity above which a single user in a queue + * can consume resources + */ + public float getUserLimitFactor(String queue) { + return rmConf.getFloat(toFullPropertyName(queue, "user-limit-factor"), + defaultUserLimitFactor); + } + + /** + * Set the factor of queue capacity above which a single user in a queue + * can consume resources. 
+ * + * @param queue queue name + * @param userLimitFactor factor of queue capacity above which a single user + * in a queue can consume resources + */ + public void setUserLimitFactor(String queue, float userLimitFactor) { + rmConf.setFloat(toFullPropertyName(queue, "user-limit-factor"), + userLimitFactor); + } + + /** * Reload configuration by clearing the information read from the * underlying configuration file. */ @@ -307,38 +354,81 @@ class CapacitySchedulerConf { return QUEUE_CONF_PROPERTY_NAME_PREFIX + queue + "." + property; } - /** - * Gets the maximum number of jobs which are allowed to initialize in the - * job queue. - * - * @param queue queue name. - * @return maximum number of jobs allowed to be initialized per user. - * @throws IllegalArgumentException if maximum number of users is negative - * or zero. - */ - public int getMaxJobsPerUserToInitialize(String queue) { - int maxJobsPerUser = rmConf.getInt(toFullPropertyName(queue, - "maximum-initialized-jobs-per-user"), - defaultMaxJobsPerUsersToInitialize); - if(maxJobsPerUser <= 0) { + public int getMaxSystemJobs() { + int maxSystemJobs = + rmConf.getInt(MAX_SYSTEM_JOBS_KEY, DEFAULT_MAX_SYSTEM_JOBS); + if (maxSystemJobs <= 0) { + throw new IllegalArgumentException("Invalid maximum system jobs: " + + maxSystemJobs); + } + + return maxSystemJobs; + } + + public void setMaxSystemJobs(int maxSystemJobs) { + rmConf.setInt(MAX_SYSTEM_JOBS_KEY, maxSystemJobs); + } + + public int getInitToAcceptJobsFactor(String queue) { + int initToAccepFactor = + rmConf.getInt(toFullPropertyName(queue, "init-accept-jobs-factor"), + defaultInitToAcceptJobsFactor); + if(initToAccepFactor <= 0) { throw new IllegalArgumentException( - "Invalid maximum jobs per user configuration " + maxJobsPerUser); + "Invalid maximum jobs per user configuration " + initToAccepFactor); } - return maxJobsPerUser; + return initToAccepFactor; + } + + public void setInitToAcceptJobsFactor(String queue, int initToAcceptFactor) { + 
rmConf.setInt(toFullPropertyName(queue, "init-accept-jobs-factor"), + initToAcceptFactor); } /** - * Sets the maximum number of jobs which are allowed to be initialized - * for a user in the queue. + * Get the maximum active tasks per user to be initialized. * - * @param queue queue name. - * @param value maximum number of jobs allowed to be initialized per user. + * @param queue queue name */ - public void setMaxJobsPerUserToInitialize(String queue, int value) { - rmConf.setInt(toFullPropertyName(queue, - "maximum-initialized-jobs-per-user"), value); + public int getMaxInitializedActiveTasks(String queue) { + return rmConf.getInt(toFullPropertyName(queue, + "maximum-initialized-active-tasks"), + defaultMaxActiveTasksPerQueueToInitialize); } - + + /** + * Set the maximum active tasks per user to be initialized. + * + * @param queue queue name + * @param value maximum active tasks + */ + public void setMaxInitializedActiveTasks(String queue, int value) { + rmConf.setInt(toFullPropertyName(queue, "maximum-initialized-active-tasks"), + value); + } + + /** + * Get the maximum active tasks per user to be initialized. + * + * @param queue queue name + */ + public int getMaxInitializedActiveTasksPerUser(String queue) { + return rmConf.getInt(toFullPropertyName(queue, + "maximum-initialized-active-tasks-per-user"), + defaultMaxActiveTasksPerUserToInitialize); + } + + /** + * Set the maximum active tasks per user to be initialized. + * + * @param queue queue name + * @param value maximum active tasks + */ + public void setMaxInitializedActiveTasksPerUser(String queue, int value) { + rmConf.setInt(toFullPropertyName(queue, "maximum-initialized-active-tasks-per-user"), + value); + } + /** * Amount of time in milliseconds which poller thread and initialization * thread would sleep before looking at the queued jobs. 
Added: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java?rev=1077573&view=auto ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java (added) +++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java Fri Mar 4 04:30:34 2011 @@ -0,0 +1,1061 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.mapred; + +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapred.CapacityTaskScheduler.TaskSchedulingMgr; +import org.apache.hadoop.mapred.JobQueueJobInProgressListener.JobSchedulingInfo; + + +/*********************************************************************** + * Keeping track of scheduling information for queues + * + * We need to maintain scheduling information relevant to a queue (its + * name, capacity, etc), along with information specific to + * each kind of task, Map or Reduce (num of running tasks, pending + * tasks etc). + * + * This scheduling information is used to decide how to allocate + * tasks, redistribute capacity, etc. + * + * A QueueSchedulingInfo(QSI) object represents scheduling information for + * a A TaskSchedulingInfo (TSI) object represents scheduling + * information for a particular kind of task (Map or Reduce). + * + **********************************************************************/ +class CapacitySchedulerQueue { + + static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class); + + private static class SlotsUsage { + /** + * the actual capacity, which depends on how many slots are available + * in the cluster at any given time. + */ + private int capacity = 0; + // number of running tasks + int numRunningTasks = 0; + // number of slots occupied by running tasks + int numSlotsOccupied = 0; + + //the actual maximum capacity which depends on how many slots are available + //in cluster at any given time. 
+ private int maxCapacity = -1; + + /** + * for each user, we need to keep track of number of slots occupied by + * running tasks + */ + Map numSlotsOccupiedByUser = + new HashMap(); + + /** + * reset the variables associated with tasks + */ + void reset() { + numRunningTasks = 0; + numSlotsOccupied = 0; + numSlotsOccupiedByUser.clear(); + } + + + /** + * Returns the actual capacity. + * capacity. + * + * @return + */ + int getCapacity() { + return capacity; + } + + /** + * Mutator method for capacity + * + * @param capacity + */ + void setCapacity(int capacity) { + this.capacity = capacity; + } + + /** + * @return the numRunningTasks + */ + int getNumRunningTasks() { + return numRunningTasks; + } + + /** + * @return the numSlotsOccupied + */ + int getNumSlotsOccupied() { + return numSlotsOccupied; + } + + /** + * return information about the tasks + */ + @Override + public String toString() { + float occupiedSlotsAsPercent = + getCapacity() != 0 ? + ((float) numSlotsOccupied * 100 / getCapacity()) : 0; + StringBuffer sb = new StringBuffer(); + + sb.append("Capacity: " + capacity + " slots\n"); + + if(getMaxCapacity() >= 0) { + sb.append("Maximum capacity: " + getMaxCapacity() +" slots\n"); + } + sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)\n", + Integer.valueOf(numSlotsOccupied), Float + .valueOf(occupiedSlotsAsPercent))); + sb.append(String.format("Running tasks: %d\n", Integer + .valueOf(numRunningTasks))); + // include info on active users + if (numSlotsOccupied != 0) { + sb.append("Active users:\n"); + for (Map.Entry entry : numSlotsOccupiedByUser + .entrySet()) { + if ((entry.getValue() == null) || (entry.getValue().intValue() <= 0)) { + // user has no tasks running + continue; + } + sb.append("User '" + entry.getKey() + "': "); + int numSlotsOccupiedByThisUser = entry.getValue().intValue(); + float p = + (float) numSlotsOccupiedByThisUser * 100 / numSlotsOccupied; + sb.append(String.format("%d (%.1f%% of used capacity)\n", Long + 
.valueOf(numSlotsOccupiedByThisUser), Float.valueOf(p))); + } + } + return sb.toString(); + } + + int getMaxCapacity() { + return maxCapacity; + } + + void setMaxCapacity(int maxCapacity) { + this.maxCapacity = maxCapacity; + } + + int getNumSlotsOccupiedByUser(String user) { + Integer slots = numSlotsOccupiedByUser.get(user); + return (slots != null) ? slots : 0; + } + + + void updateCapacities(float capacityPercent, float maxCapacityPercent, + int clusterCapacity) { + //compute new capacity + setCapacity((int)(capacityPercent*clusterCapacity/100)); + + //compute new max map capacities + if(maxCapacityPercent > 0) { + setMaxCapacity((int)(maxCapacityPercent*clusterCapacity / 100)); + } + } + + void updateSlotsUsage(String user, int numRunningTasks, int numSlotsOccupied) { + this.numRunningTasks += numRunningTasks; + this.numSlotsOccupied += numSlotsOccupied; + Integer i = this.numSlotsOccupiedByUser.get(user); + int slots = numSlotsOccupied + ((i == null) ? 0 : i.intValue()); + this.numSlotsOccupiedByUser.put(user, slots); + } + } + + // Queue name + final String queueName; + + /** + * capacity(%) is set in the config + */ + volatile float capacityPercent = 0; + + + /** + * maxCapacityPercent(%) is set in config as + * mapred.capacity-scheduler..maximum-capacity + * maximum-capacity percent defines a limit beyond which a queue + * cannot expand. Remember this limit is dynamic and changes w.r.t + * cluster size. + */ + volatile float maxCapacityPercent = -1; + + /** + * to handle user limits, we need to know how many users have jobs in + * the + */ + Map numJobsByUser = new HashMap(); + + /** + * min value of user limit (same for all users) + */ + volatile int ulMin; + + /** + * The factor of queue-capacity above which a single user can consume + * queue resources. 
+ */ + volatile float ulMinFactor; + + /** + * We keep a TaskSchedulingInfo object for each kind of task we support + */ + CapacitySchedulerQueue.SlotsUsage mapSlots; + CapacitySchedulerQueue.SlotsUsage reduceSlots; + + /** + * Whether the queue supports priorities. + */ + final boolean supportsPriorities; + + /** + * Information required to track job, user, queue limits + */ + + Map waitingJobs; // for waiting jobs + Map runningJobs; // for running jobs + + /** + * Active tasks in the queue + */ + int activeTasks = 0; + + /** + * Users in the queue + */ + Map users = new HashMap(); + + /** + * Comparator for ordering jobs in this queue + */ + public Comparator comparator; + + int maxJobsToInit; + int maxJobsToAccept; + int maxJobsPerUserToInit; + int maxJobsPerUserToAccept; + int maxActiveTasks; + int maxActiveTasksPerUser; + + // comparator for jobs in queues that don't support priorities + private static final Comparator STARTTIME_JOB_COMPARATOR + = new Comparator() { + public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) { + // the job that started earlier wins + if (o1.getStartTime() < o2.getStartTime()) { + return -1; + } else { + return (o1.getStartTime() == o2.getStartTime() + ? 
o1.getJobID().compareTo(o2.getJobID()) + : 1); + } + } + }; + + public CapacitySchedulerQueue(String queueName, CapacitySchedulerConf conf) { + this.queueName = queueName; + + // Do not allow changes to 'supportsPriorities' + supportsPriorities = conf.isPrioritySupported(queueName); + + initializeQueue(conf); + + if (supportsPriorities) { + // use the default priority-aware comparator + comparator = JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR; + } + else { + comparator = STARTTIME_JOB_COMPARATOR; + } + this.waitingJobs = + new TreeMap(comparator); + this.runningJobs = + new TreeMap(comparator); + + this.mapSlots = new SlotsUsage(); + this.reduceSlots = new SlotsUsage(); + } + + synchronized void init(float capacityPercent, float maxCapacityPercent, + int ulMin, float ulMinFactor, + int maxJobsToInit, int maxJobsPerUserToInit, + int maxActiveTasks, int maxActiveTasksPerUser, + int maxJobsToAccept, int maxJobsPerUserToAccept) { + this.capacityPercent = capacityPercent; + this.maxCapacityPercent = maxCapacityPercent; + this.ulMin = ulMin; + this.ulMinFactor = ulMinFactor; + + this.maxJobsToInit = maxJobsToInit; + this.maxJobsPerUserToInit = maxJobsPerUserToInit; + this.maxActiveTasks = maxActiveTasks; + this.maxActiveTasksPerUser = maxActiveTasksPerUser; + this.maxJobsToAccept = maxJobsToAccept; + this.maxJobsPerUserToAccept = maxJobsPerUserToAccept; + + LOG.info("Initialized '" + queueName + "' queue with " + + "cap=" + capacityPercent + ", " + + "maxCap=" + maxCapacityPercent + ", " + + "ulMin=" + ulMin + ", " + + "ulMinFactor=" + ulMinFactor + ", " + + "supportsPriorities=" + supportsPriorities + ", " + + "maxJobsToInit=" + maxJobsToInit + ", " + + "maxJobsToAccept=" + maxJobsToAccept + ", " + + "maxActiveTasks=" + maxActiveTasks + ", " + + "maxJobsPerUserToInit=" + maxJobsPerUserToInit + ", " + + "maxJobsPerUserToAccept=" + maxJobsPerUserToAccept + ", " + + "maxActiveTasksPerUser=" + maxActiveTasksPerUser + ); + } + + synchronized void 
initializeQueue(CapacitySchedulerQueue other) { + init(other.capacityPercent, other.maxCapacityPercent, + other.ulMin, other.ulMinFactor, + other.maxJobsToInit, other.maxJobsPerUserToInit, + other.maxActiveTasks, other.maxActiveTasksPerUser, + other.maxJobsToAccept, other.maxJobsPerUserToAccept); + } + + synchronized void initializeQueue(CapacitySchedulerConf conf) { + float capacityPercent = conf.getCapacity(queueName); + float maxCapacityPercent = conf.getMaxCapacity(queueName); + int ulMin = conf.getMinimumUserLimitPercent(queueName); + float ulMinFactor = conf.getUserLimitFactor(queueName); + + int maxSystemJobs = conf.getMaxSystemJobs(); + int maxJobsToInit = (int)Math.ceil(maxSystemJobs * capacityPercent/100.0); + int maxJobsPerUserToInit = + (int)Math.ceil(maxSystemJobs * capacityPercent/100.0 * ulMin/100.0); + int maxActiveTasks = conf.getMaxInitializedActiveTasks(queueName); + int maxActiveTasksPerUser = + conf.getMaxInitializedActiveTasksPerUser(queueName); + + int jobInitToAcceptFactor = conf.getInitToAcceptJobsFactor(queueName); + int maxJobsToAccept = maxJobsToInit * jobInitToAcceptFactor; + int maxJobsPerUserToAccept = maxJobsPerUserToInit * jobInitToAcceptFactor; + + init(capacityPercent, maxCapacityPercent, + ulMin, ulMinFactor, + maxJobsToInit, maxJobsPerUserToInit, + maxActiveTasks, maxActiveTasksPerUser, + maxJobsToAccept, maxJobsPerUserToAccept); + } + + /** + * @return the queueName + */ + String getQueueName() { + return queueName; + } + + /** + * @return the capacityPercent + */ + float getCapacityPercent() { + return capacityPercent; + } + + /** + * reset the variables associated with tasks + */ + void resetSlotsUsage(TaskType taskType) { + if (taskType == TaskType.MAP) { + mapSlots.reset(); + } else if (taskType == TaskType.REDUCE) { + reduceSlots.reset(); + } else { + throw new IllegalArgumentException("Illegal taskType=" + taskType); + } + } + + + /** + * Returns the actual capacity in terms of slots for the taskType. 
+ * @param taskType + * @return actual capacity in terms of slots for the taskType + */ + int getCapacity(TaskType taskType) { + if (taskType == TaskType.MAP) { + return mapSlots.getCapacity(); + } else if (taskType == TaskType.REDUCE) { + return reduceSlots.getCapacity(); + } + + throw new IllegalArgumentException("Illegal taskType=" + taskType); + } + + /** + * Get the number of running tasks of the given taskType. + * @param taskType + * @return + */ + int getNumRunningTasks(TaskType taskType) { + if (taskType == TaskType.MAP) { + return mapSlots.getNumRunningTasks(); + } else if (taskType == TaskType.REDUCE) { + return reduceSlots.getNumRunningTasks(); + } + + throw new IllegalArgumentException("Illegal taskType=" + taskType); + } + + /** + * Get number of slots occupied of the taskType. + * @param taskType + * @return number of slots occupied of the taskType + */ + int getNumSlotsOccupied(TaskType taskType) { + if (taskType == TaskType.MAP) { + return mapSlots.getNumSlotsOccupied(); + } else if (taskType == TaskType.REDUCE) { + return reduceSlots.getNumSlotsOccupied(); + } + + throw new IllegalArgumentException("Illegal taskType=" + taskType); + } + + /** + * Get maximum number of slots for the taskType. + * @param taskType + * @return maximum number of slots for the taskType + */ + int getMaxCapacity(TaskType taskType) { + if (taskType == TaskType.MAP) { + return mapSlots.getMaxCapacity(); + } else if (taskType == TaskType.REDUCE) { + return reduceSlots.getMaxCapacity(); + } + + throw new IllegalArgumentException("Illegal taskType=" + taskType); + } + + /** + * Get number of slots occupied by a user of + * taskType. 
+ * @param user + * @param taskType + * @return number of slots occupied by a user of + * taskType + */ + int getNumSlotsOccupiedByUser(String user, TaskType taskType) { + if (taskType == TaskType.MAP) { + return mapSlots.getNumSlotsOccupiedByUser(user); + } else if (taskType == TaskType.REDUCE) { + return reduceSlots.getNumSlotsOccupiedByUser(user); + } + + throw new IllegalArgumentException("Illegal taskType=" + taskType); + } + + /** + * A new job is added to the + * @param job + */ + void jobAdded(JobInProgress job) { + // update user-specific info + String user = job.getProfile().getUser(); + + Integer i = numJobsByUser.get(user); + if (null == i) { + i = 1; + // set the count for running tasks to 0 + mapSlots.numSlotsOccupiedByUser.put(user, 0); + reduceSlots.numSlotsOccupiedByUser.put(user, 0); + } + else { + i++; + } + numJobsByUser.put(user, i); + } + + int getNumJobsByUser(String user) { + Integer numJobs = numJobsByUser.get(user); + return (numJobs != null) ? numJobs : 0; + } + + /** + * A job from the queue has completed. + * @param job + */ + void jobCompleted(JobInProgress job) { + String user = job.getProfile().getUser(); + // update numJobsByUser + if (LOG.isDebugEnabled()) { + LOG.debug("Job to be removed for user " + user); + } + Integer i = numJobsByUser.get(job.getProfile().getUser()); + i--; // i should never be null! + if (0 == i.intValue()) { + numJobsByUser.remove(user); + // remove job footprint from our TSIs + mapSlots.numSlotsOccupiedByUser.remove(user); + reduceSlots.numSlotsOccupiedByUser.remove(user); + if (LOG.isDebugEnabled()) { + LOG.debug("No more jobs for user, number of users = " + + numJobsByUser.size()); + } + } + else { + numJobsByUser.put(user, i); + if (LOG.isDebugEnabled()) { + LOG.debug("User still has " + i + " jobs, number of users = " + + numJobsByUser.size()); + } + } + } + + /** + * Update queue usage. 
+ * @param type + * @param user + * @param numRunningTasks + * @param numSlotsOccupied + */ + void update(TaskType type, String user, + int numRunningTasks, int numSlotsOccupied) { + if (type == TaskType.MAP) { + mapSlots.updateSlotsUsage(user, numRunningTasks, numSlotsOccupied); + } else if (type == TaskType.REDUCE) { + reduceSlots.updateSlotsUsage(user, numRunningTasks, numSlotsOccupied); + } + } + + /** + * Update queue usage across all running jobs. + * @param mapClusterCapacity + * @param reduceClusterCapacity + * @param mapScheduler + * @param reduceScheduler + */ + void updateAll(int mapClusterCapacity, int reduceClusterCapacity, + TaskSchedulingMgr mapScheduler, TaskSchedulingMgr reduceScheduler) { + // Compute new capacities for maps and reduces + mapSlots.updateCapacities(capacityPercent, maxCapacityPercent, + mapClusterCapacity); + reduceSlots.updateCapacities(capacityPercent, maxCapacityPercent, + reduceClusterCapacity); + + // reset running/pending tasks, tasks per user + resetSlotsUsage(TaskType.MAP); + resetSlotsUsage(TaskType.REDUCE); + + Collection jobs = getRunningJobs(); // Safe to iterate since + // we get a copy here + for (JobInProgress j : jobs) { + if (j.getStatus().getRunState() != JobStatus.RUNNING) { + continue; + } + + int numMapsRunningForThisJob = mapScheduler.getRunningTasks(j); + int numReducesRunningForThisJob = reduceScheduler.getRunningTasks(j); + int numRunningMapSlots = + numMapsRunningForThisJob * mapScheduler.getSlotsPerTask(j); + int numRunningReduceSlots = + numReducesRunningForThisJob * reduceScheduler.getSlotsPerTask(j); + int numMapSlotsForThisJob = mapScheduler.getSlotsOccupied(j); + int numReduceSlotsForThisJob = reduceScheduler.getSlotsOccupied(j); + int numReservedMapSlotsForThisJob = + (mapScheduler.getNumReservedTaskTrackers(j) * + mapScheduler.getSlotsPerTask(j)); + int numReservedReduceSlotsForThisJob = + (reduceScheduler.getNumReservedTaskTrackers(j) * + reduceScheduler.getSlotsPerTask(j)); + + 
j.setSchedulingInfo( + CapacityTaskScheduler.getJobQueueSchedInfo(numMapsRunningForThisJob, + numRunningMapSlots, + numReservedMapSlotsForThisJob, + numReducesRunningForThisJob, + numRunningReduceSlots, + numReservedReduceSlotsForThisJob)); + + update(TaskType.MAP, j.getProfile().getUser(), + numMapsRunningForThisJob, numMapSlotsForThisJob); + update(TaskType.REDUCE, j.getProfile().getUser(), + numReducesRunningForThisJob, numReduceSlotsForThisJob); + + if (LOG.isDebugEnabled()) { + LOG.debug(String.format(queueName + " - updateQSI: job %s: run(m)=%d, " + + "occupied(m)=%d, run(r)=%d, occupied(r)=%d, finished(m)=%d," + + " finished(r)=%d, failed(m)=%d, failed(r)=%d, " + + "spec(m)=%d, spec(r)=%d, total(m)=%d, total(r)=%d", j + .getJobID().toString(), Integer + .valueOf(numMapsRunningForThisJob), Integer + .valueOf(numMapSlotsForThisJob), Integer + .valueOf(numReducesRunningForThisJob), Integer + .valueOf(numReduceSlotsForThisJob), Integer.valueOf(j + .finishedMaps()), Integer.valueOf(j.finishedReduces()), Integer + .valueOf(j.failedMapTasks), + Integer.valueOf(j.failedReduceTasks), Integer + .valueOf(j.speculativeMapTasks), Integer + .valueOf(j.speculativeReduceTasks), Integer + .valueOf(j.numMapTasks), Integer.valueOf(j.numReduceTasks))); + } + } + } + + boolean doesQueueSupportPriorities() { + return supportsPriorities; + } + + /** + * return information about the queue + * + * @return a String representing the information about the + */ + @Override + public String toString(){ + // We print out the queue information first, followed by info + // on map and reduce tasks and job info + StringBuilder sb = new StringBuilder(); + sb.append("Queue configuration\n"); + sb.append("Capacity Percentage: "); + sb.append(capacityPercent); + sb.append("%\n"); + sb.append("User Limit: " + ulMin + "%\n"); + sb.append("Priority Supported: " + + (doesQueueSupportPriorities() ? 
"YES":"NO") + "\n"); + sb.append("-------------\n"); + + sb.append("Map tasks\n"); + sb.append(mapSlots.toString()); + sb.append("-------------\n"); + sb.append("Reduce tasks\n"); + sb.append(reduceSlots.toString()); + sb.append("-------------\n"); + + sb.append("Job info\n"); + sb.append("Number of Waiting Jobs: " + getNumWaitingJobs() + "\n"); + sb.append("Number of users who have submitted jobs: " + + numJobsByUser.size() + "\n"); + return sb.toString(); + } + + /** + * Functionality to deal with job initialization + */ + + + // per-user information + static class UserInfo { + int runningJobs; + int waitingJobs; + + int activeTasks; + + int getNumRunningJobs() { + return runningJobs; + } + + int getNumWaitingJobs() { + return waitingJobs; + } + + int getNumActiveTasks() { + return activeTasks; + } + + public void jobAdded(JobInProgress job) { + ++waitingJobs; + } + + public void jobInitialized(JobInProgress job) { + --waitingJobs; + + ++runningJobs; + activeTasks += job.desiredTasks(); + } + + public void jobCompleted(JobInProgress job) { + --runningJobs; + activeTasks -= job.desiredTasks(); + } + + boolean isInactive() { + return activeTasks == 0 && runningJobs == 0 && waitingJobs == 0; + } + } + + synchronized Collection getWaitingJobs() { + return Collections.unmodifiableCollection( + new LinkedList(waitingJobs.values())); + } + + synchronized Collection getRunningJobs() { + return Collections.unmodifiableCollection( + new LinkedList(runningJobs.values())); + } + + synchronized int getNumActiveTasks() { + return activeTasks; + } + + synchronized int getNumRunningJobs() { + return runningJobs.size(); + } + + synchronized int getNumRunningJobsByUser(String user) { + UserInfo userInfo = users.get(user); + return (userInfo == null) ? 0 : userInfo.getNumRunningJobs(); + } + + synchronized int getNumActiveTasksByUser(String user) { + UserInfo userInfo = users.get(user); + return (userInfo == null) ? 
0 : userInfo.getNumActiveTasks(); + } + + synchronized int getNumWaitingJobsByUser(String user) { + UserInfo userInfo = users.get(user); + return (userInfo == null) ? 0 : userInfo.getNumWaitingJobs(); + } + + synchronized void addRunningJob(JobInProgress job) { + JobSchedulingInfo jobSchedInfo = new JobSchedulingInfo(job); + + if (runningJobs.containsKey(jobSchedInfo)) { + LOG.info("job " + job.getJobID() + " already running in queue'" + + queueName + "'!"); + return; + } + + runningJobs.put(jobSchedInfo,job); + + // Update queue stats + activeTasks += job.desiredTasks(); + + // Update user stats + String user = job.getProfile().getUser(); + UserInfo userInfo = users.get(user); + userInfo.jobInitialized(job); + } + + synchronized JobInProgress removeRunningJob(JobSchedulingInfo jobInfo) { + JobInProgress job = runningJobs.remove(jobInfo); + + // We have to be careful, we might be trying to remove a job + // which might not have been initialized + if (job != null) { + // Update user stats + String user = job.getProfile().getUser(); + synchronized (users) { + UserInfo userInfo = users.get(user); + + synchronized (userInfo) { + userInfo.jobCompleted(job); + + if (userInfo.isInactive()) { + users.remove(userInfo); + } + } + } + + // Update queue stats + activeTasks -= job.desiredTasks(); + } + + return job; + } + + synchronized void addWaitingJob(JobInProgress job) throws IOException { + JobSchedulingInfo jobSchedInfo = new JobSchedulingInfo(job); + if (waitingJobs.containsKey(jobSchedInfo)) { + LOG.info("job " + job.getJobID() + " already waiting in queue '" + + queueName + "'!"); + return; + } + + String user = job.getProfile().getUser(); + + // Check acceptance limits + checkJobSubmissionLimits(job, user); + + waitingJobs.put(new JobSchedulingInfo(job), job); + + // Update user stats + UserInfo userInfo = users.get(user); + if (userInfo == null) { + userInfo = new UserInfo(); + users.put(user, userInfo); + } + userInfo.jobAdded(job); + } + + synchronized 
JobInProgress removeWaitingJob(JobSchedulingInfo schedInfo) { + return waitingJobs.remove(schedInfo); + } + + synchronized int getNumActiveUsers() { + return users.size(); + } + + synchronized int getNumWaitingJobs() { + return waitingJobs.size(); + } + + Comparator getComparator() { + return comparator; + } + + /** + * Functions to deal with queue-limits. + */ + + /** + * Check if the queue can be assigned numSlots + * of the given taskType so that the queue doesn't exceed its + * configured maximum-capacity. + * + * @param taskType + * @param numSlots + * @return true if slots can be assigned + */ + boolean assignSlotsToQueue(TaskType taskType, int numSlots) { + // Check if the queue is running over its maximum-capacity + if (getMaxCapacity(taskType) > 0) { // Check if max capacity is enabled + if ((getNumSlotsOccupied(taskType) + numSlots) > + getMaxCapacity(taskType)) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Queue " + queueName + " " + "has reached its max " + + taskType + " capacity"); + LOG.debug("Current running tasks " + getCapacity(taskType)); + } + return false; + } + } + + return true; + } + /** + * Check if the given job and user and + * queue can be assigned the requested number of slots of + * the given taskType for the job. + * + * This checks to ensure that queue and user are under appropriate limits. + * + * @param taskType + * @param job + * @param user + * @return true if the given job/user/queue can be assigned + * the requested number of slots, false otherwise + */ + boolean assignSlotsToJob(TaskType taskType, JobInProgress job, String user) { + // Check to ensure we will not go over the queue's max-capacity + if (!assignSlotsToQueue(taskType, job.getNumSlotsPerTask(taskType))) { + return false; + } + + // what is our current capacity? It is equal to the queue-capacity if + // we're running below capacity. 
If we're running over capacity, then its + // #running plus slotPerTask of the job (which is the number of extra + // slots we're getting). + int currentCapacity; + int queueCapacity = getCapacity(taskType); + if (getNumSlotsOccupied(taskType) < queueCapacity) { + currentCapacity = queueCapacity; + } + else { + currentCapacity = + getNumSlotsOccupied(taskType) + job.getNumSlotsPerTask(taskType); + } + + // Never allow a single user to take more than the + // queue's configured capacity * user-limit-factor. + // Also, the queue's configured capacity should be higher than + // queue-hard-limit * ulMin + int limit = + Math.min( + Math.max(divideAndCeil(currentCapacity, numJobsByUser.size()), + divideAndCeil(ulMin*currentCapacity, 100)), + (int)(queueCapacity * ulMinFactor) + ); + if (getNumSlotsOccupiedByUser(user, taskType) >= limit) { + if (LOG.isDebugEnabled()) { + LOG.debug("User " + user + " is over limit, num slots occupied=" + + getNumSlotsOccupiedByUser(user, taskType) + + ", limit=" + limit); + } + return false; + } + + return true; + } + + /** + * Ceil of result of dividing two integers. + * + * This is *not* a utility method. + * Neither a or b should be negative. + * + * @param a + * @param b + * @return ceil of the result of a/b + */ + private static int divideAndCeil(int a, int b) { + if (b == 0) { + LOG.info("divideAndCeil called with a=" + a + " b=" + b); + return 0; + } + return (a + (b - 1)) / b; + } + + /** + * Check if the given job can be accepted to the + * queue on behalf of the user. 
+ * @param job + * @param user + * @throws IOException if accepting the job would reach the queue's + * or the user's job-acceptance limit + */ + synchronized void checkJobSubmissionLimits(JobInProgress job, String user) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("checkJobSubmissionLimits - " + + "qWaitJobs=" + getNumWaitingJobs() + " " + + "qRunJobs=" + getNumRunningJobs() + " " + + "maxJobsToAccept=" + maxJobsToAccept + + "user=" + user + " " + + "uWaitJobs=" + getNumWaitingJobsByUser(user) + " " + + "uRunJobs=" + getNumRunningJobsByUser(user) + " " + + "maxJobsPerUserToAccept=" + maxJobsPerUserToAccept + " " + + ""); + } + + // Across all jobs in queue + if ((getNumWaitingJobs() + getNumRunningJobs()) >= maxJobsToAccept) { + throw new IOException( + "Job '" + job.getJobID() + "' from user '" + user + + "' rejected since queue '" + queueName + + "' already has " + getNumWaitingJobs() + " waiting jobs and " + + getNumRunningJobs() + " running jobs and exceeds limit of " + + maxJobsToAccept + " jobs to accept"); + } + + // Across all jobs of the user + if ((getNumWaitingJobsByUser(user) + getNumRunningJobsByUser(user)) >= + maxJobsPerUserToAccept) { + throw new IOException( + "Job '" + job.getJobID() + "' rejected since user '" + user + + "' already has " + getNumWaitingJobsByUser(user) + " waiting jobs" + + " and " + getNumRunningJobsByUser(user) + " running jobs," + + " it exceeds limit of " + maxJobsToAccept + " jobs to accept" + + " in queue '" + queueName + "'"); + } + } + + /** + * Check if the job can be initialized in the queue. 
+ * + * @param job + * @param currentlyInitializedJobs + * @param currentlyInitializedTasks + * @return true if the job can be initialized, + * false otherwise + */ + synchronized boolean initializeJobForQueue(JobInProgress job, + int currentlyInitializedJobs, int currentlyInitializedTasks) { + + // Check if queue has sufficient number of jobs + int runningJobs = getNumRunningJobs(); + if ((runningJobs + currentlyInitializedJobs) >= maxJobsToInit) { + LOG.info(getQueueName() + " already has " + runningJobs + + " running jobs and " + currentlyInitializedJobs + " jobs about to be" + + " initialized, cannot initialize " + job.getJobID() + + " since it will exceeed limit of " + + maxJobsToInit + " initialized jobs for this queue"); + return false; + } + + // Check if queue has too many active tasks + if ((activeTasks + currentlyInitializedTasks + job.desiredTasks()) >= + maxActiveTasks) { + LOG.info("Queue '" + getQueueName() + "' has " + activeTasks + + " active tasks and " + currentlyInitializedTasks + " tasks about to" + + " be initialized, cannot initialize job '" + job.getJobID() + + "' for user '" + job.getProfile().getUser() + "' with " + + job.desiredTasks() + " tasks since it will exceed limit of " + + maxActiveTasks + " active tasks for this queue"); + return false; + } + + return true; + } + + /** + * Check if the job can be initialized in the queue + * on behalf of the user. 
+ * + * @param job + * @param user + * @param currentlyInitializedJobs + * @param currentlyInitializedTasks + * @return true if the job can be initialized, + * false otherwise + */ + synchronized boolean initializeJobForUser(JobInProgress job, + String user, int currentlyInitializedJobs, + int currentlyInitializedTasks) { + + // Check if the user has too many jobs + int userRunningJobs = getNumRunningJobsByUser(user); + if ((userRunningJobs + currentlyInitializedJobs) >= + maxJobsPerUserToInit) { + LOG.info(getQueueName() + " already has " + userRunningJobs + + " running jobs and " + currentlyInitializedJobs + " jobs about to be" + + " initialized, for user " + user + + "; cannot initialize " + job.getJobID() + + " since it will exceeed limit of " + + maxJobsPerUserToInit + + " initialized jobs per user for this queue"); + return false; + } + + // Check if the user has too many active tasks + int userActiveTasks = getNumActiveTasksByUser(user); + if ((userActiveTasks + currentlyInitializedTasks) >= + maxActiveTasksPerUser) { + LOG.info(getQueueName() + " has " + userActiveTasks + + " active tasks and " + currentlyInitializedTasks + " tasks about to" + + " be initialized for user " + user + + ", cannot initialize " + job.getJobID() + " with " + + job.desiredTasks() + " tasks since it will exceed limit of " + + maxActiveTasksPerUser + + " active tasks per user for this queue"); + return false; + } + + return true; + } + +} \ No newline at end of file Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerServlet.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerServlet.java?rev=1077573&r1=1077572&r2=1077573&view=diff ============================================================================== --- 
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerServlet.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerServlet.java Fri Mar 4 04:30:34 2011 @@ -30,8 +30,7 @@ import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.apache.hadoop.mapred.CapacityTaskScheduler.QueueSchedulingInfo; -import org.apache.hadoop.mapred.CapacityTaskScheduler.TaskSchedulingInfo; +import org.apache.hadoop.mapreduce.TaskType; import org.apache.hadoop.mapred.JobHistory.JobInfo; import org.apache.hadoop.util.StringUtils; @@ -121,34 +120,33 @@ public class CapacitySchedulerServlet ex "Reduce Task Used Capacity" + "Running Reduces \n"); JobQueuesManager queuesManager = scheduler.getJobQueuesManager(); - for (QueueSchedulingInfo qsi : scheduler.getQueueInfoMap().values()) { - String queueName = qsi.getQueueName(); - TaskSchedulingInfo maptsi = scheduler.getMapScheduler().getTSI(qsi); - TaskSchedulingInfo redtsi = scheduler.getReduceScheduler().getTSI(qsi); + for (CapacitySchedulerQueue queue : scheduler.getQueueInfoMap().values()) { + String queueName = queue.getQueueName(); out.print("\n"); out.printf( "%s\n", queueName, queueName); - out.printf("%s\n", queuesManager.getNumRunningJobs(queueName)); - out.printf("%s\n", queuesManager.getNumWaitingJobs(queueName)); - out.printf("%.1f%%\n", qsi.getCapacityPercent()); - int mapCapacity = maptsi.getCapacity(); - int mapSlotsOccupied = maptsi.getNumSlotsOccupied(); - int reduceSlotsOccupied = redtsi.getNumSlotsOccupied(); + out.printf("%s\n", queue.getNumRunningJobs()); + out.printf("%s\n", queue.getNumWaitingJobs()); + out.printf("%.1f%%\n", queue.getCapacityPercent()); + int mapCapacity = queue.getCapacity(TaskType.MAP); + int mapSlotsOccupied = 
queue.getNumSlotsOccupied(TaskType.MAP); + int reduceSlotsOccupied = queue.getNumSlotsOccupied(TaskType.REDUCE); float occupiedSlotsAsPercent = mapCapacity != 0 ? ((float) mapSlotsOccupied * 100 / mapCapacity) : 0; out.printf("%s\n", mapCapacity); out.printf("%s (%.1f%% of Capacity)\n", mapSlotsOccupied, occupiedSlotsAsPercent); - out.printf("%s\n", maptsi.getNumRunningTasks()); - int reduceCapacity = redtsi.getCapacity(); - float redOccupiedSlotsAsPercent = reduceCapacity != 0 ? ((float) reduceSlotsOccupied * 100 / mapCapacity) - : 0; + out.printf("%s\n", queue.getNumRunningTasks(TaskType.MAP)); + int reduceCapacity = queue.getCapacity(TaskType.REDUCE); + float redOccupiedSlotsAsPercent = + (reduceCapacity != 0 ? ((float)reduceSlotsOccupied*100 / mapCapacity) + : 0); out.printf("%s\n", reduceCapacity); out.printf("%s (%.1f%% of Capacity)\n", reduceSlotsOccupied, redOccupiedSlotsAsPercent); - out.printf("%s\n", redtsi.getNumRunningTasks()); + out.printf("%s\n", queue.getNumRunningTasks(TaskType.REDUCE)); } out.print("\n"); }