Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 79379 invoked from network); 5 Jun 2009 05:59:33 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 5 Jun 2009 05:59:33 -0000 Received: (qmail 39236 invoked by uid 500); 5 Jun 2009 05:59:45 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 39152 invoked by uid 500); 5 Jun 2009 05:59:44 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 39143 invoked by uid 99); 5 Jun 2009 05:59:44 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Jun 2009 05:59:44 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Jun 2009 05:59:40 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 7DCF82388849; Fri, 5 Jun 2009 05:59:20 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r781918 - in /hadoop/core/branches/branch-0.20: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/webapps/job/ Date: Fri, 05 Jun 2009 05:59:20 -0000 To: core-commits@hadoop.apache.org From: yhemanth@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090605055920.7DCF82388849@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: yhemanth Date: Fri Jun 5 05:59:19 2009 New Revision: 781918 URL: http://svn.apache.org/viewvc?rev=781918&view=rev Log: HADOOP-5884. Fixes accounting in capacity scheduler so that high RAM jobs take more slots. Contributed by Vinod Kumar Vavilapalli. Modified: hadoop/core/branches/branch-0.20/CHANGES.txt hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java hadoop/core/branches/branch-0.20/src/webapps/job/jobdetails.jsp Modified: hadoop/core/branches/branch-0.20/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=781918&r1=781917&r2=781918&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/CHANGES.txt (original) +++ hadoop/core/branches/branch-0.20/CHANGES.txt Fri Jun 5 05:59:19 2009 @@ -122,6 +122,9 @@ happens in MROutputThread after the last call to the map/reduce method, the exception goes undetected. (Amar Kamat via ddas) + HADOOP-5884. Fixes accounting in capacity scheduler so that high RAM jobs + take more slots. 
(Vinod Kumar Vavilapalli via yhemanth) + Release 0.20.0 - 2009-04-15 INCOMPATIBLE CHANGES Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=781918&r1=781917&r2=781918&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original) +++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Fri Jun 5 05:59:19 2009 @@ -164,10 +164,10 @@ * Sets the capacity of the given queue. * * @param queue name of the queue - * @param gc percent of the cluster for the queue. + * @param capacity percent of the cluster for the queue. */ - public void setCapacity(String queue,float gc) { - rmConf.setFloat(toFullPropertyName(queue, "capacity"),gc); + public void setCapacity(String queue,float capacity) { + rmConf.setFloat(toFullPropertyName(queue, "capacity"),capacity); } /** Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=781918&r1=781917&r2=781918&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original) +++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Jun 5 05:59:19 2009 @@ -24,8 +24,6 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -34,8 +32,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapred.JobTracker.IllegalStateException; -import org.apache.hadoop.util.StringUtils; - /** * A {@link TaskScheduler} that implements the requirements in HADOOP-3421 @@ -78,14 +74,20 @@ private static class TaskSchedulingInfo { /** - * the actual gc, which depends on how many slots are available + * the actual capacity, which depends on how many slots are available * in the cluster at any given time. */ int capacity = 0; // number of running tasks int numRunningTasks = 0; - /** for each user, we need to keep track of number of running tasks */ - Map numRunningTasksByUser = + // number of slots occupied by running tasks + int numSlotsOccupied = 0; + + /** + * for each user, we need to keep track of number of slots occupied by + * running tasks + */ + Map numSlotsOccupiedByUser = new HashMap(); /** @@ -93,32 +95,41 @@ */ void resetTaskVars() { numRunningTasks = 0; - for (String s: numRunningTasksByUser.keySet()) { - numRunningTasksByUser.put(s, 0); + numSlotsOccupied = 0; + for (String s: numSlotsOccupiedByUser.keySet()) { + numSlotsOccupiedByUser.put(s, Integer.valueOf(0)); } } /** * return information about the tasks */ - public String toString(){ - float runningTasksAsPercent = capacity!= 0 ? 
- ((float)numRunningTasks * 100/capacity):0; + @Override + public String toString() { + float occupiedSlotsAsPercent = + capacity != 0 ? ((float) numSlotsOccupied * 100 / capacity) : 0; StringBuffer sb = new StringBuffer(); - sb.append("Capacity: " + capacity + "\n"); - sb.append(String.format("Running tasks: %.1f%% of Capacity\n", - runningTasksAsPercent)); + sb.append("Capacity: " + capacity + " slots\n"); + sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)\n", + Integer.valueOf(numSlotsOccupied), Float + .valueOf(occupiedSlotsAsPercent))); + sb.append(String.format("Running tasks: %d\n", Integer + .valueOf(numRunningTasks))); // include info on active users - if (numRunningTasks != 0) { + if (numSlotsOccupied != 0) { sb.append("Active users:\n"); - for (Map.Entry entry: numRunningTasksByUser.entrySet()) { + for (Map.Entry entry : numSlotsOccupiedByUser + .entrySet()) { if ((entry.getValue() == null) || (entry.getValue().intValue() <= 0)) { // user has no tasks running continue; } - sb.append("User '" + entry.getKey()+ "': "); - float p = (float)entry.getValue().intValue()*100/numRunningTasks; - sb.append(String.format("%.1f%% of running tasks\n", p)); + sb.append("User '" + entry.getKey() + "': "); + int numSlotsOccupiedByThisUser = entry.getValue().intValue(); + float p = + (float) numSlotsOccupiedByThisUser * 100 / numSlotsOccupied; + sb.append(String.format("%d (%.1f%% of used capacity)\n", Long + .valueOf(numSlotsOccupiedByThisUser), Float.valueOf(p))); } } return sb.toString(); @@ -152,10 +163,10 @@ TaskSchedulingInfo mapTSI; TaskSchedulingInfo reduceTSI; - public QueueSchedulingInfo(String queueName, float gcPercent, + public QueueSchedulingInfo(String queueName, float capacityPercent, int ulMin, JobQueuesManager jobQueuesManager) { this.queueName = new String(queueName); - this.capacityPercent = gcPercent; + this.capacityPercent = capacityPercent; this.ulMin = ulMin; this.jobQueuesManager = jobQueuesManager; this.mapTSI = new TaskSchedulingInfo(); @@ -164,13 +175,14 @@ /** * return information about the queue + * @return a String representing the information about the queue. */ + @Override public String toString(){ // We print out the queue information first, followed by info // on map and reduce tasks and job info StringBuffer sb = new StringBuffer(); sb.append("Queue configuration\n"); - //sb.append("Name: " + queueName + "\n"); sb.append("Capacity Percentage: "); sb.append(capacityPercent); sb.append("%\n"); @@ -281,7 +293,15 @@ protected CapacityTaskScheduler.TYPE type = null; abstract Task obtainNewTask(TaskTrackerStatus taskTracker, - JobInProgress job) throws IOException; + JobInProgress job) throws IOException; + + int getSlotsOccupied(JobInProgress job) { + return getRunningTasks(job) * getSlotsPerTask(job); + } + + abstract int getClusterCapacity(); + abstract int getSlotsPerTask(JobInProgress job); + abstract int getRunningTasks(JobInProgress job); abstract int getPendingTasks(JobInProgress job); abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi); /** @@ -313,12 +333,12 @@ public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) { TaskSchedulingInfo t1 = getTSI(q1); TaskSchedulingInfo t2 = getTSI(q2); - // look at how much capacity they've filled. Treat a queue with gc=0 - // equivalent to a queue running at capacity + // look at how much capacity they've filled. Treat a queue with + // capacity=0 equivalent to a queue running at capacity double r1 = (0 == t1.capacity)? 
1.0f: - (double)t1.numRunningTasks/(double)t1.capacity; + (double)t1.numSlotsOccupied/(double)t1.capacity; double r2 = (0 == t2.capacity)? 1.0f: - (double)t2.numRunningTasks/(double)t2.capacity; + (double)t2.numSlotsOccupied/(double)t2.capacity; if (r1r2) return 1; else return 0; @@ -340,7 +360,17 @@ protected final static ReduceQueueComparator reduceComparator = new ReduceQueueComparator(); // and this is the comparator to use protected QueueComparator queueComparator; - + + // Returns queues sorted according to the QueueComparator. + // Mainly for testing purposes. + String[] getOrderedQueues() { + List queues = new ArrayList(qsiForAssigningTasks.size()); + for (QueueSchedulingInfo qsi : qsiForAssigningTasks) { + queues.add(qsi.queueName); + } + return queues.toArray(new String[queues.size()]); + } + TaskSchedulingMgr(CapacityTaskScheduler sched) { scheduler = sched; } @@ -357,24 +387,26 @@ } - private boolean isUserOverLimit(String user, QueueSchedulingInfo qsi) { - // what is our current capacity? It's capacity if we're running below capacity. - // If we're running over capacity, then its #running plus 1 (which is the - // extra slot we're getting). + private boolean isUserOverLimit(JobInProgress j, QueueSchedulingInfo qsi) { + // what is our current capacity? It is equal to the queue-capacity if + // we're running below capacity. If we're running over capacity, then its + // #running plus slotPerTask of the job (which is the number of extra + // slots we're getting). int currentCapacity; TaskSchedulingInfo tsi = getTSI(qsi); - if (tsi.numRunningTasks < tsi.capacity) { + if (tsi.numSlotsOccupied < tsi.capacity) { currentCapacity = tsi.capacity; } else { - currentCapacity = tsi.numRunningTasks+1; + currentCapacity = tsi.numSlotsOccupied + getSlotsPerTask(j); } int limit = Math.max((int)(Math.ceil((double)currentCapacity/ (double)qsi.numJobsByUser.size())), (int)(Math.ceil((double)(qsi.ulMin*currentCapacity)/100.0))); - if (tsi.numRunningTasksByUser.get(user) >= limit) { - LOG.debug("User " + user + " is over limit, num running tasks = " + - tsi.numRunningTasksByUser.get(user) + ", limit = " + limit); + String user = j.getProfile().getUser(); + if (tsi.numSlotsOccupiedByUser.get(user) >= limit) { + LOG.debug("User " + user + " is over limit, num slots occupied = " + + tsi.numSlotsOccupiedByUser.get(user) + ", limit = " + limit); return true; } else { @@ -403,7 +435,7 @@ continue; } // check if the job's user is over limit - if (isUserOverLimit(j.getProfile().getUser(), qsi)) { + if (isUserOverLimit(j, qsi)) { continue; } //If this job meets memory requirements. Ask the JobInProgress for @@ -489,8 +521,11 @@ // The caller is responsible for ensuring that the QSI objects and the // collections are up-to-date. private TaskLookupResult assignTasks(TaskTrackerStatus taskTracker) throws IOException { + + printQSIs(); + for (QueueSchedulingInfo qsi : qsiForAssigningTasks) { - // we may have queues with gc=0. We shouldn't look at jobs from + // we may have queues with capacity=0. We shouldn't look at jobs from // these queues if (0 == getTSI(qsi).capacity) { continue; @@ -516,20 +551,23 @@ // nothing to give return TaskLookupResult.getNoTaskFoundResult(); } - + // for debugging. 
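As a worked example of the new check (numbers chosen for illustration, not taken from this patch): for a queue of 12 slots with 3 active users and a 25% user limit, the limit is max(ceil(12/3), ceil(25% of 12)) = 4 slots, so a job needing 2 slots per task is held back after 2 running tasks while a 1-slot-per-task job gets 4. Likewise, once the queue is over capacity the head-room is now stretched by the candidate job's slots per task (numSlotsOccupied + getSlotsPerTask(j)) rather than by a single extra task as before.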
private void printQSIs() { - StringBuffer s = new StringBuffer(); - for (QueueSchedulingInfo qsi: qsiForAssigningTasks) { - TaskSchedulingInfo tsi = getTSI(qsi); - Collection runJobs = - scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName); - s.append(" Queue '" + qsi.queueName + "'(" + this.type + "): run=" + - tsi.numRunningTasks + ", gc=" + tsi.capacity - + ", run jobs="+ runJobs.size() + - "*** "); + if (LOG.isDebugEnabled()) { + StringBuffer s = new StringBuffer(); + for (QueueSchedulingInfo qsi : qsiForAssigningTasks) { + TaskSchedulingInfo tsi = getTSI(qsi); + Collection runJobs = + scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName); + s.append(String.format(" Queue '%s'(%s): runningTasks=%d, " + + "occupiedSlots=%d, capacity=%d, runJobs=%d", qsi.queueName, + this.type, Integer.valueOf(tsi.numRunningTasks), Integer + .valueOf(tsi.numSlotsOccupied), Integer + .valueOf(tsi.capacity), Integer.valueOf(runJobs.size()))); + } + LOG.debug(s); } - LOG.debug(s); } /** @@ -559,11 +597,14 @@ * The scheduling algorithms for map tasks. */ private static class MapSchedulingMgr extends TaskSchedulingMgr { - MapSchedulingMgr(CapacityTaskScheduler dad) { - super(dad); + + MapSchedulingMgr(CapacityTaskScheduler schedulr) { + super(schedulr); type = CapacityTaskScheduler.TYPE.MAP; queueComparator = mapComparator; } + + @Override Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) throws IOException { ClusterStatus clusterStatus = @@ -572,16 +613,30 @@ return job.obtainNewMapTask(taskTracker, numTaskTrackers, scheduler.taskTrackerManager.getNumberOfUniqueHosts()); } + + @Override int getClusterCapacity() { return scheduler.taskTrackerManager.getClusterStatus().getMaxMapTasks(); } + + @Override int getRunningTasks(JobInProgress job) { return job.runningMaps(); } + + @Override int getPendingTasks(JobInProgress job) { return job.pendingMaps(); } + @Override + int getSlotsPerTask(JobInProgress job) { + long myVmem = job.getJobConf().getMemoryForMapTask(); + return (int) (Math.ceil((float) myVmem + / (float) scheduler.getMemSizeForMapSlot())); + } + + @Override TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) { return qsi.mapTSI; } @@ -601,11 +656,14 @@ * The scheduling algorithms for reduce tasks. 
*/ private static class ReduceSchedulingMgr extends TaskSchedulingMgr { - ReduceSchedulingMgr(CapacityTaskScheduler dad) { - super(dad); + + ReduceSchedulingMgr(CapacityTaskScheduler schedulr) { + super(schedulr); type = CapacityTaskScheduler.TYPE.REDUCE; queueComparator = reduceComparator; } + + @Override Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) throws IOException { ClusterStatus clusterStatus = @@ -614,16 +672,31 @@ return job.obtainNewReduceTask(taskTracker, numTaskTrackers, scheduler.taskTrackerManager.getNumberOfUniqueHosts()); } + + @Override int getClusterCapacity() { - return scheduler.taskTrackerManager.getClusterStatus().getMaxReduceTasks(); + return scheduler.taskTrackerManager.getClusterStatus() + .getMaxReduceTasks(); } + + @Override int getRunningTasks(JobInProgress job) { return job.runningReduces(); } + + @Override int getPendingTasks(JobInProgress job) { return job.pendingReduces(); } + @Override + int getSlotsPerTask(JobInProgress job) { + long myVmem = job.getJobConf().getMemoryForReduceTask(); + return (int) (Math.ceil((float) myVmem + / (float) scheduler.getMemSizeForReduceSlot())); + } + + @Override TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) { return qsi.reduceTSI; } @@ -655,7 +728,10 @@ protected CapacitySchedulerConf schedConf; /** whether scheduler has started or not */ private boolean started = false; - + + static String JOB_SCHEDULING_INFO_FORMAT_STRING = + "%s running map tasks using %d map slots," + + " %s running reduce tasks using %d reduce slots."; /** * A clock class - can be mocked out for testing. */ @@ -664,6 +740,7 @@ return System.currentTimeMillis(); } } + // can be replaced with a global type, if we have one protected static enum TYPE { MAP, REDUCE @@ -709,12 +786,13 @@ JobConf.normalizeMemoryConfigValue(conf.getLong( JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, JobConf.DISABLED_MEMORY_LIMIT)); - LOG.info(new StringBuilder().append("Scheduler configured with ").append( - "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT,").append( - " limitMaxMemForMapTasks, limitMaxMemForReduceTasks)").append( - memSizeForMapSlotOnJT).append(", ").append(memSizeForReduceSlotOnJT) - .append(", ").append(limitMaxMemForMapTasks).append(", ").append( - limitMaxMemForReduceTasks).append(")")); + LOG.info(String.format("Scheduler configured with " + + "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT, " + + "limitMaxMemForMapTasks, limitMaxMemForReduceTasks)" + + " (%d,%d,%d,%d)", Long.valueOf(memSizeForMapSlotOnJT), Long + .valueOf(memSizeForReduceSlotOnJT), Long + .valueOf(limitMaxMemForMapTasks), Long + .valueOf(limitMaxMemForReduceTasks))); } long getMemSizeForMapSlot() { @@ -733,6 +811,15 @@ return limitMaxMemForReduceTasks; } + String[] getOrderedQueues(CapacityTaskScheduler.TYPE type) { + if (type.equals(CapacityTaskScheduler.TYPE.MAP)) { + return mapScheduler.getOrderedQueues(); + } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) { + return reduceScheduler.getOrderedQueues(); + } + return null; + } + @Override public synchronized void start() throws IOException { if (started) return; @@ -755,15 +842,15 @@ Set queuesWithoutConfiguredCapacity = new HashSet(); float totalCapacity = 0.0f; for (String queueName: queues) { - float gc = schedConf.getCapacity(queueName); - if(gc == -1.0) { + float capacity = schedConf.getCapacity(queueName); + if(capacity == -1.0) { queuesWithoutConfiguredCapacity.add(queueName); }else { - totalCapacity += gc; + totalCapacity += capacity; } int ulMin = 
schedConf.getMinimumUserLimitPercent(queueName); // create our QSI and add to our hashmap - QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, gc, + QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, capacity, ulMin, jobQueuesManager); queueInfoMap.put(queueName, qsi); @@ -877,29 +964,49 @@ if (j.getStatus().getRunState() != JobStatus.RUNNING) { continue; } - int runningMaps = j.runningMaps(); - int runningReduces = j.runningReduces(); - qsi.mapTSI.numRunningTasks += runningMaps; - qsi.reduceTSI.numRunningTasks += runningReduces; + + int numMapsRunningForThisJob = mapScheduler.getRunningTasks(j); + int numReducesRunningForThisJob = reduceScheduler.getRunningTasks(j); + int numMapSlotsForThisJob = mapScheduler.getSlotsOccupied(j); + int numReduceSlotsForThisJob = reduceScheduler.getSlotsOccupied(j); + j.setSchedulingInfo(String.format(JOB_SCHEDULING_INFO_FORMAT_STRING, + Integer.valueOf(numMapsRunningForThisJob), Integer + .valueOf(numMapSlotsForThisJob), Integer + .valueOf(numReducesRunningForThisJob), Integer + .valueOf(numReduceSlotsForThisJob))); + qsi.mapTSI.numRunningTasks += numMapsRunningForThisJob; + qsi.reduceTSI.numRunningTasks += numReducesRunningForThisJob; + qsi.mapTSI.numSlotsOccupied += numMapSlotsForThisJob; + qsi.reduceTSI.numSlotsOccupied += numReduceSlotsForThisJob; Integer i = - qsi.mapTSI.numRunningTasksByUser.get(j.getProfile().getUser()); - qsi.mapTSI.numRunningTasksByUser.put(j.getProfile().getUser(), - i+runningMaps); - i = qsi.reduceTSI.numRunningTasksByUser.get(j.getProfile().getUser()); - qsi.reduceTSI.numRunningTasksByUser.put(j.getProfile().getUser(), - i+runningReduces); - LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) = " + - j.runningMaps() + ", run(r) = " + j.runningReduces() + - ", finished(m) = " + j.finishedMaps() + ", finished(r)= " + - j.finishedReduces() + ", failed(m) = " + j.failedMapTasks + - ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " + - j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks - + ", total(m) = " + j.numMapTasks + ", total(r) = " + - j.numReduceTasks); + qsi.mapTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser()); + qsi.mapTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(), + Integer.valueOf(i.intValue() + numMapSlotsForThisJob)); + i = qsi.reduceTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser()); + qsi.reduceTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(), + Integer.valueOf(i.intValue() + numReduceSlotsForThisJob)); + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("updateQSI: job %s: run(m)=%d, " + + "occupied(m)=%d, run(r)=%d, occupied(r)=%d, finished(m)=%d," + + " finished(r)=%d, failed(m)=%d, failed(r)=%d, " + + "spec(m)=%d, spec(r)=%d, total(m)=%d, total(r)=%d", j + .getJobID().toString(), Integer + .valueOf(numMapsRunningForThisJob), Integer + .valueOf(numMapSlotsForThisJob), Integer + .valueOf(numReducesRunningForThisJob), Integer + .valueOf(numReduceSlotsForThisJob), Integer.valueOf(j + .finishedMaps()), Integer.valueOf(j.finishedReduces()), Integer + .valueOf(j.failedMapTasks), + Integer.valueOf(j.failedReduceTasks), Integer + .valueOf(j.speculativeMapTasks), Integer + .valueOf(j.speculativeReduceTasks), Integer + .valueOf(j.numMapTasks), Integer.valueOf(j.numReduceTasks))); + } + /* * it's fine walking down the entire list of running jobs - there * probably will not be many, plus, we may need to go through the - * list to compute numRunningTasksByUser. If this is expensive, we + * list to compute numSlotsOccupiedByUser. 
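A rough standalone sketch of the accounting wired in above (the class and method names are invented for this example; only the arithmetic mirrors getSlotsPerTask() and getSlotsOccupied()):

    // Illustrative only: how many slots a high-RAM job occupies under the
    // new accounting (not part of this patch).
    public class SlotAccountingSketch {

      /** slots per task = ceil(task memory / memory configured per slot) */
      static int slotsPerTask(long taskMemMB, long slotMemMB) {
        return (int) Math.ceil((double) taskMemMB / (double) slotMemMB);
      }

      public static void main(String[] args) {
        long slotMemMB = 1024;          // cluster sized for 1GB tasks per slot
        long highRamMapMB = 2 * 1024;   // job asks for 2GB per map
        int perTask = slotsPerTask(highRamMapMB, slotMemMB);  // = 2 slots
        int runningMaps = 3;
        // numSlotsOccupied grows by runningTasks * slotsPerTask, so three
        // running maps of this job account for 6 slots, not 3.
        System.out.println("occupied map slots = " + (runningMaps * perTask));
      }
    }

This is what makes a 2GB-per-map job consume queue capacity, and hit user limits, twice as fast as a 1GB job.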
If this is expensive, we * can keep a list of running jobs per user. Then we only need to * consider the first few jobs per user. */ @@ -914,10 +1021,8 @@ * The grand plan for assigning a task. * First, decide whether a Map or Reduce task should be given to a TT * (if the TT can accept either). - * Next, pick a queue. We only look at queues that need a slot. Among - * these, we first look at queues whose ac is less than gc (queues that - * gave up capacity in the past). Next, we look at any other queue that - * needs a slot. + * Next, pick a queue. We only look at queues that need a slot. Among these, + * we first look at queues whose (# of running tasks)/capacity is the least. * Next, pick a job in a queue. we pick the job at the front of the queue * unless its user is over the user limit. * Finally, given a job, pick a task from the job. @@ -1019,8 +1124,10 @@ if (null == i) { i = 1; // set the count for running tasks to 0 - qsi.mapTSI.numRunningTasksByUser.put(job.getProfile().getUser(), 0); - qsi.reduceTSI.numRunningTasksByUser.put(job.getProfile().getUser(), 0); + qsi.mapTSI.numSlotsOccupiedByUser.put(job.getProfile().getUser(), + Integer.valueOf(0)); + qsi.reduceTSI.numSlotsOccupiedByUser.put(job.getProfile().getUser(), + Integer.valueOf(0)); } else { i++; @@ -1042,8 +1149,8 @@ if (0 == i.intValue()) { qsi.numJobsByUser.remove(job.getProfile().getUser()); // remove job footprint from our TSIs - qsi.mapTSI.numRunningTasksByUser.remove(job.getProfile().getUser()); - qsi.reduceTSI.numRunningTasksByUser.remove(job.getProfile().getUser()); + qsi.mapTSI.numSlotsOccupiedByUser.remove(job.getProfile().getUser()); + qsi.reduceTSI.numSlotsOccupiedByUser.remove(job.getProfile().getUser()); LOG.debug("No more jobs for user, number of users = " + qsi.numJobsByUser.size()); } else { Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=781918&r1=781917&r2=781918&view=diff ============================================================================== --- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original) +++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Jun 5 05:59:19 2009 @@ -39,8 +39,6 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.conf.Configuration; - - public class TestCapacityScheduler extends TestCase { static final Log LOG = @@ -605,13 +603,13 @@ // represents a fake queue configuration info static class FakeQueueInfo { String queueName; - float gc; + float capacity; boolean supportsPrio; int ulMin; - public FakeQueueInfo(String queueName, float gc, boolean supportsPrio, int ulMin) { + public FakeQueueInfo(String queueName, float capacity, boolean supportsPrio, int ulMin) { this.queueName = queueName; - this.gc = gc; + this.capacity = capacity; this.supportsPrio = supportsPrio; this.ulMin = ulMin; } @@ -641,10 +639,10 @@ }*/ public float getCapacity(String queue) { - if(queueMap.get(queue).gc == -1) { + if(queueMap.get(queue).capacity == -1) { return super.getCapacity(queue); } - return queueMap.get(queue).gc; + return queueMap.get(queue).capacity; } public int getMinimumUserLimitPercent(String queue) { @@ -899,13 +897,6 @@ return queue.toArray(new 
JobInProgress[0]); } - /*protected void submitJobs(int number, int state, int maps, int reduces) - throws IOException { - for (int i = 0; i < number; i++) { - submitJob(state, maps, reduces); - } - }*/ - // tests if tasks can be assinged when there are multiple jobs from a same // user public void testJobFinished() throws Exception { @@ -1042,7 +1033,7 @@ String[] qs = {"default", "q2"}; taskTrackerManager.addQueues(qs); ArrayList queues = new ArrayList(); - // set the gc % as 10%, so that gc will be zero initially as + // set the capacity % as 10%, so that capacity will be zero initially as // the cluster capacity increase slowly. queues.add(new FakeQueueInfo("default", 10.0f, true, 25)); queues.add(new FakeQueueInfo("q2", 90.0f, true, 25)); @@ -1076,7 +1067,7 @@ // add another tt to increase tt slots taskTrackerManager.addTaskTracker("tt5"); // now job from default should run, as it is furthest away - // in terms of runningMaps / gc. + // in terms of runningMaps / capacity. checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4"); verifyCapacity("1", "default"); verifyCapacity("9", "q2"); @@ -1087,7 +1078,7 @@ String schedInfo = taskTrackerManager.getQueueManager(). getSchedulerInfo(queue).toString(); assertTrue(schedInfo.contains("Map tasks\nCapacity: " - + expectedCapacity)); + + expectedCapacity + " slots")); } // test capacity transfer @@ -1268,7 +1259,82 @@ // first in the queue checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4"); } - + + /** + * Test to verify that high memory jobs hit user limits faster than any normal + * job. + * + * @throws IOException + */ + public void testUserLimitsForHighMemoryJobs() + throws IOException { + taskTrackerManager = new FakeTaskTrackerManager(1, 10, 10); + scheduler.setTaskTrackerManager(taskTrackerManager); + String[] qs = { "default" }; + taskTrackerManager.addQueues(qs); + ArrayList queues = new ArrayList(); + queues.add(new FakeQueueInfo("default", 100.0f, true, 50)); + resConf.setFakeQueues(queues); + // enabled memory-based scheduling + // Normal job in the cluster would be 1GB maps/reduces + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 2 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024); + scheduler.setResourceManagerConf(resConf); + scheduler.start(); + + // Submit one normal job to the other queue. 
+ JobConf jConf = new JobConf(conf); + jConf.setMemoryForMapTask(1 * 1024); + jConf.setMemoryForReduceTask(1 * 1024); + jConf.setNumMapTasks(6); + jConf.setNumReduceTasks(6); + jConf.setUser("u1"); + jConf.setQueueName("default"); + FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf); + + LOG.debug("Submit one high memory(2GB maps, 2GB reduces) job of " + + "6 map and 6 reduce tasks"); + jConf = new JobConf(conf); + jConf.setMemoryForMapTask(2 * 1024); + jConf.setMemoryForReduceTask(2 * 1024); + jConf.setNumMapTasks(6); + jConf.setNumReduceTasks(6); + jConf.setQueueName("default"); + jConf.setUser("u2"); + FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf); + + // Verify that normal job takes 3 task assignments to hit user limits + checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_m_000004_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_m_000005_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_r_000005_0 on tt1"); + // u1 has 5 map slots and 5 reduce slots. u2 has none. So u1's user limits + // are hit. So u2 should get slots + + checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1"); + checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1"); + checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1"); + checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1"); + + // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce + // slots. Because of high memory tasks, giving u2 another task would + // overflow limits. So, no more tasks should be given to anyone. + assertNull(scheduler.assignTasks(tracker("tt1"))); + assertNull(scheduler.assignTasks(tracker("tt1"))); + } + /* * Following is the testing strategy for testing scheduling information. * - start capacity scheduler with two queues. 
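The arithmetic this test leans on, as a small self-contained sketch (hypothetical names; only the limit formula follows the scheduler code):

    // Illustrative only: the per-user limit exercised by
    // testUserLimitsForHighMemoryJobs (1 TT, 10 map slots, ulMin = 50%).
    public class HighRamUserLimitSketch {

      /** limit = max(ceil(capacity / #users), ceil(ulMin% of capacity)) */
      static int userLimit(int capacity, int numUsers, int ulMinPercent) {
        return Math.max((int) Math.ceil((double) capacity / numUsers),
            (int) Math.ceil(ulMinPercent * capacity / 100.0));
      }

      public static void main(String[] args) {
        int limit = userLimit(10, 2, 50);   // = 5 slots per user
        System.out.println("per-user limit = " + limit + " slots");
        // u1's 1GB-per-task job occupies 1 slot per task, so it takes 5
        // assignments to reach the limit; u2's 2GB-per-task job occupies
        // 2 slots per task and already holds 4 slots after 2 assignments,
        // and, as the test's comment notes, one more task would overflow
        // the limits, so assignment stops.
      }
    }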
@@ -1318,21 +1384,35 @@ scheduler.assignTasks(tracker("tt1")); // heartbeat scheduler.assignTasks(tracker("tt2")); // heartbeat int totalMaps = taskTrackerManager.getClusterStatus().getMaxMapTasks(); - int totalReduces = taskTrackerManager.getClusterStatus().getMaxReduceTasks(); + int totalReduces = + taskTrackerManager.getClusterStatus().getMaxReduceTasks(); QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager(); - String schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); - String schedulingInfo2 = queueManager.getJobQueueInfo("q2").getSchedulingInfo(); + String schedulingInfo = + queueManager.getJobQueueInfo("default").getSchedulingInfo(); + String schedulingInfo2 = + queueManager.getJobQueueInfo("q2").getSchedulingInfo(); String[] infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 16); - assertEquals(infoStrings[1] , "Capacity Percentage: 50.0%"); - assertEquals(infoStrings[6] , "Capacity: " + totalMaps * 50/100); - assertEquals(infoStrings[10] , "Capacity: " + totalReduces * 50/100); - assertEquals(infoStrings[2] , "User Limit: 25%"); - assertEquals(infoStrings[3] , "Priority Supported: YES"); - assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity"); - assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity"); - assertEquals(infoStrings[14] , "Number of Waiting Jobs: 0"); - assertEquals(infoStrings[15] , "Number of users who have submitted jobs: 0"); + assertEquals(infoStrings.length, 18); + assertEquals(infoStrings[0], "Queue configuration"); + assertEquals(infoStrings[1], "Capacity Percentage: 50.0%"); + assertEquals(infoStrings[2], "User Limit: 25%"); + assertEquals(infoStrings[3], "Priority Supported: YES"); + assertEquals(infoStrings[4], "-------------"); + assertEquals(infoStrings[5], "Map tasks"); + assertEquals(infoStrings[6], "Capacity: " + totalMaps * 50 / 100 + + " slots"); + assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)"); + assertEquals(infoStrings[8], "Running tasks: 0"); + assertEquals(infoStrings[9], "-------------"); + assertEquals(infoStrings[10], "Reduce tasks"); + assertEquals(infoStrings[11], "Capacity: " + totalReduces * 50 / 100 + + " slots"); + assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)"); + assertEquals(infoStrings[13], "Running tasks: 0"); + assertEquals(infoStrings[14], "-------------"); + assertEquals(infoStrings[15], "Job info"); + assertEquals(infoStrings[16], "Number of Waiting Jobs: 0"); + assertEquals(infoStrings[17], "Number of users who have submitted jobs: 0"); assertEquals(schedulingInfo, schedulingInfo2); //Testing with actual job submission. @@ -1343,10 +1423,13 @@ infoStrings = schedulingInfo.split("\n"); //waiting job should be equal to number of jobs submitted. 
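Pieced together from the assertions above, the scheduling info for an idle queue now reads as below; the two Capacity values are 50% of the cluster's map and reduce slots:

    Queue configuration
    Capacity Percentage: 50.0%
    User Limit: 25%
    Priority Supported: YES
    -------------
    Map tasks
    Capacity: <totalMaps * 50/100> slots
    Used capacity: 0 (0.0% of Capacity)
    Running tasks: 0
    -------------
    Reduce tasks
    Capacity: <totalReduces * 50/100> slots
    Used capacity: 0 (0.0% of Capacity)
    Running tasks: 0
    -------------
    Job info
    Number of Waiting Jobs: 0
    Number of users who have submitted jobs: 0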
- assertEquals(infoStrings.length, 16); - assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity"); - assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity"); - assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5"); + assertEquals(infoStrings.length, 18); + assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)"); + assertEquals(infoStrings[8], "Running tasks: 0"); + assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)"); + assertEquals(infoStrings[13], "Running tasks: 0"); + assertEquals(infoStrings[16], "Number of Waiting Jobs: 5"); + assertEquals(infoStrings[17], "Number of users who have submitted jobs: 1"); //Initalize the jobs but don't raise events controlledInitializationPoller.selectJobsToInitialize(); @@ -1354,12 +1437,14 @@ schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 16); + assertEquals(infoStrings.length, 18); //should be previous value as nothing is scheduled because no events //has been raised after initialization. - assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity"); - assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity"); - assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5"); + assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)"); + assertEquals(infoStrings[8], "Running tasks: 0"); + assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)"); + assertEquals(infoStrings[13], "Running tasks: 0"); + assertEquals(infoStrings[16], "Number of Waiting Jobs: 5"); //Raise status change event so that jobs can move to running queue. raiseStatusChangeEvents(scheduler.jobQueuesManager); @@ -1376,10 +1461,14 @@ schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 18); - assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity"); - assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity"); - assertEquals(infoStrings[16] , "Number of Waiting Jobs: 4"); + assertEquals(infoStrings.length, 20); + assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)"); + assertEquals(infoStrings[8], "Running tasks: 1"); + assertEquals(infoStrings[9], "Active users:"); + assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)"); + assertEquals(infoStrings[14], "Used capacity: 0 (0.0% of Capacity)"); + assertEquals(infoStrings[15], "Running tasks: 0"); + assertEquals(infoStrings[18], "Number of Waiting Jobs: 4"); //assign a reduce task Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1"); @@ -1388,10 +1477,16 @@ schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 20); - assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity"); - assertEquals(infoStrings[13],"Running tasks: 100.0% of Capacity"); - assertEquals(infoStrings[18] , "Number of Waiting Jobs: 4"); + assertEquals(infoStrings.length, 22); + assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)"); + assertEquals(infoStrings[8], "Running tasks: 1"); + assertEquals(infoStrings[9], "Active users:"); + assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)"); + assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)"); + assertEquals(infoStrings[15], "Running tasks: 1"); + assertEquals(infoStrings[16], "Active users:"); + 
assertEquals(infoStrings[17], "User 'u1': 1 (100.0% of used capacity)"); + assertEquals(infoStrings[20], "Number of Waiting Jobs: 4"); //Complete the job and check the running tasks count FakeJobInProgress u1j1 = userJobs.get(0); @@ -1404,10 +1499,12 @@ schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 16); - assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity"); - assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity"); - assertEquals(infoStrings[14] , "Number of Waiting Jobs: 4"); + assertEquals(infoStrings.length, 18); + assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)"); + assertEquals(infoStrings[8], "Running tasks: 0"); + assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)"); + assertEquals(infoStrings[13], "Running tasks: 0"); + assertEquals(infoStrings[16], "Number of Waiting Jobs: 4"); //Fail a job which is initialized but not scheduled and check the count. FakeJobInProgress u1j2 = userJobs.get(1); @@ -1421,10 +1518,14 @@ schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 16); - assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity"); - assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity"); - assertEquals(infoStrings[14] , "Number of Waiting Jobs: 3"); + assertEquals(infoStrings.length, 18); + //should be previous value as nothing is scheduled because no events + //has been raised after initialization. + assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)"); + assertEquals(infoStrings[8], "Running tasks: 0"); + assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)"); + assertEquals(infoStrings[13], "Running tasks: 0"); + assertEquals(infoStrings[16], "Number of Waiting Jobs: 3"); //Fail a job which is not initialized but is in the waiting queue. FakeJobInProgress u1j5 = userJobs.get(4); @@ -1439,10 +1540,14 @@ schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 16); - assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity"); - assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity"); - assertEquals(infoStrings[14] , "Number of Waiting Jobs: 2"); + assertEquals(infoStrings.length, 18); + //should be previous value as nothing is scheduled because no events + //has been raised after initialization. 
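The per-user line asserted here comes straight from the new TaskSchedulingInfo.toString(): the percentage is the user's occupied slots times 100 divided by the queue's total occupied slots, i.e. 1 * 100 / 1 = 100.0% in this single-task case.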
+ assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)"); + assertEquals(infoStrings[8], "Running tasks: 0"); + assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)"); + assertEquals(infoStrings[13], "Running tasks: 0"); + assertEquals(infoStrings[16], "Number of Waiting Jobs: 2"); //Raise status change events as none of the intialized jobs would be //in running queue as we just failed the second job which was initialized @@ -1465,10 +1570,12 @@ schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 18); - assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity"); - assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity"); - assertEquals(infoStrings[16] , "Number of Waiting Jobs: 1"); + assertEquals(infoStrings.length, 20); + assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)"); + assertEquals(infoStrings[8], "Running tasks: 1"); + assertEquals(infoStrings[9], "Active users:"); + assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)"); + assertEquals(infoStrings[18], "Number of Waiting Jobs: 1"); //Fail the executing job taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED); @@ -1478,11 +1585,10 @@ schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 16); - assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity"); - assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity"); - assertEquals(infoStrings[14] , "Number of Waiting Jobs: 1"); - + assertEquals(infoStrings.length, 18); + assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)"); + assertEquals(infoStrings[8], "Running tasks: 0"); + assertEquals(infoStrings[16], "Number of Waiting Jobs: 1"); } /** @@ -1556,8 +1662,10 @@ scheduler.start(); // The situation : Two jobs in the queue. First job with only maps and no - // reduces and is a high memory job. Second job is a normal job with both maps and reduces. - // First job cannot run for want of memory for maps. In this case, second job's reduces should run. + // reduces and is a high memory job. Second job is a normal job with both + // maps and reduces. + // First job cannot run for want of memory for maps. In this case, second + // job's reduces should run. LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of " + "2 map tasks"); @@ -1583,11 +1691,16 @@ // first, a map from j1 will run checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); + // Total 2 map slots should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 100.0f); checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L); // at this point, the scheduler tries to schedule another map from j1. // there isn't enough space. The second job's reduce should be scheduled. checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1"); + // Total 1 reduce slot should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1, + 100.0f); checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L); } @@ -1622,7 +1735,7 @@ scheduler.start(); LOG.debug("Submit one normal memory(1GB maps/reduces) job of " - + "1 map, 0 reduce tasks."); + + "1 map, 1 reduce tasks."); JobConf jConf = new JobConf(conf); jConf.setMemoryForMapTask(1 * 1024); jConf.setMemoryForReduceTask(1 * 1024); @@ -1634,8 +1747,19 @@ // Fill the second tt with this job. 
checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2"); + // Total 1 map slot should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 25.0f); + assertEquals(String.format( + CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 0, 0), + (String) job1.getSchedulingInfo()); checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L); checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2"); + // Total 1 map slot should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1, + 25.0f); + assertEquals(String.format( + CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 1, 1), + (String) job1.getSchedulingInfo()); checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L); LOG.debug("Submit one high memory(2GB maps/reduces) job of " @@ -1650,8 +1774,20 @@ FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf); checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1"); + // Total 3 map slots should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f); + assertEquals(String.format( + CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 0, 0), + (String) job2.getSchedulingInfo()); checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L); + checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1"); + // Total 3 reduce slots should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3, + 75.0f); + assertEquals(String.format( + CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2), + (String) job2.getSchedulingInfo()); checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L); LOG.debug("Submit one normal memory(1GB maps/reduces) job of " @@ -1670,8 +1806,17 @@ assertNull(scheduler.assignTasks(tracker("tt2"))); assertNull(scheduler.assignTasks(tracker("tt1"))); assertNull(scheduler.assignTasks(tracker("tt2"))); + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f); + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3, + 75.0f); checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L); checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L); + assertEquals(String.format( + CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2), + (String) job2.getSchedulingInfo()); + assertEquals(String.format( + CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 0, 0), + (String) job3.getSchedulingInfo()); } /** @@ -1720,10 +1865,23 @@ // 1st cycle - 1 map gets assigned. Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); + // Total 1 map slot should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f); checkMemReservedForTasksOnTT("tt1", 512L, 0L); + + // 1st cycle of reduces - 1 reduce gets assigned. + Task t1 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1"); + // Total 1 reduce slot should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1, + 50.0f); + checkMemReservedForTasksOnTT("tt1", 512L, 512L); // kill this job ! taskTrackerManager.killJob(job1.getJobID()); + // No more map/reduce slots should be accounted for. 
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 0, 0, 0.0f); + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 0, 0, + 0.0f); // retire the job taskTrackerManager.removeJob(job1.getJobID()); @@ -1748,57 +1906,23 @@ assertNull(scheduler.assignTasks(tracker("tt1"))); checkMemReservedForTasksOnTT("tt1", null, null); - // finish the task on the tracker. + // finish the tasks on the tracker. taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1); + taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), job1); + // now a new task can be assigned. t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1"); + // Total 1 map slots should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f); checkMemReservedForTasksOnTT("tt1", 512L, 0L); // reduce can be assigned. t = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1"); + // Total 1 reduce slots should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1, + 50.0f); checkMemReservedForTasksOnTT("tt1", 512L, 512L); } - - protected TaskTrackerStatus tracker(String taskTrackerName) { - return taskTrackerManager.getTaskTracker(taskTrackerName); - } - - protected Task checkAssignment(String taskTrackerName, - String expectedTaskString) throws IOException { - List tasks = scheduler.assignTasks(tracker(taskTrackerName)); - assertNotNull(expectedTaskString, tasks); - assertEquals(expectedTaskString, 1, tasks.size()); - assertEquals(expectedTaskString, tasks.get(0).toString()); - return tasks.get(0); - } - - /** - * Get the amount of memory that is reserved for tasks on the taskTracker and - * verify that it matches what is expected. - * - * @param taskTracker - * @param expectedMemForMapsOnTT - * @param expectedMemForReducesOnTT - */ - private void checkMemReservedForTasksOnTT(String taskTracker, - Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) { - Long observedMemForMapsOnTT = - scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker), - CapacityTaskScheduler.TYPE.MAP); - Long observedMemForReducesOnTT = - scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker), - CapacityTaskScheduler.TYPE.REDUCE); - if (expectedMemForMapsOnTT == null) { - assertTrue(observedMemForMapsOnTT == null); - } else { - assertTrue(observedMemForMapsOnTT.equals(expectedMemForMapsOnTT)); - } - if (expectedMemForReducesOnTT == null) { - assertTrue(observedMemForReducesOnTT == null); - } else { - assertTrue(observedMemForReducesOnTT.equals(expectedMemForReducesOnTT)); - } - } /* * Test cases for Job Initialization poller. @@ -2006,10 +2130,9 @@ checkFailedInitializedJobMovement(); // Check failed waiting job movement - checkFailedWaitingJobMovement(); - + checkFailedWaitingJobMovement(); } - + public void testStartWithoutDefaultQueueConfigured() throws Exception { //configure a single queue which is not default queue String[] qs = {"q1"}; @@ -2189,9 +2312,9 @@ checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1"); taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", fjob2); taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0", fjob2); - taskTrackerManager.finalizeJob(fjob2); - + taskTrackerManager.finalizeJob(fjob2); } + /** * Test case to test scheduling of *
@@ -2207,17 +2330,16 @@
    *
    * <li>Submit one high ram job which has speculative reduce.
    * <li>Submit a normal job which has no speculative reduce.
-   * <li>Scheduler should schedule first all reduce tasks from first job and block
-   * the cluster till both reduces are completed.
+   * <li>Scheduler should schedule first all reduce tasks from first job and
+   * block the cluster till both reduces are completed.
    *
    *
* @throws IOException */ public void testHighRamJobWithSpeculativeExecution() throws IOException { - // 2 map and 2 reduce slots + // 2 TTs, 3 map and 3 reduce slots on each TT taskTrackerManager = new FakeTaskTrackerManager(2, 3, 3); - // 1GB for each map, 1GB for each reduce taskTrackerManager.addQueues(new String[] { "default" }); ArrayList queues = new ArrayList(); @@ -2225,6 +2347,7 @@ resConf.setFakeQueues(queues); scheduler.setTaskTrackerManager(taskTrackerManager); // enabled memory-based scheduling + // 1GB for each map, 1GB for each reduce scheduler.getConf().setLong( JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 3 * 1024L); @@ -2248,8 +2371,9 @@ jConf.setUser("u1"); jConf.setMapSpeculativeExecution(true); jConf.setReduceSpeculativeExecution(false); - FakeJobInProgress job1 = new FakeJobInProgress(new JobID("test", ++jobCounter), - jConf, taskTrackerManager,"u1"); + FakeJobInProgress job1 = + new FakeJobInProgress(new JobID("test", ++jobCounter), jConf, + taskTrackerManager, "u1"); taskTrackerManager.submitJob(job1); //Submit normal job @@ -2273,21 +2397,29 @@ //complete none of the tasks from other jobs would be scheduled. checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L); - assertEquals("pending maps greater than zero " , job1.pendingMaps(), 0); + // Total 2 map slots should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f); + //make same tracker get back, check if you are blocking. Your job //has speculative map task so tracker should be blocked even tho' it //can run job2's map. assertNull(scheduler.assignTasks(tracker("tt1"))); + // Total 2 map slots should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f); checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L); //TT2 now gets speculative map of the job1 checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2"); + // Total 4 map slots should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 4, 66.7f); checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L); // Now since the first job has no more speculative maps, it can schedule // the second job. checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1"); + // Total 5 map slots should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 5, 83.3f); checkMemReservedForTasksOnTT("tt1", 3 * 1024L, 0L); //finish everything @@ -2310,14 +2442,15 @@ jConf.setUser("u1"); jConf.setMapSpeculativeExecution(false); jConf.setReduceSpeculativeExecution(true); - FakeJobInProgress job3 = new FakeJobInProgress(new JobID("test", ++jobCounter), - jConf, taskTrackerManager,"u1"); + FakeJobInProgress job3 = + new FakeJobInProgress(new JobID("test", ++jobCounter), jConf, + taskTrackerManager, "u1"); taskTrackerManager.submitJob(job3); //Submit normal job w.r.t reduces jConf = new JobConf(); - jConf.setMemoryForMapTask(2 * 1024L); - jConf.setMemoryForReduceTask(1 * 104L); + jConf.setMemoryForMapTask(1 * 1024L); + jConf.setMemoryForReduceTask(1 * 1024L); jConf.setNumMapTasks(1); jConf.setNumReduceTasks(1); jConf.setQueueName("default"); @@ -2331,32 +2464,167 @@ // Finish up the map scheduler checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1"); + // Total 2 map slots should be accounted for. 
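The percentages asserted in this test fall out of the slot accounting: the default queue owns 100% of the 2 x 3 = 6 map slots and each 2GB map occupies two 1GB slots, so one running map reads as 2/6 = 33.3% used capacity, its speculative copy pushes the queue to 4/6 = 66.7%, and the normal job's 1-slot map brings it to 5/6 = 83.3%.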
+ checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f); checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L); + checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2"); - checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L); + // Total 3 map slots should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 50.0f); + checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L); // first, a reduce from j3 will run // at this point, there is a speculative task for the same job to be //scheduled. This task would be scheduled. Till the tasks from job3 gets //complete none of the tasks from other jobs would be scheduled. checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1"); - assertEquals("pending reduces greater than zero " , job3.pendingMaps(), 0); + assertEquals("pending reduces greater than zero ", job3.pendingReduces(), + 0); + // Total 2 reduce slots should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2, + 33.3f); checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2*1024L); //make same tracker get back, check if you are blocking. Your job //has speculative reduce task so tracker should be blocked even tho' it //can run job4's reduce. assertNull(scheduler.assignTasks(tracker("tt1"))); - //TT2 now gets speculative map of the job1 + // Total 2 reduce slots should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2, + 33.3f); + + //TT2 now gets speculative reduce of the job3 checkAssignment("tt2", "attempt_test_0003_r_000001_1 on tt2"); - checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 2 * 1024L); + // Total 4 reduce slots should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 4, + 66.7f); + checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 2 * 1024L); // Now since j3 has no more speculative reduces, it can schedule // the j4. checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1"); + // Total 5 reduce slots should be accounted for. + checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 5, + 83.3f); checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 3 * 1024L); } + /** + * Test to verify that queue ordering is based on the number of slots occupied + * and hence to verify that presence of high memory jobs is reflected properly + * while determining used capacities of queues and hence the queue ordering. 
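In slot terms the ordering below works out as follows: each queue gets 50% of the 2 x 6 = 12 map slots, i.e. 6 slots, and a 2GB map occupies 2 of the 1GB slots. After the high-RAM job's first map, 'default' stands at 2/6 occupied while 'q1' is at 0/6, so the comparator keeps serving q1 until the ratios even out; with the old task-count accounting both queues would read one running task apiece after one assignment each, and the extra slot held by the high-RAM map would not influence the order.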
+ * + * @throws IOException + */ + public void testQueueOrdering() + throws IOException { + taskTrackerManager = new FakeTaskTrackerManager(2, 6, 6); + scheduler.setTaskTrackerManager(taskTrackerManager); + String[] qs = { "default", "q1" }; + String[] reversedQs = { qs[1], qs[0] }; + taskTrackerManager.addQueues(qs); + ArrayList queues = new ArrayList(); + queues.add(new FakeQueueInfo("default", 50.0f, true, 100)); + queues.add(new FakeQueueInfo("q1", 50.0f, true, 100)); + resConf.setFakeQueues(queues); + // enabled memory-based scheduling + // Normal job in the cluster would be 1GB maps/reduces + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024); + scheduler.setResourceManagerConf(resConf); + scheduler.start(); + + LOG.debug("Submit one high memory(2GB maps, 2GB reduces) job of " + + "6 map and 6 reduce tasks"); + JobConf jConf = new JobConf(conf); + jConf.setMemoryForMapTask(2 * 1024); + jConf.setMemoryForReduceTask(2 * 1024); + jConf.setNumMapTasks(6); + jConf.setNumReduceTasks(6); + jConf.setQueueName("default"); + jConf.setUser("u1"); + FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf); + + // Submit a normal job to the other queue. + jConf = new JobConf(conf); + jConf.setMemoryForMapTask(1 * 1024); + jConf.setMemoryForReduceTask(1 * 1024); + jConf.setNumMapTasks(6); + jConf.setNumReduceTasks(6); + jConf.setUser("u1"); + jConf.setQueueName("q1"); + FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf); + + // Map 1 of high memory job + checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); + checkQueuesOrder(qs, scheduler + .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP)); + + // Reduce 1 of high memory job + checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1"); + checkQueuesOrder(qs, scheduler + .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE)); + + // Map 1 of normal job + checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1"); + checkQueuesOrder(reversedQs, scheduler + .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP)); + + // Reduce 1 of normal job + checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1"); + checkQueuesOrder(reversedQs, scheduler + .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE)); + + // Map 2 of normal job + checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1"); + checkQueuesOrder(reversedQs, scheduler + .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP)); + + // Reduce 2 of normal job + checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1"); + checkQueuesOrder(reversedQs, scheduler + .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE)); + + // Now both the queues are equally served. But the comparator doesn't change + // the order if queues are equally served. 
@@ -2436,6 +2704,89 @@
       userJobs.put(user, jips);
     }
     return userJobs;
+  }
+
+
+  protected TaskTrackerStatus tracker(String taskTrackerName) {
+    return taskTrackerManager.getTaskTracker(taskTrackerName);
+  }
+
+  protected Task checkAssignment(String taskTrackerName,
+      String expectedTaskString) throws IOException {
+    List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+    assertNotNull(expectedTaskString, tasks);
+    assertEquals(expectedTaskString, 1, tasks.size());
+    assertEquals(expectedTaskString, tasks.get(0).toString());
+    return tasks.get(0);
+  }
+
+  /**
+   * Get the amount of memory that is reserved for tasks on the taskTracker
+   * and verify that it matches what is expected.
+   *
+   * @param taskTracker
+   * @param expectedMemForMapsOnTT
+   * @param expectedMemForReducesOnTT
+   */
+  private void checkMemReservedForTasksOnTT(String taskTracker,
+      Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
+    Long observedMemForMapsOnTT =
+        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
+            CapacityTaskScheduler.TYPE.MAP);
+    Long observedMemForReducesOnTT =
+        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
+            CapacityTaskScheduler.TYPE.REDUCE);
+    if (expectedMemForMapsOnTT == null) {
+      assertTrue(observedMemForMapsOnTT == null);
+    } else {
+      assertTrue(observedMemForMapsOnTT.equals(expectedMemForMapsOnTT));
+    }
+    if (expectedMemForReducesOnTT == null) {
+      assertTrue(observedMemForReducesOnTT == null);
+    } else {
+      assertTrue(observedMemForReducesOnTT.equals(expectedMemForReducesOnTT));
+    }
+  }
+  /**
+   * Verify the number of slots of type 'type' occupied in the queue 'queue'.
+   *
+   * @param queue
+   * @param type
+   * @param numActiveUsers number of active users in the queue at present
+   * @param expectedOccupiedSlots
+   * @param expectedOccupiedSlotsPercent
+   */
+  private void checkOccupiedSlots(String queue,
+      CapacityTaskScheduler.TYPE type, int numActiveUsers,
+      int expectedOccupiedSlots, float expectedOccupiedSlotsPercent) {
+    scheduler.updateQSIInfoForTests();
+    QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
+    String schedulingInfo =
+        queueManager.getJobQueueInfo(queue).getSchedulingInfo();
+    String[] infoStrings = schedulingInfo.split("\n");
+    int index = -1;
+    if (type.equals(CapacityTaskScheduler.TYPE.MAP)) {
+      index = 7;
+    } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+      index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers);
+    }
+    LOG.info(infoStrings[index]);
+    assertEquals(String.format("Used capacity: %d (%.1f%% of Capacity)",
+        expectedOccupiedSlots, expectedOccupiedSlotsPercent),
+        infoStrings[index]);
+  }
+
+  private void checkQueuesOrder(String[] expectedOrder, String[] observedOrder) {
+    assertTrue("Observed and expected queues are not of same length.",
+        expectedOrder.length == observedOrder.length);
+    int i = 0;
+    for (String expectedQ : expectedOrder) {
+      assertTrue("Observed and expected queues are not in the same order. " +
+          "Differ at index " + i + ". Got " + observedOrder[i] +
+          " instead of " + expectedQ, expectedQ.equals(observedOrder[i]));
+      i++;
+    }
   }
 }
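Before the jobdetails.jsp change below, one more note on the numbers asserted in the test hunks above: they are consistent with charging each task one slot per multiple of the cluster's per-slot memory it requests, so a 2GB map on a cluster configured with 1GB map slots occupies 2 slots. The following sketch only illustrates that arithmetic under that assumption; slotsPerTask and percentOfCapacity are hypothetical helpers written for this message, not methods of the scheduler.

    public class SlotAccountingSketch {

      // Slots charged to one task, assuming a task occupies as many slots as
      // are needed to cover its memory request at the per-slot memory size.
      static int slotsPerTask(int taskMemMB, int memPerSlotMB) {
        return (int) Math.ceil((double) taskMemMB / memPerSlotMB);
      }

      // The "% of Capacity" figure asserted by checkOccupiedSlots, given the
      // queue's capacity in slots.
      static float percentOfCapacity(int occupiedSlots, int queueCapacitySlots) {
        return 100f * occupiedSlots / queueCapacitySlots;
      }

      public static void main(String[] args) {
        int memPerSlotMB = 1024;                                     // 1GB per slot
        int highRamMapSlots = slotsPerTask(2 * 1024, memPerSlotMB);  // -> 2 slots
        int normalMapSlots = slotsPerTask(1 * 1024, memPerSlotMB);   // -> 1 slot

        // One high-RAM map plus one normal map account for 3 map slots; on a
        // queue whose capacity is 6 slots that is 50.0%, matching
        // checkOccupiedSlots("default", ..., 3, 50.0f) in the test above.
        int occupied = highRamMapSlots + normalMapSlots;
        System.out.printf("%d slots occupied, %.1f%% of capacity%n",
            occupied, percentOfCapacity(occupied, 6));
      }
    }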
\n"); } + if (job.getSchedulingInfo() != null) { + out.print("Job Scheduling information: " + + job.getSchedulingInfo().toString() +"\n"); + } out.print("
\n"); out.print(""); out.print("" +
Kind% CompleteNum Tasks
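The jobdetails.jsp hunk above prints the job's scheduling information string on the job page, and the per-queue variant of that report is what checkOccupiedSlots parses: it splits the text on newlines and compares a single line against the format "Used capacity: %d (%.1f%% of Capacity)". The snippet below only illustrates that string contract; the "Capacity: 6 slots" line and the helper name are invented for the example and are not part of the scheduler's report.

    public class SchedulingInfoSketch {

      // Render the "Used capacity" line in the exact shape the test asserts.
      static String usedCapacityLine(int occupiedSlots, float percentOfCapacity) {
        return String.format("Used capacity: %d (%.1f%% of Capacity)",
            occupiedSlots, percentOfCapacity);
      }

      public static void main(String[] args) {
        // Example: 5 reduce slots occupied in a queue whose capacity is 6 slots.
        String line = usedCapacityLine(5, 100f * 5 / 6);
        System.out.println(line);  // Used capacity: 5 (83.3% of Capacity)

        // A consumer, like checkOccupiedSlots, splits the multi-line report on
        // '\n' and compares one line against the expected format.
        String report = "Capacity: 6 slots\n" + line;  // first line is made up
        String[] infoStrings = report.split("\n");
        System.out.println(infoStrings[1].equals(usedCapacityLine(5, 83.3f)));  // true
      }
    }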