Subject: svn commit: r732869 - in /hadoop/core/trunk/src/contrib/fairscheduler: README src/java/org/apache/hadoop/mapred/FairScheduler.java src/java/org/apache/hadoop/mapred/PoolManager.java src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Date: Thu, 08 Jan 2009 23:12:19 -0000
From: matei@apache.org
To: core-commits@hadoop.apache.org
Reply-To: core-dev@hadoop.apache.org
Message-Id: <20090108231219.E993E238889C@eris.apache.org>

Author: matei
Date: Thu Jan 8 15:12:19 2009
New Revision: 732869

URL: http://svn.apache.org/viewvc?rev=732869&view=rev
Log:
HADOOP-4789. Change fair scheduler to share between pools by default, not
between individual jobs.

Modified:
    hadoop/core/trunk/src/contrib/fairscheduler/README
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
    hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

Modified: hadoop/core/trunk/src/contrib/fairscheduler/README
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/README?rev=732869&r1=732868&r2=732869&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/README (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/README Thu Jan 8 15:12:19 2009
@@ -8,7 +8,8 @@
 # implied. See the License for the specific language governing
 # permissions and limitations under the License.
 
-This package implements fair scheduling for MapReduce jobs.
+This package implements a fair scheduler for MapReduce jobs with additional
+support for guaranteed shares and job limits.
 
 Fair scheduling is a method of assigning resources to jobs such that all jobs
 get, on average, an equal share of resources over time. When there is a single
@@ -21,18 +22,24 @@
 priorities are used as weights to determine the fraction of total compute time
 that each job should get.
 
+The scheduler actually organizes jobs further into "pools", and shares resources
+fairly between these pools. By default, there is a separate pool for each user,
+so that each user gets the same share of the cluster no matter how many jobs
+they submit. However, it is also possible to set a job's pool based on the
+user's Unix group or any other jobconf property, such as the queue name
+property used by the Capacity Scheduler (JIRA HADOOP-3445). Within each pool,
+fair sharing is used to share capacity between the running jobs. Pools can also
+be given weights to share the cluster non-proportionally in the config file.
+
 In addition to providing fair sharing, the Fair Scheduler allows assigning
-jobs to "pools" with guaranteed minimum shares. When a pool contains jobs,
-it gets at least its minimum share, but when a pool does not need its full
-capacity, the excess is shared between other running jobs. Thus pools are
-a way to guarantee capacity for particular user groups while utilizing the
-cluster efficiently when these users are not submitting any jobs. Within each
-pool, fair sharing is used to share capacity between the running jobs. By
-default the pool is set based on the queue.name property in the jobconf which
-will be introduced with the Hadoop Resource Manager (JIRA 3445), but it's
-possible to also have a pool per user or per Unix user group.
+guaranteed minimum shares to pools, which is useful for ensuring that certain
+users, groups or production applications always get sufficient resources.
+When a pool contains jobs, it gets at least its minimum share, but when the pool
+does not need its full guaranteed share, the excess is split between other
+running jobs. This lets the scheduler guarantee capacity for pools while
+utilizing resources efficiently when these pools don't contain jobs.
 
-The fair scheduler lets all jobs run by default, but it is also possible to
+The Fair Scheduler lets all jobs run by default, but it is also possible to
 limit the number of running jobs per user and per pool through the config
 file. This can be useful when a user must submit hundreds of jobs at once,
 or in general to improve performance if running too many jobs at once would
@@ -45,8 +52,7 @@
 
 Finally, the fair scheduler provides several extension points where the basic
 functionality can be extended. For example, the weight calculation can be
 modified to give a priority boost to new jobs, implementing a "shortest job
-first" like policy which will reduce response times for interactive jobs even
-further.
+first" policy which reduces response times for interactive jobs even further.
 
--------------------------------------------------------------------------------
@@ -88,17 +94,22 @@
   for each pool, as well as the per-pool and per-user limits on number of
   running jobs. If this property is not provided, allocations are not used.
   This file must be in XML format, and can contain three types of elements:
-  - pool elements, which may contain elements for minMaps, minReduces and
-    maxRunningJobs (limit the number of jobs from the pool to run at once).
-  - user elements, which may contain a maxRunningJobs to limit jobs.
-  - A userMaxJobsDefault element, which sets the running job limit for any
-    users that do not have their own elements.
+  - pool elements, which may contain elements for minMaps, minReduces,
+    maxRunningJobs (limit the number of jobs from the pool to run at once),
+    and weight (to share the cluster non-proportionally with other pools).
+  - user elements, which may contain a maxRunningJobs to limit jobs.
+    Note that by default, there is a separate pool for each user, so these may not
+    be necessary; they are useful, however, if you create a pool per user
+    group or manually assign jobs to pools.
+  - A userMaxJobsDefault element, which sets the default running job limit
+    for any users whose limit is not specified.
 
   The following example file shows how to create each type of element:
 
   <?xml version="1.0"?>
   <allocations>
     <pool name="sample_pool">
       <minMaps>5</minMaps>
       <minReduces>5</minReduces>
+      <weight>2.0</weight>
     </pool>
     <user name="sample_user">
       <maxRunningJobs>6</maxRunningJobs>
@@ -106,11 +117,13 @@
     </user>
     <userMaxJobsDefault>3</userMaxJobsDefault>
   </allocations>
 
   This example creates a pool sample_pool with a guarantee of 5 map slots
-  and 5 reduce slots. It also limits the number of running jobs per user
+  and 5 reduce slots. The pool also has a weight of 2.0, meaning it has a 2x
+  higher share of the cluster than other pools (the default weight is 1).
+  Finally, the example limits the number of running jobs per user
   to 3, except for sample_user, who can run 6 jobs concurrently.
 
   Any pool not defined in the allocations file will have no guaranteed
-  capacity. Also, any pool or user with no max running jobs set in the file
-  will be allowed to run an unlimited number of jobs.
+  capacity and a weight of 1.0. Also, any pool or user with no max running
+  jobs set in the file will be allowed to run an unlimited number of jobs.
 
 mapred.fairscheduler.assignmultiple:
   Allows the scheduler to assign both a map task and a reduce task on each
@@ -128,9 +141,11 @@
 
 mapred.fairscheduler.poolnameproperty:
   Specify which jobconf property is used to determine the pool that a job
-  belongs in. String, default: queue.name (the same property as the queue
-  name in the Hadoop Resource Manager, JIRA 3445). You can use user.name
-  or group.name to base it on the Unix user or Unix group for example.
+  belongs in. String, default: user.name (i.e. one pool for each user).
+  Some other useful values to set this to are:
+  - group.name (to create a pool per Unix group).
+  - mapred.job.queue.name (the same property as the queue name in the
+    Capacity Scheduler, JIRA HADOOP-3445).
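  For illustration, here is a minimal client-side sketch (not part of this
  patch) of how a job could pick its pool. It assumes the JobTracker's
  configuration sets mapred.fairscheduler.poolnameproperty to
  mapred.job.queue.name; with the new default (user.name) nothing needs to be
  set at all, and the pool name "research" below is purely hypothetical.

    import org.apache.hadoop.mapred.JobConf;

    public class PoolSelectionSketch {
      public static void main(String[] args) {
        JobConf conf = new JobConf();
        // Assumption: the cluster sets
        //   mapred.fairscheduler.poolnameproperty = mapred.job.queue.name
        // The fair scheduler reads the value of that property from the job's
        // conf and places the job in the pool of that name ("research" is a
        // hypothetical pool defined in the cluster's allocation file).
        conf.set("mapred.job.queue.name", "research");
      }
    }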
 
 mapred.fairscheduler.weightadjuster:
   An extensibility point that lets you specify a class to adjust the weights

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=732869&r1=732868&r2=732869&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Thu Jan 8 15:12:19 2009
@@ -450,11 +450,41 @@
   }
 
   private void updateWeights() {
+    // First, calculate raw weights for each job
     for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
       JobInProgress job = entry.getKey();
       JobInfo info = entry.getValue();
-      info.mapWeight = calculateWeight(job, TaskType.MAP);
-      info.reduceWeight = calculateWeight(job, TaskType.REDUCE);
+      info.mapWeight = calculateRawWeight(job, TaskType.MAP);
+      info.reduceWeight = calculateRawWeight(job, TaskType.REDUCE);
+    }
+    // Now calculate job weight sums for each pool
+    Map<String, Double> mapWeightSums = new HashMap<String, Double>();
+    Map<String, Double> reduceWeightSums = new HashMap<String, Double>();
+    for (Pool pool: poolMgr.getPools()) {
+      double mapWeightSum = 0;
+      double reduceWeightSum = 0;
+      for (JobInProgress job: pool.getJobs()) {
+        if (isRunnable(job)) {
+          if (runnableTasks(job, TaskType.MAP) > 0) {
+            mapWeightSum += infos.get(job).mapWeight;
+          }
+          if (runnableTasks(job, TaskType.REDUCE) > 0) {
+            reduceWeightSum += infos.get(job).reduceWeight;
+          }
+        }
+      }
+      mapWeightSums.put(pool.getName(), mapWeightSum);
+      reduceWeightSums.put(pool.getName(), reduceWeightSum);
+    }
+    // And normalize the weights based on pool sums and pool weights
+    // to share fairly across pools (proportional to their weights)
+    for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
+      JobInProgress job = entry.getKey();
+      JobInfo info = entry.getValue();
+      String pool = poolMgr.getPoolName(job);
+      double poolWeight = poolMgr.getPoolWeight(pool);
+      info.mapWeight *= (poolWeight / mapWeightSums.get(pool));
+      info.reduceWeight *= (poolWeight / reduceWeightSums.get(pool));
     }
   }
 
@@ -615,7 +645,7 @@
     }
   }
 
-  private double calculateWeight(JobInProgress job, TaskType taskType) {
+  private double calculateRawWeight(JobInProgress job, TaskType taskType) {
     if (!isRunnable(job)) {
       return 0;
     } else {

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java?rev=732869&r1=732868&r2=732869&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java Thu Jan 8 15:12:19 2009
@@ -60,6 +60,9 @@
   // Map and reduce minimum allocations for each pool
   private Map<String, Integer> mapAllocs = new HashMap<String, Integer>();
   private Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
+
+  // Sharing weights for each pool
+  private Map<String, Double> poolWeights = new HashMap<String, Double>();
 
   // Max concurrent running jobs for each pool and for each user; in addition,
   // for users that have no max specified, we use the userMaxJobsDefault.
@@ -80,7 +83,7 @@
   public PoolManager(Configuration conf) throws IOException, SAXException,
       AllocationConfigurationException, ParserConfigurationException {
     this.poolNameProperty = conf.get(
-        "mapred.fairscheduler.poolnameproperty", "mapred.job.queue.name");
+        "mapred.fairscheduler.poolnameproperty", "user.name");
     this.allocFile = conf.get("mapred.fairscheduler.allocation.file");
     if (allocFile == null) {
       LOG.warn("No mapred.fairscheduler.allocation.file given in jobconf - " +
@@ -162,6 +165,7 @@
     Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
     Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
     Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
+    Map<String, Double> poolWeights = new HashMap<String, Double>();
     int userMaxJobsDefault = Integer.MAX_VALUE;
 
     // Remember all pool names so we can display them on web UI, etc.
@@ -204,6 +208,10 @@
             String text = ((Text)field.getFirstChild()).getData().trim();
             int val = Integer.parseInt(text);
             poolMaxJobs.put(poolName, val);
+          } else if ("weight".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            double val = Double.parseDouble(text);
+            poolWeights.put(poolName, val);
           }
         }
       } else if ("user".equals(element.getTagName())) {
@@ -237,6 +245,7 @@
     this.poolMaxJobs = poolMaxJobs;
     this.userMaxJobs = userMaxJobs;
     this.userMaxJobsDefault = userMaxJobsDefault;
+    this.poolWeights = poolWeights;
     for (String name: poolNamesInAllocFile) {
       getPool(name);
     }
@@ -321,4 +330,12 @@
       return Integer.MAX_VALUE;
     }
   }
+
+  public double getPoolWeight(String pool) {
+    if (poolWeights.containsKey(pool)) {
+      return poolWeights.get(pool);
+    } else {
+      return 1.0;
+    }
+  }
 }

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=732869&r1=732868&r2=732869&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Thu Jan 8 15:12:19 2009
@@ -759,25 +759,25 @@
     // Check that minimum and fair shares have been allocated
     assertEquals(0, info1.minMaps);
     assertEquals(0, info1.minReduces);
-    assertEquals(1.33, info1.mapFairShare, 0.1);
-    assertEquals(1.33, info1.reduceFairShare, 0.1);
+    assertEquals(2, info1.mapFairShare, 0.1);
+    assertEquals(2, info1.reduceFairShare, 0.1);
     assertEquals(1, info2.minMaps);
     assertEquals(1, info2.minReduces);
-    assertEquals(1.33, info2.mapFairShare, 0.1);
-    assertEquals(1.33, info2.reduceFairShare, 0.1);
+    assertEquals(1, info2.mapFairShare, 0.1);
+    assertEquals(1, info2.reduceFairShare, 0.1);
     assertEquals(1, info3.minMaps);
     assertEquals(1, info3.minReduces);
-    assertEquals(1.33, info3.mapFairShare, 0.1);
-    assertEquals(1.33, info3.reduceFairShare, 0.1);
+    assertEquals(1, info3.mapFairShare, 0.1);
+    assertEquals(1, info3.reduceFairShare, 0.1);
 
     // Advance time 100ms and check deficits
     advanceTime(100);
-    assertEquals(1133, info1.mapDeficit, 1.0);
-    assertEquals(1133, info1.reduceDeficit, 1.0);
-    assertEquals(333, info2.mapDeficit, 1.0);
-    assertEquals(333, info2.reduceDeficit, 1.0);
-    assertEquals(133, info3.mapDeficit, 1.0);
-    assertEquals(133, info3.reduceDeficit, 1.0);
+    assertEquals(1200, info1.mapDeficit, 1.0);
+    assertEquals(1200, info1.reduceDeficit, 1.0);
+    assertEquals(300, info2.mapDeficit, 1.0);
+    assertEquals(300, info2.reduceDeficit, 1.0);
+    assertEquals(100, info3.mapDeficit, 1.0);
+    assertEquals(100, info3.reduceDeficit, 1.0);
 
     // Assign tasks and check that slots are first given to needy jobs, but
     // that job 1 gets two tasks after due to having a larger deficit.
@@ -1036,26 +1036,29 @@
     JobInfo info10 = scheduler.infos.get(job10);
     advanceTime(10);
 
-    // Check scheduler variables
-    double SHARE = 4.0 / 7.0; // We have 4 slots and 7 runnable jobs
-    assertEquals(SHARE, info1.mapFairShare, 0.1);
-    assertEquals(SHARE, info1.reduceFairShare, 0.1);
+    // Check scheduler variables. The jobs in poolA should get half
+    // the total share, while those in the default pool should get
+    // the other half. This works out to 2 slots each for the jobs
+    // in poolA and 1/3 each for the jobs in the default pool because
+    // there are 2 runnable jobs in poolA and 6 jobs in the default pool.
+    assertEquals(0.33, info1.mapFairShare, 0.1);
+    assertEquals(0.33, info1.reduceFairShare, 0.1);
     assertEquals(0.0, info2.mapFairShare);
     assertEquals(0.0, info2.reduceFairShare);
-    assertEquals(SHARE, info3.mapFairShare, 0.1);
-    assertEquals(SHARE, info3.reduceFairShare, 0.1);
-    assertEquals(SHARE, info4.mapFairShare, 0.1);
-    assertEquals(SHARE, info4.reduceFairShare, 0.1);
-    assertEquals(SHARE, info5.mapFairShare, 0.1);
-    assertEquals(SHARE, info5.reduceFairShare, 0.1);
-    assertEquals(SHARE, info6.mapFairShare, 0.1);
-    assertEquals(SHARE, info6.reduceFairShare, 0.1);
-    assertEquals(SHARE, info7.mapFairShare, 0.1);
-    assertEquals(SHARE, info7.reduceFairShare, 0.1);
+    assertEquals(0.33, info3.mapFairShare, 0.1);
+    assertEquals(0.33, info3.reduceFairShare, 0.1);
+    assertEquals(0.33, info4.mapFairShare, 0.1);
+    assertEquals(0.33, info4.reduceFairShare, 0.1);
+    assertEquals(0.33, info5.mapFairShare, 0.1);
+    assertEquals(0.33, info5.reduceFairShare, 0.1);
+    assertEquals(0.33, info6.mapFairShare, 0.1);
+    assertEquals(0.33, info6.reduceFairShare, 0.1);
+    assertEquals(0.33, info7.mapFairShare, 0.1);
+    assertEquals(0.33, info7.reduceFairShare, 0.1);
     assertEquals(0.0, info8.mapFairShare);
     assertEquals(0.0, info8.reduceFairShare);
-    assertEquals(SHARE, info9.mapFairShare, 0.1);
-    assertEquals(SHARE, info9.reduceFairShare, 0.1);
+    assertEquals(2.0, info9.mapFairShare, 0.1);
+    assertEquals(2.0, info9.reduceFairShare, 0.1);
     assertEquals(0.0, info10.mapFairShare);
     assertEquals(0.0, info10.reduceFairShare);
   }
@@ -1094,6 +1097,60 @@
     assertTrue(scheduler.enoughMapsFinishedToRunReduces(1, 1));
   }
+
+  /**
+   * This test submits jobs in three pools: poolA, which has a weight
+   * of 2.0; poolB, which has a weight of 0.5; and the default pool, which
+   * should have a weight of 1.0. It then checks that the map and reduce
+   * fair shares are given out accordingly. We then submit a second job to
+   * pool B and check that each gets half of the pool (weight of 0.25).
+   */
+  public void testPoolWeights() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<pool name=\"poolA\">");
+    out.println("<weight>2.0</weight>");
+    out.println("</pool>");
+    out.println("<pool name=\"poolB\">");
+    out.println("<weight>0.5</weight>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+
+    // Submit jobs, advancing time in-between to make sure that they are
+    // all submitted at distinct times.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    JobInfo info2 = scheduler.infos.get(job2);
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
+    JobInfo info3 = scheduler.infos.get(job3);
+    advanceTime(10);
+
+    assertEquals(1.14, info1.mapFairShare, 0.01);
+    assertEquals(1.14, info1.reduceFairShare, 0.01);
+    assertEquals(2.28, info2.mapFairShare, 0.01);
+    assertEquals(2.28, info2.reduceFairShare, 0.01);
+    assertEquals(0.57, info3.mapFairShare, 0.01);
+    assertEquals(0.57, info3.reduceFairShare, 0.01);
+
+    JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
+    JobInfo info4 = scheduler.infos.get(job4);
+    advanceTime(10);
+
+    assertEquals(1.14, info1.mapFairShare, 0.01);
+    assertEquals(1.14, info1.reduceFairShare, 0.01);
+    assertEquals(2.28, info2.mapFairShare, 0.01);
+    assertEquals(2.28, info2.reduceFairShare, 0.01);
+    assertEquals(0.28, info3.mapFairShare, 0.01);
+    assertEquals(0.28, info3.reduceFairShare, 0.01);
+    assertEquals(0.28, info4.mapFairShare, 0.01);
+    assertEquals(0.28, info4.reduceFairShare, 0.01);
+  }
+
   private void advanceTime(long time) {
     clock.advance(time);
     scheduler.update();