Return-Path: Delivered-To: apmail-hadoop-core-commits-archive@www.apache.org Received: (qmail 4804 invoked from network); 5 May 2009 07:35:48 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 5 May 2009 07:35:48 -0000 Received: (qmail 98850 invoked by uid 500); 5 May 2009 07:35:47 -0000 Delivered-To: apmail-hadoop-core-commits-archive@hadoop.apache.org Received: (qmail 98772 invoked by uid 500); 5 May 2009 07:35:47 -0000 Mailing-List: contact core-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-commits@hadoop.apache.org Received: (qmail 98763 invoked by uid 99); 5 May 2009 07:35:47 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 May 2009 07:35:47 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 05 May 2009 07:35:43 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 2DCAE23889D9; Tue, 5 May 2009 07:30:21 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r771607 [2/2] - in /hadoop/core/trunk: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ Date: Tue, 05 May 2009 07:30:20 -0000 To: core-commits@hadoop.apache.org From: yhemanth@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090505073021.2DCAE23889D9@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=771607&r1=771606&r2=771607&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original) +++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue May 5 07:30:20 2009 @@ -500,15 +500,12 @@ static class FakeQueueInfo { String queueName; float gc; - int reclaimTimeLimit; boolean supportsPrio; int ulMin; - public FakeQueueInfo(String queueName, float gc, - int reclaimTimeLimit, boolean supportsPrio, int ulMin) { + public FakeQueueInfo(String queueName, float gc, boolean supportsPrio, int ulMin) { this.queueName = queueName; this.gc = gc; - this.reclaimTimeLimit = reclaimTimeLimit; this.supportsPrio = supportsPrio; this.ulMin = ulMin; } @@ -521,7 +518,6 @@ new LinkedHashMap(); String firstQueue; - private long reclaimCapacityInterval = 1000; void setFakeQueues(List queues) { for (FakeQueueInfo q: queues) { @@ -538,17 +534,13 @@ return firstQueue; }*/ - public float getGuaranteedCapacity(String queue) { + public float getCapacity(String queue) { if(queueMap.get(queue).gc == -1) { - return super.getGuaranteedCapacity(queue); + return super.getCapacity(queue); } return queueMap.get(queue).gc; } - public int getReclaimTimeLimit(String queue) { - return queueMap.get(queue).reclaimTimeLimit; - } - public int getMinimumUserLimitPercent(String queue) { return queueMap.get(queue).ulMin; } @@ -566,16 +558,6 @@ public int getMaxWorkerThreads() { return 1; } - - @Override - public long getReclaimCapacityInterval() { - return reclaimCapacityInterval ; - } - - @Override - public void setReclaimCapacityInterval(long value) { - this.reclaimCapacityInterval = value; - } } protected class FakeClock extends CapacityTaskScheduler.Clock { @@ -684,7 +666,7 @@ // start the scheduler taskTrackerManager.addQueues(new String[] {"default"}); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 100.0f, 1, true, 1)); + queues.add(new FakeQueueInfo("default", 100.0f, true, 1)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.start(); @@ -821,7 +803,7 @@ taskTrackerManager.addQueues(new String[] {"default"}); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 50.0f, 10, true, 25)); + queues.add(new FakeQueueInfo("default", 50.0f, true, 25)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.start(); @@ -876,8 +858,8 @@ String[] qs = {"default", "q2"}; taskTrackerManager.addQueues(qs); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25)); - queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25)); + queues.add(new FakeQueueInfo("default", 50.0f, true, 25)); + queues.add(new FakeQueueInfo("q2", 50.0f, true, 25)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.start(); @@ -900,7 +882,7 @@ String[] qs = { "default" }; taskTrackerManager.addQueues(qs); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 100.0f, 300, true, 100)); + queues.add(new FakeQueueInfo("default", 100.0f, true, 100)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.start(); @@ -923,29 +905,29 @@ subJobsList.get("u1").containsAll(jobs)); } - //Basic test to test GC allocation across the queues which have no - //GC configured. + //Basic test to test capacity allocation across the queues which have no + //capacity configured. - public void testGCAllocationToQueues() throws Exception { + public void testCapacityAllocationToQueues() throws Exception { String[] qs = {"default","q1","q2","q3","q4"}; taskTrackerManager.addQueues(qs); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default",25.0f,5000,true,25)); - queues.add(new FakeQueueInfo("q1",-1.0f,5000,true,25)); - queues.add(new FakeQueueInfo("q2",-1.0f,5000,true,25)); - queues.add(new FakeQueueInfo("q3",-1.0f,5000,true,25)); - queues.add(new FakeQueueInfo("q4",-1.0f,5000,true,25)); + queues.add(new FakeQueueInfo("default",25.0f,true,25)); + queues.add(new FakeQueueInfo("q1",-1.0f,true,25)); + queues.add(new FakeQueueInfo("q2",-1.0f,true,25)); + queues.add(new FakeQueueInfo("q3",-1.0f,true,25)); + queues.add(new FakeQueueInfo("q4",-1.0f,true,25)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.start(); - assertEquals(18.75f, resConf.getGuaranteedCapacity("q1")); - assertEquals(18.75f, resConf.getGuaranteedCapacity("q2")); - assertEquals(18.75f, resConf.getGuaranteedCapacity("q3")); - assertEquals(18.75f, resConf.getGuaranteedCapacity("q4")); + assertEquals(18.75f, resConf.getCapacity("q1")); + assertEquals(18.75f, resConf.getCapacity("q2")); + assertEquals(18.75f, resConf.getCapacity("q3")); + assertEquals(18.75f, resConf.getCapacity("q4")); } - // Tests how GC is computed and assignment of tasks done - // on the basis of the GC. + // Tests how capacity is computed and assignment of tasks done + // on the basis of the capacity. public void testCapacityBasedAllocation() throws Exception { // set up some queues String[] qs = {"default", "q2"}; @@ -953,8 +935,8 @@ ArrayList queues = new ArrayList(); // set the gc % as 10%, so that gc will be zero initially as // the cluster capacity increase slowly. - queues.add(new FakeQueueInfo("default", 10.0f, 5000, true, 25)); - queues.add(new FakeQueueInfo("q2", 90.0f, 5000, true, 25)); + queues.add(new FakeQueueInfo("default", 10.0f, true, 25)); + queues.add(new FakeQueueInfo("q2", 90.0f, true, 25)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.start(); @@ -967,35 +949,35 @@ // job from q2 runs first because it has some non-zero capacity. checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1"); - verifyGuaranteedCapacity("0", "default"); - verifyGuaranteedCapacity("3", "q2"); + verifyCapacity("0", "default"); + verifyCapacity("3", "q2"); // add another tt to increase tt slots taskTrackerManager.addTaskTracker("tt3"); checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2"); - verifyGuaranteedCapacity("0", "default"); - verifyGuaranteedCapacity("5", "q2"); + verifyCapacity("0", "default"); + verifyCapacity("5", "q2"); // add another tt to increase tt slots taskTrackerManager.addTaskTracker("tt4"); checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3"); - verifyGuaranteedCapacity("0", "default"); - verifyGuaranteedCapacity("7", "q2"); + verifyCapacity("0", "default"); + verifyCapacity("7", "q2"); // add another tt to increase tt slots taskTrackerManager.addTaskTracker("tt5"); // now job from default should run, as it is furthest away // in terms of runningMaps / gc. checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4"); - verifyGuaranteedCapacity("1", "default"); - verifyGuaranteedCapacity("9", "q2"); + verifyCapacity("1", "default"); + verifyCapacity("9", "q2"); } - private void verifyGuaranteedCapacity(String expectedCapacity, + private void verifyCapacity(String expectedCapacity, String queue) throws IOException { String schedInfo = taskTrackerManager.getQueueManager(). - getSchedulerInfo(queue).toString(); - assertTrue(schedInfo.contains("Map tasks\nGuaranteed Capacity: " + getSchedulerInfo(queue).toString(); + assertTrue(schedInfo.contains("Map tasks\nCapacity: " + expectedCapacity)); } @@ -1005,15 +987,15 @@ String[] qs = {"default", "q2"}; taskTrackerManager.addQueues(qs); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25)); - queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25)); + queues.add(new FakeQueueInfo("default", 50.0f, true, 25)); + queues.add(new FakeQueueInfo("q2", 50.0f, true, 25)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.start(); // submit a job submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1"); - // for queue 'q2', the GC for maps is 2. Since we're the only user, + // for queue 'q2', the capacity for maps is 2. Since we're the only user, // we should get a task checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); // I should get another map task. @@ -1031,15 +1013,15 @@ String[] qs = {"default", "q2"}; taskTrackerManager.addQueues(qs); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25)); - queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25)); + queues.add(new FakeQueueInfo("default", 50.0f, true, 25)); + queues.add(new FakeQueueInfo("q2", 50.0f, true, 25)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.start(); // submit a job submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1"); - // for queue 'q2', the GC for maps is 2. Since we're the only user, + // for queue 'q2', the capacity for maps is 2. Since we're the only user, // we should get a task checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); // Submit another job, from a different user @@ -1059,15 +1041,15 @@ String[] qs = {"default", "q2"}; taskTrackerManager.addQueues(qs); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25)); - queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25)); + queues.add(new FakeQueueInfo("default", 50.0f, true, 25)); + queues.add(new FakeQueueInfo("q2", 50.0f, true, 25)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.start(); // submit a job submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1"); - // for queue 'q2', the GC for maps is 2. Since we're the only user, + // for queue 'q2', the capacity for maps is 2. Since we're the only user, // we should get a task checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); // since we're the only job, we get another map @@ -1087,15 +1069,15 @@ String[] qs = {"default", "q2"}; taskTrackerManager.addQueues(qs); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25)); - queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25)); + queues.add(new FakeQueueInfo("default", 50.0f, true, 25)); + queues.add(new FakeQueueInfo("q2", 50.0f, true, 25)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.start(); // submit a job FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1"); - // for queue 'q2', the GC for maps is 2. Since we're the only user, + // for queue 'q2', the capacity for maps is 2. Since we're the only user, // we should get a task checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); // since we're the only job, we get another map @@ -1126,7 +1108,7 @@ String[] qs = {"default"}; taskTrackerManager.addQueues(qs); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 100.0f, 10000, true, 25)); + queues.add(new FakeQueueInfo("default", 100.0f, true, 25)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.start(); @@ -1177,283 +1159,7 @@ // first in the queue checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4"); } - - // test code to reclaim capacity - public void testReclaimCapacity() throws Exception { - // set up some queues - String[] qs = {"default", "q2", "q3"}; - taskTrackerManager.addQueues(qs); - ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25)); - queues.add(new FakeQueueInfo("q2", 25.0f, 1000, true, 25)); - queues.add(new FakeQueueInfo("q3", 25.0f, 1000, true, 25)); - resConf.setFakeQueues(queues); - resConf.setReclaimCapacityInterval(500); - scheduler.setResourceManagerConf(resConf); - scheduler.start(); - - // set up a situation where q2 is under capacity, and default & q3 - // are at/over capacity - FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1"); - FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1"); - checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); - checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1"); - checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2"); - checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); - // now submit a job to q2 - FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1"); - // get scheduler to notice that q2 needs to reclaim - scheduler.reclaimCapacity(); - // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so - // we start reclaiming when 15 secs are left. - clock.advance(400000); - scheduler.reclaimCapacity(); - // no tasks should have been killed yet - assertEquals(j1.runningMapTasks, 3); - assertEquals(j2.runningMapTasks, 1); - clock.advance(200000); - scheduler.reclaimCapacity(); - // task from j1 will be killed - assertEquals(j1.runningMapTasks, 2); - assertEquals(j2.runningMapTasks, 1); - - } - - // test code to reclaim multiple capacity - public void testReclaimCapacity2() throws Exception { - // set up some queues - String[] qs = {"default", "q2", "q3", "q4"}; - taskTrackerManager.addQueues(qs); - ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25)); - queues.add(new FakeQueueInfo("q2", 20.0f, 1000, true, 25)); - queues.add(new FakeQueueInfo("q3", 20.0f, 1000, true, 25)); - queues.add(new FakeQueueInfo("q4", 10.0f, 1000, true, 25)); - resConf.setFakeQueues(queues); - resConf.setReclaimCapacityInterval(500); - scheduler.setResourceManagerConf(resConf); - scheduler.start(); - - // add some more TTs so our total map capacity is 10 - taskTrackerManager.addTaskTracker("tt3"); - taskTrackerManager.addTaskTracker("tt4"); - taskTrackerManager.addTaskTracker("tt5"); - - // q2 has nothing running, default is under cap, q3 and q4 are over cap - FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 2, 2, null, "u1"); - checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); - FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1"); - checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1"); - FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q4", "u1"); - checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2"); - checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2"); - checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3"); - checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3"); - checkAssignment("tt4", "attempt_test_0003_m_000002_0 on tt4"); - checkAssignment("tt4", "attempt_test_0002_m_000004_0 on tt4"); - checkAssignment("tt5", "attempt_test_0002_m_000005_0 on tt5"); - checkAssignment("tt5", "attempt_test_0003_m_000003_0 on tt5"); - // at this point, q3 is running 5 tasks (with a cap of 2), q4 is - // running 3 tasks (with a cap of 1). - // If we submit a job to 'default', we need to get 3 slots back. - FakeJobInProgress j4 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1"); - // get scheduler to notice that q2 needs to reclaim - scheduler.reclaimCapacity(); - // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so - // we start reclaiming when 15 secs are left. - clock.advance(400000); - scheduler.reclaimCapacity(); - // nothing should have happened - assertEquals(j2.runningMapTasks, 5); - assertEquals(j3.runningMapTasks, 3); - // 3 tasks to kill, 5 running over cap. q3 should give up 3*3/5 = 2 slots. - // q4 should give up 2*3/5 = 1 slot. - clock.advance(200000); - scheduler.reclaimCapacity(); - assertEquals(j2.runningMapTasks, 3); - assertEquals(j3.runningMapTasks, 2); - - } - - // test code to reclaim capacity when the cluster is completely occupied - public void testReclaimCapacityWithFullCluster() throws Exception { - // set up some queues - String[] qs = {"default", "queue"}; - taskTrackerManager.addQueues(qs); - int maxSlots = taskTrackerManager.maxMapTasksPerTracker - * taskTrackerManager.taskTrackers().size(); - ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25)); - queues.add(new FakeQueueInfo("queue", 50.0f, 1000, true, 25)); - resConf.setFakeQueues(queues); - resConf.setReclaimCapacityInterval(500); - scheduler.setResourceManagerConf(resConf); - scheduler.start(); - - // now submit 1 job to queue "default" which should take up the cluster - FakeJobInProgress j1 = - submitJobAndInit(JobStatus.PREP, maxSlots, 0, "default", "u1"); - checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); - checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2"); - checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1"); - checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2"); - - // now submit a job to queue "queue" - submitJobAndInit(JobStatus.PREP, maxSlots, 0, "queue", "u2"); - - scheduler.reclaimCapacity(); - - clock.advance(scheduler.schedConf.getReclaimTimeLimit("default") * 1000); - - scheduler.reclaimCapacity(); - - // check if the tasks are killed - assertEquals("Failed to reclaim tasks", j1.runningMapTasks, 2); - } - // test code to reclaim capacity in steps - public void testReclaimCapacityInSteps() throws Exception { - // set up some queues - String[] qs = {"default", "q2"}; - taskTrackerManager.addQueues(qs); - ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25)); - queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25)); - resConf.setFakeQueues(queues); - resConf.setReclaimCapacityInterval(500); - scheduler.setResourceManagerConf(resConf); - scheduler.start(); - - // set up a situation where q2 is under capacity, and default is - // at/over capacity - FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1"); - checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); - checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1"); - checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); - checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2"); - // now submit a job to q2 - FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 1, 1, "q2", "u1"); - // get scheduler to notice that q2 needs to reclaim - scheduler.reclaimCapacity(); - // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so - // we start reclaiming when 15 secs are left. - clock.advance(400000); - // submit another job to q2 which causes more capacity to be reclaimed - j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2"); - clock.advance(200000); - scheduler.reclaimCapacity(); - // one task from j1 will be killed - assertEquals(j1.runningMapTasks, 3); - clock.advance(300000); - scheduler.reclaimCapacity(); - // timer for 2nd job hasn't fired, so nothing killed - assertEquals(j1.runningMapTasks, 3); - clock.advance(400000); - scheduler.reclaimCapacity(); - // one task from j1 will be killed - assertEquals(j1.runningMapTasks, 2); - - } - - /* - * Test case for checking the reclaim capacity with uninitalized jobs. - * - * Configure 2 queue with capacity scheduler. - * - * Submit a single job to the default queue and make it go above the gc - * of the queue. - * - * Then submit another job to the second queue but don't initialize it. - * - * Run reclaim capacity thread for the scheduler, in order to let scheduler - * know that it has to reclaim capacity. - * - * Advance the scheduler clock by appropriate milliseconds. - * - * Run scheduler.reclaimCapacity() to kill the appropriate tasks. - * - * Check running task count of the running job. - * - */ - public void testReclaimCapacityWithUninitializedJobs() throws IOException { - String[] qs = {"default", "q2"}; - taskTrackerManager.addQueues(qs); - ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25)); - queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25)); - resConf.setFakeQueues(queues); - scheduler.setResourceManagerConf(resConf); - scheduler.start(); - - //Submit one job to the default queue and get the capacity over the - //gc of the particular queue. - FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1"); - checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); - checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1"); - checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); - checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2"); - - //Submit another job to the second queue but not initialize it. - submitJob(JobStatus.PREP, 10, 10, "q2", "u1"); - - //call scheduler's reclaim capacity in order to start reclaim capacity - //process. - scheduler.reclaimCapacity(); - //advance the clock to the position when the two task of the job would - //be killed. - clock.advance(600000); - //run reclaim capacity - scheduler.reclaimCapacity(); - //check the count of the running tasks. - assertEquals(j1.runningMapTasks, 2); - - } - - // test code to reclaim capacity with one queue haveing zero GC - // (HADOOP-4988). - // Simple test: reclaim capacity should work even if one of the - // queues has a gc of 0. - public void testReclaimCapacityWithZeroGC() throws Exception { - // set up some queues - String[] qs = {"default", "q2", "q3"}; - taskTrackerManager.addQueues(qs); - ArrayList queues = new ArrayList(); - // we want q3 to have 0 GC. Map slots = 4. - queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25)); - queues.add(new FakeQueueInfo("q2", 40.0f, 1000, true, 25)); - queues.add(new FakeQueueInfo("q3", 10.0f, 1000, true, 25)); - // note: because of the way we convert gc% into actual gc, q2's gc - // will be 1, not 2. - resConf.setFakeQueues(queues); - resConf.setReclaimCapacityInterval(500); - scheduler.setResourceManagerConf(resConf); - scheduler.start(); - - // set up a situation where q2 is under capacity, and default - // is over capacity - FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1"); - //FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1"); - checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); - checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1"); - checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); - checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2"); - // now submit a job to q2 - FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1"); - // get scheduler to notice that q2 needs to reclaim - scheduler.reclaimCapacity(); - // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so - // we start reclaiming when 15 secs are left. - clock.advance(400000); - scheduler.reclaimCapacity(); - // no tasks should have been killed yet - assertEquals(j1.runningMapTasks, 4); - clock.advance(200000); - scheduler.reclaimCapacity(); - // task from j1 will be killed - assertEquals(j1.runningMapTasks, 3); - - } - /* * Following is the testing strategy for testing scheduling information. * - start capacity scheduler with two queues. @@ -1477,9 +1183,9 @@ * - Now fail a job which has not been initialized at all. * - Run the poller, so that it can clean up the job queue. * - Check the count, the waiting job count should be 2. - * - Now raise status change events to move the initialized jobs which + * - Now raise status change events to move the initialized jobs which * should be two in count to running queue. - * - Then schedule a map of the job in running queue. + * - Then schedule a map of the job in running queue. * - Run the poller because the poller is responsible for waiting * jobs count. Check the count, it should be using 100% map and one * waiting job @@ -1487,15 +1193,15 @@ * - Check the count, it should be now one waiting job and zero running * tasks */ - + public void testSchedulingInformation() throws Exception { String[] qs = {"default", "q2"}; taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1); scheduler.setTaskTrackerManager(taskTrackerManager); taskTrackerManager.addQueues(qs); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25)); - queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25)); + queues.add(new FakeQueueInfo("default", 50.0f, true, 25)); + queues.add(new FakeQueueInfo("q2", 50.0f, true, 25)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.start(); @@ -1508,167 +1214,164 @@ String schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); String schedulingInfo2 = queueManager.getJobQueueInfo("q2").getSchedulingInfo(); String[] infoStrings = schedulingInfo.split("\n"); - - assertEquals(infoStrings.length, 17); - assertEquals(infoStrings[1] , "Guaranteed Capacity Percentage: 50.0%"); - assertEquals(infoStrings[7] , "Guaranteed Capacity: " + totalMaps * 50/100); - assertEquals(infoStrings[11] , "Guaranteed Capacity: " + totalReduces * 50/100); + assertEquals(infoStrings.length, 16); + assertEquals(infoStrings[1] , "Capacity Percentage: 50.0%"); + assertEquals(infoStrings[6] , "Capacity: " + totalMaps * 50/100); + assertEquals(infoStrings[10] , "Capacity: " + totalReduces * 50/100); assertEquals(infoStrings[2] , "User Limit: 25%"); - assertEquals(infoStrings[3] , "Reclaim Time limit: " + - StringUtils.formatTime(1000000)); - assertEquals(infoStrings[4] , "Priority Supported: YES"); - assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity"); - assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity"); - assertEquals(infoStrings[15] , "Number of Waiting Jobs: 0"); - assertEquals(infoStrings[16] , "Number of users who have submitted jobs: 0"); + assertEquals(infoStrings[3] , "Priority Supported: YES"); + assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity"); + assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity"); + assertEquals(infoStrings[14] , "Number of Waiting Jobs: 0"); + assertEquals(infoStrings[15] , "Number of users who have submitted jobs: 0"); assertEquals(schedulingInfo, schedulingInfo2); - + //Testing with actual job submission. - ArrayList userJobs = + ArrayList userJobs = submitJobs(1, 5, "default").get("u1"); - schedulingInfo = + schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - + //waiting job should be equal to number of jobs submitted. - assertEquals(infoStrings.length, 17); - assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity"); - assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity"); - assertEquals(infoStrings[15] , "Number of Waiting Jobs: 5"); - + assertEquals(infoStrings.length, 16); + assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity"); + assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity"); + assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5"); + //Initalize the jobs but don't raise events controlledInitializationPoller.selectJobsToInitialize(); - - schedulingInfo = + + schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 17); + assertEquals(infoStrings.length, 16); //should be previous value as nothing is scheduled because no events //has been raised after initialization. - assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity"); - assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity"); - assertEquals(infoStrings[15] , "Number of Waiting Jobs: 5"); - + assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity"); + assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity"); + assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5"); + //Raise status change event so that jobs can move to running queue. raiseStatusChangeEvents(scheduler.jobQueuesManager); //assign one job Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); //Initalize extra job. controlledInitializationPoller.selectJobsToInitialize(); - + //Get scheduling information, now the number of waiting job should have //changed to 4 as one is scheduled and has become running. // make sure we update our stats scheduler.updateQSIInfoForTests(); - schedulingInfo = + schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 19); - assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity"); - assertEquals(infoStrings[14],"Running tasks: 0.0% of Guaranteed Capacity"); - assertEquals(infoStrings[17] , "Number of Waiting Jobs: 4"); - + assertEquals(infoStrings.length, 18); + assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity"); + assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity"); + assertEquals(infoStrings[16] , "Number of Waiting Jobs: 4"); + //assign a reduce task Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1"); // make sure we update our stats scheduler.updateQSIInfoForTests(); - schedulingInfo = + schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 21); - assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity"); - assertEquals(infoStrings[14],"Running tasks: 100.0% of Guaranteed Capacity"); - assertEquals(infoStrings[19] , "Number of Waiting Jobs: 4"); - + assertEquals(infoStrings.length, 20); + assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity"); + assertEquals(infoStrings[13],"Running tasks: 100.0% of Capacity"); + assertEquals(infoStrings[18] , "Number of Waiting Jobs: 4"); + //Complete the job and check the running tasks count FakeJobInProgress u1j1 = userJobs.get(0); taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1j1); taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1j1); taskTrackerManager.finalizeJob(u1j1); - + // make sure we update our stats scheduler.updateQSIInfoForTests(); - schedulingInfo = + schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 17); - assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity"); - assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity"); - assertEquals(infoStrings[15] , "Number of Waiting Jobs: 4"); - + assertEquals(infoStrings.length, 16); + assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity"); + assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity"); + assertEquals(infoStrings[14] , "Number of Waiting Jobs: 4"); + //Fail a job which is initialized but not scheduled and check the count. FakeJobInProgress u1j2 = userJobs.get(1); - assertTrue("User1 job 2 not initalized ", + assertTrue("User1 job 2 not initalized ", u1j2.getStatus().getRunState() == JobStatus.RUNNING); taskTrackerManager.finalizeJob(u1j2, JobStatus.FAILED); //Run initializer to clean up failed jobs controlledInitializationPoller.selectJobsToInitialize(); // make sure we update our stats scheduler.updateQSIInfoForTests(); - schedulingInfo = + schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 17); - assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity"); - assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity"); - assertEquals(infoStrings[15] , "Number of Waiting Jobs: 3"); - + assertEquals(infoStrings.length, 16); + assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity"); + assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity"); + assertEquals(infoStrings[14] , "Number of Waiting Jobs: 3"); + //Fail a job which is not initialized but is in the waiting queue. FakeJobInProgress u1j5 = userJobs.get(4); - assertFalse("User1 job 5 initalized ", + assertFalse("User1 job 5 initalized ", u1j5.getStatus().getRunState() == JobStatus.RUNNING); - + taskTrackerManager.finalizeJob(u1j5, JobStatus.FAILED); //run initializer to clean up failed job controlledInitializationPoller.selectJobsToInitialize(); // make sure we update our stats scheduler.updateQSIInfoForTests(); - schedulingInfo = + schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 17); - assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity"); - assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity"); - assertEquals(infoStrings[15] , "Number of Waiting Jobs: 2"); - + assertEquals(infoStrings.length, 16); + assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity"); + assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity"); + assertEquals(infoStrings[14] , "Number of Waiting Jobs: 2"); + //Raise status change events as none of the intialized jobs would be //in running queue as we just failed the second job which was initialized //and completed the first one. raiseStatusChangeEvents(scheduler.jobQueuesManager); - + //Now schedule a map should be job3 of the user as job1 succeeded job2 //failed and now job3 is running t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1"); FakeJobInProgress u1j3 = userJobs.get(2); - assertTrue("User Job 3 not running ", + assertTrue("User Job 3 not running ", u1j3.getStatus().getRunState() == JobStatus.RUNNING); - + //now the running count of map should be one and waiting jobs should be //one. run the poller as it is responsible for waiting count controlledInitializationPoller.selectJobsToInitialize(); // make sure we update our stats scheduler.updateQSIInfoForTests(); - schedulingInfo = + schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 19); - assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity"); - assertEquals(infoStrings[14],"Running tasks: 0.0% of Guaranteed Capacity"); - assertEquals(infoStrings[17] , "Number of Waiting Jobs: 1"); - + assertEquals(infoStrings.length, 18); + assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity"); + assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity"); + assertEquals(infoStrings[16] , "Number of Waiting Jobs: 1"); + //Fail the executing job taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED); // make sure we update our stats scheduler.updateQSIInfoForTests(); //Now running counts should become zero - schedulingInfo = + schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo(); infoStrings = schedulingInfo.split("\n"); - assertEquals(infoStrings.length, 17); - assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity"); - assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity"); - assertEquals(infoStrings[15] , "Number of Waiting Jobs: 1"); - + assertEquals(infoStrings.length, 16); + assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity"); + assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity"); + assertEquals(infoStrings[14] , "Number of Waiting Jobs: 1"); + } /** @@ -1690,7 +1393,7 @@ taskTrackerManager.addQueues(new String[] { "default" }); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25)); + queues.add(new FakeQueueInfo("default", 100.0f, true, 25)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.setTaskTrackerManager(taskTrackerManager); @@ -1736,7 +1439,7 @@ taskTrackerManager.addQueues(new String[] { "default" }); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25)); + queues.add(new FakeQueueInfo("default", 100.0f, true, 25)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.setTaskTrackerManager(taskTrackerManager); @@ -1785,7 +1488,7 @@ taskTrackerManager.addQueues(new String[] { "default" }); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25)); + queues.add(new FakeQueueInfo("default", 100.0f, true, 25)); resConf.setFakeQueues(queues); scheduler.setTaskTrackerManager(taskTrackerManager); // enabled memory-based scheduling @@ -1870,7 +1573,7 @@ taskTrackerManager.addQueues(new String[] { "default" }); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25)); + queues.add(new FakeQueueInfo("default", 100.0f, true, 25)); resConf.setFakeQueues(queues); scheduler.setTaskTrackerManager(taskTrackerManager); // enabled memory-based scheduling @@ -1933,7 +1636,7 @@ ttStatus.setReservedPhysicalMemory(0); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25)); + queues.add(new FakeQueueInfo("default", 100.0f, true, 25)); taskTrackerManager.addQueues(new String[] { "default" }); resConf.setFakeQueues(queues); scheduler.setTaskTrackerManager(taskTrackerManager); @@ -2000,7 +1703,7 @@ ttStatus.setReservedPhysicalMemory(0); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25)); + queues.add(new FakeQueueInfo("default", 100.0f, true, 25)); taskTrackerManager.addQueues(new String[] { "default" }); resConf.setFakeQueues(queues); scheduler.setTaskTrackerManager(taskTrackerManager); @@ -2099,7 +1802,7 @@ scheduler.setTaskTrackerManager(taskTrackerManager); taskTrackerManager.addQueues(qs); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100)); + queues.add(new FakeQueueInfo("default", 100.0f, true, 100)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.start(); @@ -2224,7 +1927,7 @@ String[] qs = { "default"}; taskTrackerManager.addQueues(qs); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100)); + queues.add(new FakeQueueInfo("default", 100.0f, true, 100)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.start(); @@ -2271,7 +1974,7 @@ String[] qs = { "default"}; taskTrackerManager.addQueues(qs); ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100)); + queues.add(new FakeQueueInfo("default", 100.0f, true, 100)); resConf.setFakeQueues(queues); scheduler.setResourceManagerConf(resConf); scheduler.start(); Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java?rev=771607&r1=771606&r2=771607&view=diff ============================================================================== --- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java (original) +++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java Tue May 5 07:30:20 2009 @@ -49,13 +49,11 @@ public TestCapacitySchedulerConf() { defaultProperties = setupQueueProperties( - new String[] { "guaranteed-capacity", - "reclaim-time-limit", + new String[] { "capacity", "supports-priority", "minimum-user-limit-percent", "maximum-initialized-jobs-per-user"}, new String[] { "100", - "300", "false", "100", "2" } @@ -85,26 +83,22 @@ public void testQueues() { Map q1Props = setupQueueProperties( - new String[] { "guaranteed-capacity", - "reclaim-time-limit", + new String[] { "capacity", "supports-priority", "minimum-user-limit-percent", "maximum-initialized-jobs-per-user"}, new String[] { "10", - "600", "true", "25", "4"} ); Map q2Props = setupQueueProperties( - new String[] { "guaranteed-capacity", - "reclaim-time-limit", + new String[] { "capacity", "supports-priority", "minimum-user-limit-percent", "maximum-initialized-jobs-per-user"}, new String[] { "100", - "6000", "false", "50", "1"} @@ -126,7 +120,7 @@ public void testQueueWithDefaultProperties() { Map q1Props = setupQueueProperties( - new String[] { "guaranteed-capacity", + new String[] { "capacity", "minimum-user-limit-percent" }, new String[] { "20", "75" } @@ -143,7 +137,6 @@ for (String key : q1Props.keySet()) { expProperties.put(key, q1Props.get(key)); } - expProperties.put("reclaim-time-limit", "300"); expProperties.put("supports-priority", "false"); expProperties.put("maximum-initialized-jobs-per-user", "2"); queueDetails.put("default", expProperties); @@ -156,23 +149,19 @@ // write new values to the file... Map q1Props = setupQueueProperties( - new String[] { "guaranteed-capacity", - "reclaim-time-limit", + new String[] { "capacity", "supports-priority", "minimum-user-limit-percent" }, new String[] { "20.5", - "600", "true", "40" } ); Map q2Props = setupQueueProperties( - new String[] { "guaranteed-capacity", - "reclaim-time-limit", + new String[] { "capacity", "supports-priority", "minimum-user-limit-percent" }, new String[] { "100", - "3000", "false", "50" } ); @@ -198,23 +187,19 @@ endConfig(); Map q1Props = setupQueueProperties( - new String[] { "guaranteed-capacity", - "reclaim-time-limit", + new String[] { "capacity", "supports-priority", "minimum-user-limit-percent" }, new String[] { "-1", - "800", "true", "50" } ); Map q2Props = setupQueueProperties( - new String[] { "guaranteed-capacity", - "reclaim-time-limit", + new String[] { "capacity", "supports-priority", "minimum-user-limit-percent" }, new String[] { "-1", - "800", "true", "50" } ); @@ -235,18 +220,16 @@ startConfig(); writeUserDefinedDefaultConfiguration(); Map q1Props = setupQueueProperties( - new String[] { "guaranteed-capacity", - "reclaim-time-limit", + new String[] { "capacity", "supports-priority", "minimum-user-limit-percent" }, new String[] { "-1", - "800", "true", "50" } ); Map q2Props = setupQueueProperties( - new String[] { "guaranteed-capacity", + new String[] { "capacity", "supports-priority", "minimum-user-limit-percent" }, new String[] { "40", @@ -254,12 +237,10 @@ "50" } ); Map q3Props = setupQueueProperties( - new String[] { "guaranteed-capacity", - "reclaim-time-limit", + new String[] { "capacity", "supports-priority", "minimum-user-limit-percent" }, new String[] { "40", - "500", "true", "50" } ); @@ -269,7 +250,6 @@ testConf = new CapacitySchedulerConf(new Path(testConfFile)); Map> queueDetails = new HashMap>(); - q2Props.put("reclaim-time-limit", "800"); queueDetails.put("default", q1Props); queueDetails.put("production", q2Props); queueDetails.put("test", q3Props); @@ -280,12 +260,10 @@ openFile(); startConfig(); Map q1Props = setupQueueProperties( - new String[] { "guaranteed-capacity", - "reclaim-time-limit", + new String[] { "capacity", "supports-priority", "minimum-user-limit-percent" }, - new String[] { "-1", - "800", + new String[] { "-1", "true", "-50" } ); @@ -299,29 +277,6 @@ assertTrue(true); } } - public void testInvalidReclaimTimeLimit() throws IOException { - openFile(); - startConfig(); - Map q1Props = setupQueueProperties( - new String[] { "guaranteed-capacity", - "reclaim-time-limit", - "supports-priority", - "minimum-user-limit-percent" }, - new String[] { "-1", - "-800", - "true", - "50" } - ); - writeQueueDetails("default", q1Props); - endConfig(); - try { - testConf = new CapacitySchedulerConf(new Path(testConfFile)); - testConf.getReclaimTimeLimit("default"); - fail("Expect Invalid reclaim time limit to raise Exception"); - }catch(IllegalArgumentException e) { - assertTrue(true); - } - } public void testInitializationPollerProperties() throws Exception { @@ -372,42 +327,16 @@ } catch (IllegalArgumentException e) {} } - public void testInvalidReclaimCapacityInterval() throws IOException { - openFile(); - startConfig(); - Map q1Props = setupQueueProperties( - new String[] { "guaranteed-capacity", - "reclaim-time-limit", - "supports-priority", - "minimum-user-limit-percent" }, - new String[] { "-1", - "-800", - "true", - "50" } - ); - writeQueueDetails("default", q1Props); - writeProperty("mapred.capacity-scheduler.reclaimCapacity.interval", "0"); - endConfig(); - try { - testConf = new CapacitySchedulerConf(new Path(testConfFile)); - testConf.getReclaimCapacityInterval(); - fail("Expect Invalid reclaim capacity interval raise Exception"); - }catch(IllegalArgumentException e) { - assertTrue(true); - } - } - + private void checkQueueProperties( CapacitySchedulerConf testConf, Map> queueDetails) { for (String queueName : queueDetails.keySet()) { Map map = queueDetails.get(queueName); - assertEquals(Float.parseFloat(map.get("guaranteed-capacity")), - testConf.getGuaranteedCapacity(queueName)); + assertEquals(Float.parseFloat(map.get("capacity")), + testConf.getCapacity(queueName)); assertEquals(Integer.parseInt(map.get("minimum-user-limit-percent")), testConf.getMinimumUserLimitPercent(queueName)); - assertEquals(Integer.parseInt(map.get("reclaim-time-limit")), - testConf.getReclaimTimeLimit(queueName)); assertEquals(Boolean.parseBoolean(map.get("supports-priority")), testConf.isPrioritySupported(queueName)); } @@ -451,25 +380,21 @@ private void writeDefaultConfiguration() { - writeProperty("mapred.capacity-scheduler.default-reclaim-time-limit" - , "300"); writeProperty("mapred.capacity-scheduler.default-supports-priority" , "false"); writeProperty("mapred.capacity-scheduler.default-minimum-user-limit-percent" - , "100"); + , "100"); } - - + + private void writeUserDefinedDefaultConfiguration() { - writeProperty("mapred.capacity-scheduler.default-reclaim-time-limit" - , "800"); writeProperty("mapred.capacity-scheduler.default-supports-priority" , "true"); writeProperty("mapred.capacity-scheduler.default-minimum-user-limit-percent" - , "50"); + , "50"); } - - + + private void writeProperty(String name, String value) { writer.println(""); writer.println(" " + name + "");