From: omalley@apache.org
To: common-commits@hadoop.apache.org
Reply-To: common-dev@hadoop.apache.org
Message-Id: <20110304032522.60DA02388901@eris.apache.org>
Date: Fri, 04 Mar 2011 03:25:22 -0000
Subject: svn commit: r1076951 - in /hadoop/common/branches/branch-0.20-security-patches: conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/docs/src/documentation/content...

Author: omalley
Date: Fri Mar  4 03:25:21 2011
New Revision: 1076951

URL: http://svn.apache.org/viewvc?rev=1076951&view=rev
Log:
commit 2a7c1121115819f918adb47c4aebd4daa693a0ea
Author: Lee Tucker
Date:   Thu Jul 30 17:40:40 2009 -0700

    Applying patch 2855407.mr532.patch

Modified:
    hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml

Modified: hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template?rev=1076951&r1=1076950&r2=1076951&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template (original)
+++ hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template Fri Mar  4 03:25:21 2011
@@ -45,6 +45,44 @@
     of the job queue.
     </description>
   </property>
+
+  <property>
+    <name>mapred.capacity-scheduler.queue.default.max.map.slots</name>
+    <value>-1</value>
+    <description>
+      This value is the maximum number of map slots that can be used in the
+      queue at any point of time. For example, if this value is 100, no more
+      than 100 map tasks will be running in the queue at any point of time,
+      assuming each task takes one slot.
+
+      The default value of -1 disables this capping feature.
+
+      Typically the queue capacity should be equal to this limit.
+      If the queue capacity is more than this limit, the excess capacity will
+      be used by the other queues. If the queue capacity is less than this
+      limit, the queue capacity acts as the limit, as in the current
+      implementation.
+    </description>
+  </property>
+
+  <property>
+    <name>mapred.capacity-scheduler.queue.default.max.reduce.slots</name>
+    <value>-1</value>
+    <description>
+      This value is the maximum number of reduce slots that can be used in the
+      queue at any point of time. For example, if this value is 100, no more
+      than 100 reduce tasks will be running in the queue at any point of time,
+      assuming each task takes one slot.
+
+      The default value of -1 disables this capping feature.
+
+      Typically the queue capacity should be equal to this limit.
+      If the queue capacity is more than this limit, the excess capacity will
+      be used by the other queues. If the queue capacity is less than this
+      limit, the queue capacity acts as the limit, as in the current
+      implementation.
+    </description>
+  </property>
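The two descriptions above reduce to a single rule: the number of slots a queue may actually use is the smaller of its computed capacity and the configured cap, with -1 meaning the cap is disabled. The same rule is what the scheduler change further down implements in TaskSchedulingInfo.getCapacity(). A minimal, self-contained sketch of that rule (the class and method names here are illustrative, not part of the patch):

    // Illustrative sketch of the capping rule described above.
    public class EffectiveCapacityDemo {
      /** A negative cap (the -1 default) means capping is disabled. */
      static int effectiveCapacity(int capacity, int maxSlotsCap) {
        return (maxSlotsCap >= 0 && maxSlotsCap < capacity) ? maxSlotsCap : capacity;
      }

      public static void main(String[] args) {
        System.out.println(effectiveCapacity(100, -1));  // 100: cap disabled
        System.out.println(effectiveCapacity(100, 40));  //  40: the cap wins
        System.out.println(effectiveCapacity(30, 100));  //  30: queue capacity wins
      }
    }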
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=1076951&r1=1076950&r2=1076951&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Fri Mar  4 03:25:21 2011
@@ -76,6 +76,18 @@ class CapacitySchedulerConf {
     "mapred.capacity-scheduler.task.limit.maxpmem";
 
   /**
+   * Configuration key that provides the maximum cap on map slots a queue
+   * may use at any given point of time.
+   */
+  static final String MAX_MAP_CAP_PROPERTY = "max.map.slots";
+
+  /**
+   * Configuration key that provides the maximum cap on reduce slots a queue
+   * may use at any given point of time.
+   */
+  static final String MAX_REDUCE_CAP_PROPERTY = "max.reduce.slots";
+
+  /**
    * The constant which defines the default initialization thread
    * polling interval, denoted in milliseconds.
    */
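The two keys above are only suffixes; the effective configuration name is prefixed per queue, as the template entries earlier show (mapred.capacity-scheduler.queue.default.max.map.slots), and the accessors added in the next hunk read them through toFullPropertyName with a default of -1. A hedged sketch of that lookup using Hadoop's Configuration API; the helper below is a stand-in for the real toFullPropertyName, and the prefix is taken from the template rather than from this file:

    import org.apache.hadoop.conf.Configuration;

    // Sketch of reading a per-queue cap; the prefix mirrors the
    // capacity-scheduler.xml.template entries shown above.
    public class QueueCapKeyDemo {
      static String fullPropertyName(String queue, String suffix) {
        return "mapred.capacity-scheduler.queue." + queue + "." + suffix;
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // -1 is the documented default, i.e. no cap.
        int maxMapSlots =
            conf.getInt(fullPropertyName("default", "max.map.slots"), -1);
        System.out.println("max map slots for 'default': " + maxMapSlots);
      }
    }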
@@ -357,4 +369,40 @@ class CapacitySchedulerConf {
     rmConf.setInt(
         "mapred.capacity-scheduler.init-worker-threads", poolSize);
   }
+
+  /**
+   * Get the maximum map slot cap configured for a queue.
+   * @param queue the queue name
+   * @return the cap, or -1 if no cap is configured
+   */
+  public int getMaxMapCap(String queue) {
+    return rmConf.getInt(toFullPropertyName(queue, MAX_MAP_CAP_PROPERTY), -1);
+  }
+
+  /**
+   * Used for testing.
+   * @param queue the queue name
+   * @param val the cap to set
+   */
+  public void setMaxMapCap(String queue, int val) {
+    rmConf.setInt(toFullPropertyName(queue, MAX_MAP_CAP_PROPERTY), val);
+  }
+
+  /**
+   * Get the maximum reduce slot cap configured for a queue.
+   * @param queue the queue name
+   * @return the cap, or -1 if no cap is configured
+   */
+  public int getMaxReduceCap(String queue) {
+    return rmConf.getInt(toFullPropertyName(queue, MAX_REDUCE_CAP_PROPERTY), -1);
+  }
+
+  /**
+   * Used for testing.
+   * @param queue the queue name
+   * @param val the cap to set
+   */
+  public void setMaxReduceCap(String queue, int val) {
+    rmConf.setInt(toFullPropertyName(queue, MAX_REDUCE_CAP_PROPERTY), val);
+  }
 }
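For completeness, a rough usage sketch of the accessors added above. It assumes the no-argument CapacitySchedulerConf constructor that loads capacity-scheduler.xml, and it would have to live in org.apache.hadoop.mapred because the class is package-private; the queue name and values are illustrative:

    package org.apache.hadoop.mapred;

    // Usage sketch only; CapacitySchedulerConf is package-private, so this
    // belongs in the scheduler's own package (as the tests below do).
    public class MaxCapConfDemo {
      public static void main(String[] args) {
        CapacitySchedulerConf schedConf = new CapacitySchedulerConf();
        System.out.println(schedConf.getMaxMapCap("default"));     // -1 unless configured
        System.out.println(schedConf.getMaxReduceCap("default"));  // -1 unless configured
        schedConf.setMaxMapCap("default", 2);   // the mutators are intended for tests
        System.out.println(schedConf.getMaxMapCap("default"));     // 2
      }
    }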
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1076951&r1=1076950&r2=1076951&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar  4 03:25:21 2011
@@ -76,23 +76,36 @@ class CapacityTaskScheduler extends Task
   **********************************************************************/
   private static class TaskSchedulingInfo {
+
+    private static final String LIMIT_NORMALIZED_CAPACITY_STRING
+      = "(Capacity is restricted to max limit of %d slots.\n"
+        + "Remaining %d slots will be used by other queues.)\n";
     /**
      * the actual capacity, which depends on how many slots are available
      * in the cluster at any given time.
      */
-    int capacity = 0;
+    private int capacity = 0;
     // number of running tasks
     int numRunningTasks = 0;
     // number of slots occupied by running tasks
    int numSlotsOccupied = 0;
 
    /**
+     * The maximum task limit. This value is the maximum number of slots that
+     * can be used in the queue at any point of time. For example, if this
+     * value is 100, no more than 100 tasks will be running in the queue at
+     * any point of time, assuming each task takes one slot.
+     */
+    private int maxTaskLimit = -1;
+
+    /**
     * for each user, we need to keep track of number of slots occupied by
     * running tasks
     */
    Map<String, Integer> numSlotsOccupiedByUser = new HashMap<String, Integer>();
-    
+
    /**
     * reset the variables associated with tasks
     */
@@ -104,15 +117,53 @@ class CapacityTaskScheduler extends Task
       }
     }
+
+    int getMaxTaskLimit() {
+      return maxTaskLimit;
+    }
+
+    void setMaxTaskLimit(int maxTaskCap) {
+      this.maxTaskLimit = maxTaskCap;
+    }
+
+    /**
+     * This method checks for maxTaskLimit and returns the minimum of
+     * maxTaskLimit and capacity.
+     * @return the effective capacity of the queue
+     */
+    int getCapacity() {
+      return ((maxTaskLimit >= 0) && (maxTaskLimit < capacity)) ?
+        maxTaskLimit : capacity;
+    }
+
+    /**
+     * Mutator method for capacity.
+     * @param capacity
+     */
+    void setCapacity(int capacity) {
+      this.capacity = capacity;
+    }
+
     /**
      * return information about the tasks
      */
     @Override
     public String toString() {
       float occupiedSlotsAsPercent =
-        capacity != 0 ? ((float) numSlotsOccupied * 100 / capacity) : 0;
+        getCapacity() != 0 ?
+          ((float) numSlotsOccupied * 100 / getCapacity()) : 0;
       StringBuffer sb = new StringBuffer();
+      sb.append("Capacity: " + capacity + " slots\n");
+      // If maxTaskLimit is less than the capacity
+      if (maxTaskLimit >= 0 && maxTaskLimit < capacity) {
+        sb.append(String.format(LIMIT_NORMALIZED_CAPACITY_STRING,
+          maxTaskLimit, (capacity - maxTaskLimit)));
+      }
+      if (maxTaskLimit >= 0) {
+        sb.append(String.format("Maximum Slots Limit: %d\n", maxTaskLimit));
+      }
       sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)\n",
           Integer.valueOf(numSlotsOccupied), Float
               .valueOf(occupiedSlotsAsPercent)));
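A worked example of the reporting logic above: with a computed capacity of 100 slots, a cap of 40, and 30 occupied slots, getCapacity() returns 40, the header still prints the raw capacity, both limit lines appear because the cap is set and below capacity, and the used-capacity percentage is computed against the capped value (75.0%). The standalone sketch below restates that arithmetic with illustrative names; it is not the scheduler's own class:

    // Standalone restatement of the capped-capacity reporting above.
    public class CappedCapacityReportDemo {
      public static void main(String[] args) {
        int capacity = 100, maxTaskLimit = 40, numSlotsOccupied = 30;
        int effective = (maxTaskLimit >= 0 && maxTaskLimit < capacity)
            ? maxTaskLimit : capacity;
        float occupiedPercent =
            effective != 0 ? ((float) numSlotsOccupied * 100 / effective) : 0;
        StringBuilder sb = new StringBuilder();
        sb.append("Capacity: " + capacity + " slots\n");
        if (maxTaskLimit >= 0 && maxTaskLimit < capacity) {
          sb.append(String.format(
              "(Capacity is restricted to max limit of %d slots.%n"
              + "Remaining %d slots will be used by other queues.)%n",
              maxTaskLimit, capacity - maxTaskLimit));
        }
        if (maxTaskLimit >= 0) {
          sb.append(String.format("Maximum Slots Limit: %d%n", maxTaskLimit));
        }
        sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)%n",
            numSlotsOccupied, occupiedPercent));
        System.out.print(sb);  // 30 of the 40 effective slots -> 75.0%
      }
    }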
@@ -166,14 +217,17 @@ class CapacityTaskScheduler extends Task
     TaskSchedulingInfo mapTSI;
     TaskSchedulingInfo reduceTSI;
 
-    public QueueSchedulingInfo(String queueName, float capacityPercent,
-        int ulMin, JobQueuesManager jobQueuesManager) {
+    public QueueSchedulingInfo(String queueName, float capacityPercent,
+        int ulMin, JobQueuesManager jobQueuesManager,
+        int mapCap, int reduceCap) {
       this.queueName = new String(queueName);
       this.capacityPercent = capacityPercent;
       this.ulMin = ulMin;
       this.jobQueuesManager = jobQueuesManager;
       this.mapTSI = new TaskSchedulingInfo();
       this.reduceTSI = new TaskSchedulingInfo();
+      this.mapTSI.setMaxTaskLimit(mapCap);
+      this.reduceTSI.setMaxTaskLimit(reduceCap);
     }
 
     /**
@@ -194,7 +248,7 @@ class CapacityTaskScheduler extends Task
           (jobQueuesManager.doesQueueSupportPriorities(queueName))?
               "YES":"NO"));
       sb.append("-------------\n");
-      
+
       sb.append("Map tasks\n");
       sb.append(mapTSI.toString());
       sb.append("-------------\n");
@@ -339,8 +393,9 @@ class CapacityTaskScheduler extends Task
    * capacity. This ordered list is iterated over, when assigning tasks.
    */
   private List<QueueSchedulingInfo> qsiForAssigningTasks =
-      new ArrayList<QueueSchedulingInfo>();
-  /**
+      new ArrayList<QueueSchedulingInfo>();
+
+  /**
    * Comparator to sort queues.
    * For maps, we need to sort on QueueSchedulingInfo.mapTSI. For
    * reducers, we use reduceTSI. So we'll need separate comparators.
@@ -353,10 +408,10 @@ class CapacityTaskScheduler extends Task
       TaskSchedulingInfo t2 = getTSI(q2);
       // look at how much capacity they've filled. Treat a queue with
       // capacity=0 equivalent to a queue running at capacity
-      double r1 = (0 == t1.capacity)? 1.0f:
-        (double)t1.numSlotsOccupied/(double)t1.capacity;
-      double r2 = (0 == t2.capacity)? 1.0f:
-        (double)t2.numSlotsOccupied/(double)t2.capacity;
+      double r1 = (0 == t1.getCapacity())? 1.0f:
+        (double)t1.numSlotsOccupied/(double) t1.getCapacity();
+      double r2 = (0 == t2.getCapacity())? 1.0f:
+        (double)t2.numSlotsOccupied/(double) t2.getCapacity();
       if (r1 < r2) return -1;
       else if (r1 > r2) return 1;
       else return 0;
@@ -412,8 +467,8 @@ class CapacityTaskScheduler extends Task
       // slots we're getting).
       int currentCapacity;
       TaskSchedulingInfo tsi = getTSI(qsi);
-      if (tsi.numSlotsOccupied < tsi.capacity) {
-        currentCapacity = tsi.capacity;
+      if (tsi.numSlotsOccupied < tsi.getCapacity()) {
+        currentCapacity = tsi.getCapacity();
       }
       else {
         currentCapacity = tsi.numSlotsOccupied + getSlotsPerTask(j);
@@ -597,7 +652,11 @@ class CapacityTaskScheduler extends Task
       for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
         // we may have queues with capacity=0. We shouldn't look at jobs from
         // these queues
-        if (0 == getTSI(qsi).capacity) {
+        if (0 == getTSI(qsi).getCapacity()) {
+          continue;
+        }
+
+        if (this.areTasksInQueueOverLimit(qsi)) {
           continue;
         }
         TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
@@ -622,6 +681,31 @@ class CapacityTaskScheduler extends Task
       return TaskLookupResult.getNoTaskFoundResult();
     }
 
+    /**
+     * Check whether the max task limit is set for this queue.
+     * If it is set, this qsi is ignored when the current number of occupied
+     * slots of this TYPE in the queue is >= the capped capacity.
+     * @param qsi the queue being considered
+     * @return true if the queue has reached its cap
+     */
+    private boolean areTasksInQueueOverLimit(QueueSchedulingInfo qsi) {
+      TaskSchedulingInfo tsi = getTSI(qsi);
+      if (tsi.getMaxTaskLimit() >= 0) {
+        if (tsi.numSlotsOccupied >= tsi.getCapacity()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+              "Queue " + qsi.queueName + " has reached its max " + type
                + " limit ");
+            LOG.debug("Current running tasks " + tsi.getCapacity());
+          }
+          return true;
+        }
+      }
+      return false;
+    }
+
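The loop above now skips a queue for two reasons: its capacity is zero, or it has already reached its capped capacity for this task type, which is what areTasksInQueueOverLimit checks. A compact restatement of that guard with placeholder names (not the scheduler's own types):

    // Illustrative restatement of the per-queue skip test used while
    // assigning tasks; the parameters stand in for TaskSchedulingInfo fields.
    public class OverLimitGuardDemo {
      static boolean overLimit(int numSlotsOccupied, int capacity, int maxTaskLimit) {
        if (maxTaskLimit < 0) {
          return false;                        // no cap configured
        }
        int effective = Math.min(maxTaskLimit, capacity);
        return numSlotsOccupied >= effective;  // the queue has hit its cap
      }

      public static void main(String[] args) {
        System.out.println(overLimit(2, 100, 2));   // true: cap of 2 reached
        System.out.println(overLimit(1, 100, 2));   // false: one slot left
        System.out.println(overLimit(5, 100, -1));  // false: capping disabled
      }
    }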
     // for debugging.
     private void printQSIs() {
       if (LOG.isDebugEnabled()) {
@@ -629,12 +713,16 @@ class CapacityTaskScheduler extends Task
         for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
           TaskSchedulingInfo tsi = getTSI(qsi);
           Collection<JobInProgress> runJobs =
-            scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
-          s.append(String.format(" Queue '%s'(%s): runningTasks=%d, "
-              + "occupiedSlots=%d, capacity=%d, runJobs=%d", qsi.queueName,
+            scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
+          s.append(
+            String.format(
+              " Queue '%s'(%s): runningTasks=%d, "
+                + "occupiedSlots=%d, capacity=%d, runJobs=%d maxTaskLimit=%d ",
+              qsi.queueName,
               this.type, Integer.valueOf(tsi.numRunningTasks), Integer
-                  .valueOf(tsi.numSlotsOccupied), Integer
-                  .valueOf(tsi.capacity), Integer.valueOf(runJobs.size())));
+                .valueOf(tsi.numSlotsOccupied), Integer
+                .valueOf(tsi.getCapacity()), Integer.valueOf(runJobs.size()),
+              Integer.valueOf(tsi.getMaxTaskLimit())));
         }
         LOG.debug(s);
       }
@@ -970,8 +1058,9 @@ class CapacityTaskScheduler extends Task
       }
       int ulMin = schedConf.getMinimumUserLimitPercent(queueName);
       // create our QSI and add to our hashmap
-      QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, capacity,
-          ulMin, jobQueuesManager);
+      QueueSchedulingInfo qsi = new QueueSchedulingInfo(
+        queueName, capacity, ulMin, jobQueuesManager,
+        schedConf.getMaxMapCap(queueName), schedConf.getMaxReduceCap(queueName));
       queueInfoMap.put(queueName, qsi);
       // create the queues of job objects
@@ -1068,12 +1157,12 @@ class CapacityTaskScheduler extends Task
     for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
       // compute new capacities, if TT slots have changed
       if (mapClusterCapacity != prevMapClusterCapacity) {
-        qsi.mapTSI.capacity =
-          (int)(qsi.capacityPercent*mapClusterCapacity/100);
+        qsi.mapTSI.setCapacity((int)
+          (qsi.capacityPercent*mapClusterCapacity/100));
       }
       if (reduceClusterCapacity != prevReduceClusterCapacity) {
-        qsi.reduceTSI.capacity =
-          (int)(qsi.capacityPercent*reduceClusterCapacity/100);
+        qsi.reduceTSI.setCapacity((int)
+          (qsi.capacityPercent*reduceClusterCapacity/100));
       }
       // reset running/pending tasks, tasks per user
       qsi.mapTSI.resetTaskVars();
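The last hunk above recomputes each queue's absolute capacity from its percentage whenever the cluster's slot count changes, while the configured cap stays fixed; the effective capacity is therefore re-derived on every change. A small worked example of that arithmetic (the queue share, cap, and cluster sizes are invented for illustration):

    // Worked example of capacity recomputation under a fixed cap.
    public class CapacityRecomputeDemo {
      public static void main(String[] args) {
        float capacityPercent = 25.0f;  // queue is entitled to 25% of the cluster
        int maxMapCap = 10;             // hypothetical max.map.slots for the queue

        for (int mapClusterCapacity : new int[] {40, 80, 200}) {
          int capacity = (int) (capacityPercent * mapClusterCapacity / 100);
          int effective = (maxMapCap >= 0 && maxMapCap < capacity)
              ? maxMapCap : capacity;
          System.out.println(mapClusterCapacity + " cluster map slots -> capacity "
              + capacity + ", effective " + effective);
          // 40  -> capacity 10, effective 10
          // 80  -> capacity 20, effective 10 (the cap kicks in)
          // 200 -> capacity 50, effective 10
        }
      }
    }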
+ checkAssignment("tt4","attempt_test_0001_r_000003_0 on tt4"); + } // test if the queue reflects the changes private void testJobOrderChange(FakeJobInProgress fjob1, @@ -1132,6 +1212,144 @@ public class TestCapacityScheduler exten checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2"); } + /** + * Creates a queue with max task limit of 2 + * submit 1 job in the queue which is high ram(2 slots) . As 2 slots are + * given to high ram job and are reserved , no other tasks are accepted . + * + * @throws IOException + */ + public void testHighMemoryBlockingWithMaxLimit() + throws IOException { + + // 2 map and 1 reduce slots + taskTrackerManager = new FakeTaskTrackerManager(2, 2, 1); + + taskTrackerManager.addQueues(new String[] { "defaultXYZ" }); + ArrayList queues = new ArrayList(); + queues.add(new FakeQueueInfo("defaultXYZ", 100.0f, true, 25)); + resConf.setFakeQueues(queues); + resConf.setMaxMapCap("defaultXYZ",2); + scheduler.setTaskTrackerManager(taskTrackerManager); + // enabled memory-based scheduling + // Normal job in the cluster would be 1GB maps/reduces + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, + 2 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, + 1 * 1024); + scheduler.getConf().setLong( + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024); + scheduler.setResourceManagerConf(resConf); + scheduler.start(); + + // The situation : Submit 2 jobs with high memory map task + //Set the max limit for queue to 2 , + // try submitting more map tasks to the queue , it should not happen + + LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of " + + "2 map tasks"); + JobConf jConf = new JobConf(conf); + jConf.setMemoryForMapTask(2 * 1024); + jConf.setMemoryForReduceTask(0); + jConf.setNumMapTasks(2); + jConf.setNumReduceTasks(0); + jConf.setQueueName("defaultXYZ"); + jConf.setUser("u1"); + FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf); + + LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of " + + "2 map/red tasks"); + jConf = new JobConf(conf); + jConf.setMemoryForMapTask(1 * 1024); + jConf.setMemoryForReduceTask(1 * 1024); + jConf.setNumMapTasks(2); + jConf.setNumReduceTasks(2); + jConf.setQueueName("defaultXYZ"); + jConf.setUser("u1"); + FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf); + + // first, a map from j1 will run this is a high memory job so it would + // occupy the 2 slots + checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); + + checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1); + checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L); + + // at this point, the scheduler tries to schedule another map from j1. + // there isn't enough space. The second job's reduce should be scheduled. + checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1"); + + checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1); + checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L); + + //at this point , the scheduler tries to schedule another map from j2 for + //another task tracker. 
@@ -1132,6 +1212,144 @@ public class TestCapacityScheduler exten
     checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
   }
 
+  /**
+   * Creates a queue with a max task limit of 2 and submits one job to it
+   * whose map tasks are high-RAM (2 slots each). As the 2 slots are given to
+   * the high-RAM job and are reserved, no other map tasks are accepted.
+   *
+   * @throws IOException
+   */
+  public void testHighMemoryBlockingWithMaxLimit()
+      throws IOException {
+
+    // 2 map and 1 reduce slots
+    taskTrackerManager = new FakeTaskTrackerManager(2, 2, 1);
+
+    taskTrackerManager.addQueues(new String[] { "defaultXYZ" });
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("defaultXYZ", 100.0f, true, 25));
+    resConf.setFakeQueues(queues);
+    resConf.setMaxMapCap("defaultXYZ", 2);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // enable memory-based scheduling;
+    // a normal job in the cluster would be 1GB maps/reduces
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // The situation: submit 2 jobs, the first with high-memory map tasks.
+    // Set the max limit for the queue to 2 and try submitting more map tasks
+    // to the queue; they should not be scheduled.
+
+    LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
+        + "2 map tasks");
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(0);
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("defaultXYZ");
+    jConf.setUser("u1");
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
+        + "2 map/red tasks");
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(2);
+    jConf.setQueueName("defaultXYZ");
+    jConf.setUser("u1");
+    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    // first, a map from j1 will run; this is a high memory job, so it
+    // occupies the 2 slots
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+
+    checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1, 2, 100.0f, 3, 1);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
+    // at this point, the scheduler tries to schedule another map from j1.
+    // there isn't enough space. The second job's reduce should be scheduled.
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+
+    checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1, 2, 100.0f, 3, 1);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
+
+    // at this point, the scheduler tries to schedule another map from j2 on
+    // another task tracker. This should not happen, as all the map slots are
+    // taken by the first job's task; hence a reduce task from the second job
+    // is assigned.
+    checkAssignment("tt2", "attempt_test_0002_r_000002_0 on tt2");
+  }
+
+  /**
+   * Test whether user limits automatically adjust to the max map or reduce
+   * limit.
+   */
+  public void testUserLimitsWithMaxLimits() throws Exception {
+    setUp(4, 4, 4);
+    // set up some queues
+    String[] qs = {"default"};
+    taskTrackerManager.addQueues(qs);
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
+    resConf.setFakeQueues(queues);
+    resConf.setMaxMapCap("default", 2);
+    resConf.setMaxReduceCap("default", 2);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // submit two jobs
+    FakeJobInProgress fjob1 =
+      submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
+    FakeJobInProgress fjob2 =
+      submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2");
+
+    // for queue 'default', the effective capacity for maps is 2 because the
+    // max map limit is 2; hence a user should get no more than 1 map slot,
+    // as the user limit is 50%.
+    Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+
+    // Now we should get the task from the other job, as the first user has
+    // reached his max map limit.
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+
+    // Now we are done with the map limit; if we ask for a task we should
+    // get a reduce from the 1st job.
+    checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
+    // Now we're at full capacity for maps. One reduce is running for job 1,
+    // so now we should get 1 reduce for job 2.
+    Task t4 = checkAssignment("tt4", "attempt_test_0002_r_000001_0 on tt4");
+
+    taskTrackerManager.finishTask(
+        "tt1", t1.getTaskID().toString(),
+        fjob1);
+
+    // tt1 completed the task, so we have 1 map slot for u1;
+    // we are assigning the 2nd map task from fjob1
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+
+    taskTrackerManager.finishTask(
+        "tt4", t4.getTaskID().toString(),
+        fjob2);
+    // tt4 completed the task, so we have 1 reduce slot for u2;
+    // we are assigning the 2nd reduce from fjob2
+    checkAssignment("tt4", "attempt_test_0002_r_000002_0 on tt4");
+  }
+
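The test above leans on one piece of arithmetic: with max.map.slots set to 2 and a 50% per-user limit, each user is entitled to one map slot, so the second map has to come from the other user's job. A back-of-the-envelope sketch of that share, under the assumption (stated here for illustration, not taken from the patch) that the user limit is applied as a percentage of the effective, capped capacity:

    // Rough check of the per-user share the test relies on.
    public class UserShareDemo {
      public static void main(String[] args) {
        int effectiveMapCapacity = 2;  // min(queue capacity, max.map.slots = 2)
        int userLimitPercent = 50;     // from FakeQueueInfo("default", 100.0f, true, 50)
        int perUserSlots =
            (int) Math.ceil(effectiveMapCapacity * userLimitPercent / 100.0);
        System.out.println(perUserSlots);  // 1 -> u1 and u2 get one map slot each
      }
    }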
   // test user limits
   public void testUserLimits() throws Exception {
     // set up some queues
@@ -2707,17 +2925,25 @@ public class TestCapacityScheduler exten
 
   /**
    * Verify the number of slots of type 'type' from the queue 'queue'.
+   * incrMapIndex and incrReduceIndex are set when the expected output string
+   * changes; use them when the index of the
+   * "Used capacity: %d (%.1f%% of Capacity)"
+   * line shifts.
    *
    * @param queue
    * @param type
    * @param numActiveUsers in the queue at present.
    * @param expectedOccupiedSlots
    * @param expectedOccupiedSlotsPercent
-   * @return
+   * @param incrMapIndex
+   * @param incrReduceIndex
    */
-  private void checkOccupiedSlots(String queue,
-      TaskType type, int numActiveUsers,
-      int expectedOccupiedSlots, float expectedOccupiedSlotsPercent) {
+  private void checkOccupiedSlots(
+      String queue,
+      TaskType type, int numActiveUsers,
+      int expectedOccupiedSlots, float expectedOccupiedSlotsPercent,
+      int incrMapIndex, int incrReduceIndex
+  ) {
     scheduler.updateQSIInfoForTests();
     QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
     String schedulingInfo =
@@ -2725,9 +2951,9 @@ public class TestCapacityScheduler exten
     String[] infoStrings = schedulingInfo.split("\n");
     int index = -1;
     if (type.equals(TaskType.MAP)) {
-      index = 7;
+      index = 7 + incrMapIndex;
    } else if (type.equals(TaskType.REDUCE)) {
-      index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers);
+      index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers) + incrReduceIndex;
    }
    LOG.info(infoStrings[index]);
    assertEquals(String.format("Used capacity: %d (%.1f%% of Capacity)",
@@ -2735,6 +2961,24 @@ public class TestCapacityScheduler exten
         infoStrings[index]);
   }
 
+  /**
+   * Convenience overload that assumes the default line indexes.
+   *
+   * @param queue
+   * @param type
+   * @param numActiveUsers
+   * @param expectedOccupiedSlots
+   * @param expectedOccupiedSlotsPercent
+   */
+  private void checkOccupiedSlots(
+      String queue,
+      TaskType type, int numActiveUsers,
+      int expectedOccupiedSlots, float expectedOccupiedSlotsPercent
+  ) {
+    checkOccupiedSlots(
+        queue, type, numActiveUsers, expectedOccupiedSlots,
+        expectedOccupiedSlotsPercent, 0, 0);
+  }
+
   private void checkQueuesOrder(String[] expectedOrder, String[] observedOrder) {
     assertTrue("Observed and expected queues are not of same length.",
         expectedOrder.length == observedOrder.length);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml?rev=1076951&r1=1076950&r2=1076951&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml Fri Mar  4 03:25:21 2011
@@ -204,6 +204,38 @@
           users, no user can use more than 25% of the queue's resources. A
           value of 100 implies no user limits are imposed.
         </td></tr>
+        <tr><td>mapred.capacity-scheduler.queue.<queue-name>.max.map.slots</td>
+        <td>
+          This value is the maximum number of map slots that can be used in
+          the queue at any point of time. For example, if this value is 100,
+          no more than 100 map tasks will be running in the queue at any
+          point of time, assuming each task takes one slot.
+
+          The default value of -1 disables this capping feature.
+
+          Typically the queue capacity should be equal to this limit.
+          If the queue capacity is more than this limit, the excess capacity
+          will be used by the other queues. If the queue capacity is less
+          than this limit, the queue capacity acts as the limit, as in the
+          current implementation.
+        </td></tr>
+        <tr><td>mapred.capacity-scheduler.queue.<queue-name>.max.reduce.slots</td>
+        <td>
+          This value is the maximum number of reduce slots that can be used
+          in the queue at any point of time. For example, if this value is
+          100, no more than 100 reduce tasks will be running in the queue at
+          any point of time, assuming each task takes one slot.
+
+          The default value of -1 disables this capping feature.
+
+          Typically the queue capacity should be equal to this limit.
+          If the queue capacity is more than this limit, the excess capacity
+          will be used by the other queues. If the queue capacity is less
+          than this limit, the queue capacity acts as the limit, as in the
+          current implementation.
+        </td></tr>
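To close, a hedged sketch of setting the two documented properties programmatically for a hypothetical queue named "research" (the caps 50 and 20 are made up); in practice they would normally be placed in conf/capacity-scheduler.xml, as in the template change at the top of this commit:

    import org.apache.hadoop.conf.Configuration;

    // Illustrative only: the queue name and cap values are invented; the
    // property names come from the documentation above.
    public class ConfigureQueueCapsDemo {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInt("mapred.capacity-scheduler.queue.research.max.map.slots", 50);
        conf.setInt("mapred.capacity-scheduler.queue.research.max.reduce.slots", 20);
        System.out.println(conf.getInt(
            "mapred.capacity-scheduler.queue.research.max.map.slots", -1));  // 50
      }
    }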