Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 29389 invoked from network); 1 Jul 2010 23:10:40 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 1 Jul 2010 23:10:40 -0000 Received: (qmail 55361 invoked by uid 500); 1 Jul 2010 23:10:40 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 55310 invoked by uid 500); 1 Jul 2010 23:10:39 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 55301 invoked by uid 99); 1 Jul 2010 23:10:39 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Jul 2010 23:10:39 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 01 Jul 2010 23:10:36 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 6F52823888E8; Thu, 1 Jul 2010 23:09:13 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r959808 - in /hadoop/mapreduce/branches/branch-0.21: CHANGES.txt src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Date: Thu, 01 Jul 2010 23:09:13 -0000 To: mapreduce-commits@hadoop.apache.org From: matei@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100701230913.6F52823888E8@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: matei Date: Thu Jul 1 23:09:12 2010 New Revision: 959808 URL: http://svn.apache.org/viewvc?rev=959808&view=rev Log: 
MAPREDUCE-1845. FairScheduler.tasksToPreempt() can return negative number. Contributed by Scott Chen. Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt hadoop/mapreduce/branches/branch-0.21/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java hadoop/mapreduce/branches/branch-0.21/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=959808&r1=959807&r2=959808&view=diff ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original) +++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Thu Jul 1 23:09:12 2010 @@ -703,6 +703,9 @@ Release 0.21.0 - Unreleased BUG FIXES + MAPREDUCE-1845. FairScheduler.tasksToPreempt() can return negative number. + (Scott Chen via matei) + MAPREDUCE-1791. Remote cluster control functionality needs JavaDocs improvement (Konstantin Boudnik) Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=959808&r1=959807&r2=959808&view=diff ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Thu Jul 1 23:09:12 2010 @@ -840,11 +840,11 @@ public class FairScheduler extends TaskS int tasksDueToFairShare = 0; if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { int target = Math.min(sched.getMinShare(), sched.getDemand()); - tasksDueToMinShare = target - 
sched.getRunningTasks(); + tasksDueToMinShare = Math.max(0, target - sched.getRunningTasks()); } if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { int target = (int) Math.min(sched.getFairShare(), sched.getDemand()); - tasksDueToFairShare = target - sched.getRunningTasks(); + tasksDueToFairShare = Math.max(0, target - sched.getRunningTasks()); } int tasksToPreempt = Math.max(tasksDueToMinShare, tasksDueToFairShare); if (tasksToPreempt > 0) { Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=959808&r1=959807&r2=959808&view=diff ============================================================================== --- hadoop/mapreduce/branches/branch-0.21/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original) +++ hadoop/mapreduce/branches/branch-0.21/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Thu Jul 1 23:09:12 2010 @@ -2620,4 +2620,81 @@ public class TestFairScheduler extends T for (int i = 0; i < tasks.size(); i++) assertEquals("assignment " + i, expectedTasks[i], tasks.get(i).toString()); } + + /** + * This test submits a job to pool1 (min share of 2 slots of each type, min + * share timeout 5s) that takes all 4 map and 4 reduce slots, then a second + * job to the default pool (fair share timeout 5s). After 6 seconds the + * default pool is starved of its fair share (2 slots of each type); we test + * that tasksToPreempt() is 0 (not negative) for pool1 and 2 for default. 
+ */ + public void testFairSharePreemptionWithShortTimeout() throws Exception { + // Enable preemption in scheduler + scheduler.preemptionEnabled = true; + // Set up pools file + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println("<?xml version=\"1.0\"?>"); + out.println("<allocations>"); + out.println("<fairSharePreemptionTimeout>5</fairSharePreemptionTimeout>"); + out.println("<pool name=\"pool1\">"); + out.println("<minMaps>2</minMaps>"); + out.println("<minReduces>2</minReduces>"); + out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>"); + out.println("</pool>"); + out.println("</allocations>"); + out.close(); + scheduler.getPoolManager().reloadAllocs(); + Pool pool1 = scheduler.getPoolManager().getPool("pool1"); + Pool defaultPool = scheduler.getPoolManager().getPool("default"); + + // Submit job 1 and assign all slots to it. Sleep a bit between assigning + // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first. + JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10, "pool1"); + JobInfo info1 = scheduler.infos.get(job1); + checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1"); + advanceTime(100); + checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2"); + checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2"); + checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); + checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2"); + + advanceTime(10000); + assertEquals(4, info1.mapSchedulable.getRunningTasks()); + assertEquals(4, info1.reduceSchedulable.getRunningTasks()); + assertEquals(4.0, info1.mapSchedulable.getFairShare()); + assertEquals(4.0, info1.reduceSchedulable.getFairShare()); + // Ten seconds later, submit job 2. + JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "default"); + + // Advance time by 6 seconds without updating the scheduler. + // This simulates the time gap between update and task preemption. 
+ clock.advance(6000); + assertEquals(4, info1.mapSchedulable.getRunningTasks()); + assertEquals(4, info1.reduceSchedulable.getRunningTasks()); + assertEquals(2.0, info1.mapSchedulable.getFairShare()); + assertEquals(2.0, info1.reduceSchedulable.getFairShare()); + assertEquals(0, scheduler.tasksToPreempt(pool1.getMapSchedulable(), + clock.getTime())); + assertEquals(0, scheduler.tasksToPreempt(pool1.getReduceSchedulable(), + clock.getTime())); + assertEquals(2, scheduler.tasksToPreempt(defaultPool.getMapSchedulable(), + clock.getTime())); + assertEquals(2, scheduler.tasksToPreempt(defaultPool.getReduceSchedulable(), + clock.getTime())); + + // Test that the tasks actually get preempted and we can assign new ones + scheduler.preemptTasksIfNecessary(); + scheduler.update(); + assertEquals(2, job1.runningMaps()); + assertEquals(2, job1.runningReduces()); + checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2"); + checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2"); + checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2"); + checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2"); + assertNull(scheduler.assignTasks(tracker("tt1"))); + assertNull(scheduler.assignTasks(tracker("tt2"))); + } }