Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 13165 invoked from network); 12 Jan 2009 20:43:00 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 12 Jan 2009 20:43:00 -0000 Received: (qmail 61585 invoked by uid 500); 12 Jan 2009 20:42:59 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 61553 invoked by uid 500); 12 Jan 2009 20:42:59 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 61543 invoked by uid 99); 12 Jan 2009 20:42:59 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Jan 2009 12:42:59 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 12 Jan 2009 20:42:58 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 9F25E2388896; Mon, 12 Jan 2009 12:42:38 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r733895 - in /hadoop/core/trunk/src/contrib/fairscheduler/src: java/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/ Date: Mon, 12 Jan 2009 20:42:34 -0000 To: core-commits@hadoop.apache.org From: matei@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090112204238.9F25E2388896@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: matei Date: Mon Jan 12 12:42:29 2009 New Revision: 733895 URL: http://svn.apache.org/viewvc?rev=733895&view=rev Log: HADOOP-4943. Fair share scheduler does not utilize all slots if the task trackers are configured heterogeneously. 
Contributed by Zheng Shao. Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java?rev=733895&r1=733894&r2=733895&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java (original) +++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java Mon Jan 12 12:42:29 2009 @@ -31,24 +31,22 @@ * out uniformly across the nodes rather than being clumped up on whichever * machines sent out heartbeats earliest. 
*/ - int getCap(TaskTrackerStatus tracker, - int totalRunnableTasks, int localMaxTasks) { - int numTaskTrackers = taskTrackerManager.taskTrackers().size(); - return Math.min(localMaxTasks, - (int) Math.ceil((double) totalRunnableTasks / numTaskTrackers)); + int getCap(int totalRunnableTasks, int localMaxTasks, int totalSlots) { + double load = ((double)totalRunnableTasks) / totalSlots; + return (int) Math.ceil(localMaxTasks * Math.min(1.0, load)); } @Override public boolean canAssignMap(TaskTrackerStatus tracker, - int totalRunnableMaps) { - return tracker.countMapTasks() < getCap(tracker, totalRunnableMaps, - tracker.getMaxMapTasks()); + int totalRunnableMaps, int totalMapSlots) { + return tracker.countMapTasks() < getCap(totalRunnableMaps, + tracker.getMaxMapTasks(), totalMapSlots); } @Override public boolean canAssignReduce(TaskTrackerStatus tracker, - int totalRunnableReduces) { - return tracker.countReduceTasks() < getCap(tracker, totalRunnableReduces, - tracker.getMaxReduceTasks()); + int totalRunnableReduces, int totalReduceSlots) { + return tracker.countReduceTasks() < getCap(totalRunnableReduces, + tracker.getMaxReduceTasks(), totalReduceSlots); } } Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=733895&r1=733894&r2=733895&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original) +++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Mon Jan 12 12:42:29 2009 @@ -232,14 +232,20 @@ runnableMaps += runnableTasks(job, TaskType.MAP); runnableReduces += runnableTasks(job, TaskType.REDUCE); } + + // Compute total map/reduce slots + // In the future we can precompute this if the Scheduler 
becomes a + // listener of tracker join/leave events. + int totalMapSlots = getTotalSlots(TaskType.MAP); + int totalReduceSlots = getTotalSlots(TaskType.REDUCE); // Scan to see whether any job needs to run a map, then a reduce ArrayList tasks = new ArrayList(); TaskType[] types = new TaskType[] {TaskType.MAP, TaskType.REDUCE}; for (TaskType taskType: types) { boolean canAssign = (taskType == TaskType.MAP) ? - loadMgr.canAssignMap(tracker, runnableMaps) : - loadMgr.canAssignReduce(tracker, runnableReduces); + loadMgr.canAssignMap(tracker, runnableMaps, totalMapSlots) : + loadMgr.canAssignReduce(tracker, runnableReduces, totalReduceSlots); if (canAssign) { // Figure out the jobs that need this type of task List candidates = new ArrayList(); Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java?rev=733895&r1=733894&r2=733895&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java (original) +++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java Mon Jan 12 12:42:29 2009 @@ -63,17 +63,19 @@ * Can a given {@link TaskTracker} run another map task? * @param tracker The machine we wish to run a new map on * @param totalRunnableMaps Set of running jobs in the cluster + * @param totalMapSlots The total number of map slots in the cluster * @return true if another map can be launched on tracker */ public abstract boolean canAssignMap(TaskTrackerStatus tracker, - int totalRunnableMaps); + int totalRunnableMaps, int totalMapSlots); /** * Can a given {@link TaskTracker} run another reduce task? 
* @param tracker The machine we wish to run a new map on - * @param totalReducesNeeded Set of running jobs in the cluster + * @param totalRunnableReduces The total number of runnable reduce tasks in the cluster + * @param totalReduceSlots The total number of reduce slots in the cluster * @return true if another reduce can be launched on tracker */ public abstract boolean canAssignReduce(TaskTrackerStatus tracker, - int totalRunnableReduces); + int totalRunnableReduces, int totalReduceSlots); } Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=733895&r1=733894&r2=733895&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original) +++ hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Mon Jan 12 12:42:29 2009 @@ -1150,6 +1150,24 @@ assertEquals(0.28, info4.mapFairShare, 0.01); assertEquals(0.28, info4.reduceFairShare, 0.01); } + + /** + * Tests that max-running-tasks per node are set by assigning load + * equally across the cluster in CapBasedLoadManager.
+ */ + public void testCapBasedLoadManager() { + CapBasedLoadManager loadMgr = new CapBasedLoadManager(); + // Arguments to getCap: totalRunnableTasks, nodeCap, totalSlots + // Desired behavior: return ceil(nodeCap * min(1, runnableTasks/totalSlots)) + assertEquals(1, loadMgr.getCap(1, 1, 100)); + assertEquals(1, loadMgr.getCap(1, 2, 100)); + assertEquals(1, loadMgr.getCap(1, 10, 100)); + assertEquals(1, loadMgr.getCap(200, 1, 100)); + assertEquals(1, loadMgr.getCap(1, 5, 100)); + assertEquals(3, loadMgr.getCap(50, 5, 100)); + assertEquals(5, loadMgr.getCap(100, 5, 100)); + assertEquals(5, loadMgr.getCap(200, 5, 100)); + } private void advanceTime(long time) { clock.advance(time);