Return-Path: Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: (qmail 89997 invoked from network); 4 Mar 2011 03:26:14 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 4 Mar 2011 03:26:14 -0000 Received: (qmail 95155 invoked by uid 500); 4 Mar 2011 03:26:13 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 95125 invoked by uid 500); 4 Mar 2011 03:26:13 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 95118 invoked by uid 99); 4 Mar 2011 03:26:13 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:26:13 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 04 Mar 2011 03:26:09 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 69AEA2388C16; Fri, 4 Mar 2011 03:25:48 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1076956 - in /hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src: java/org/apache/hadoop/mapred/CapacityTaskScheduler.java test/org/apache/hadoop/mapred/TestCapacityScheduler.java Date: Fri, 04 Mar 2011 03:25:48 -0000 To: common-commits@hadoop.apache.org From: omalley@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20110304032548.69AEA2388C16@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: omalley Date: Fri Mar 4 03:25:48 2011 New Revision: 1076956 URL: http://svn.apache.org/viewvc?rev=1076956&view=rev Log: commit 
5f8761f9b4042f0fb1e47846dd93713d8903cd67 Author: Lee Tucker Date: Thu Jul 30 17:40:44 2009 -0700 Applying patch 2871273.mr722.patch Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1076956&r1=1076955&r2=1076956&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar 4 03:25:48 2011 @@ -529,21 +529,22 @@ class CapacityTaskScheduler extends Task continue; } } else { - //if memory requirements don't match then we check if the - //job has either pending or speculative task or has insufficient number - //of 'reserved' tasktrackers to cover all pending tasks. If so - //we reserve the current tasktracker for this job so that - //high memory jobs are not starved - if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTrackerStatus) || - !hasSufficientReservedTaskTrackers(j)) { + // if memory requirements don't match then we check if the job has + // pending tasks and has insufficient number of 'reserved' + // tasktrackers to cover all pending tasks. 
If so we reserve the + // current tasktracker for this job so that high memory jobs are not + // starved + if ((getPendingTasks(j) != 0 && !hasSufficientReservedTaskTrackers(j))) { // Reserve all available slots on this tasktracker - LOG.info(j.getJobID() + ": Reserving " + taskTracker.getTrackerName() + - " since memory-requirements don't match"); - taskTracker.reserveSlots(type, j, taskTracker.getAvailableSlots(type)); - + LOG.info(j.getJobID() + ": Reserving " + + taskTracker.getTrackerName() + + " since memory-requirements don't match"); + taskTracker.reserveSlots(type, j, taskTracker + .getAvailableSlots(type)); + // Block return TaskLookupResult.getMemFailedResult(); - } + } }//end of memory check block // if we're here, this job has no task to run. Look at the next job. }//end of for loop Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1076956&r1=1076955&r2=1076956&view=diff ============================================================================== --- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original) +++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar 4 03:25:48 2011 @@ -2048,30 +2048,29 @@ public class TestCapacityScheduler exten jConf.setUser("u1"); FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf); - // Job2 cannot fit on tt2 or tt1. Blocking. Job3 also will not run. + // Job2 cannot fit on tt1. 
So tt1 is reserved for a map slot of job2 assertNull(scheduler.assignTasks(tracker("tt1"))); - assertNull(scheduler.assignTasks(tracker("tt2"))); assertNull(scheduler.assignTasks(tracker("tt1"))); - assertNull(scheduler.assignTasks(tracker("tt2"))); - // reserved tasktrackers contribute to occupied slots - // for maps, both tasktrackers are reserved. - checkOccupiedSlots("default", TaskType.MAP, 1, 7, 175.0f); - // for reduces, only one tasktracker is reserved, because - // the reduce scheduler is not visited for tt1 (as it has - // 0 slots free). - checkOccupiedSlots("default", TaskType.REDUCE, 1, 5, - 125.0f); + + // reserved tasktrackers contribute to occupied slots for maps. + checkOccupiedSlots("default", TaskType.MAP, 1, 5, 125.0f); + // occupied slots for reduces remain unchanged as tt1 is not reserved for + // reduces. + checkOccupiedSlots("default", TaskType.REDUCE, 1, 3, 75.0f); checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L); checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L); LOG.info(job2.getSchedulingInfo()); assertEquals(String.format( CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, - 1, 2, 4, 1, 2, 2), + 1, 2, 2, 1, 2, 0), (String) job2.getSchedulingInfo()); assertEquals(String.format( CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 0, 0, 0, 0), (String) job3.getSchedulingInfo()); + + // One reservation is already done for job2. So job3 should go ahead. + checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2"); } /** @@ -2574,18 +2573,14 @@ public class TestCapacityScheduler exten } /** - * Test case to test scheduling of jobs with speculative execution - * in the face of high RAM jobs. - * - * Essentially, the test verifies that if a high RAM job has speculative - * tasks that cannot run because of memory requirements, we block - * that node and do not return any tasks to it. 
- * + * Test to verify that TTs are reserved for high memory jobs, but only till a + * TT is reserved for each of the pending tasks. * @throws IOException */ - public void testHighRamJobWithSpeculativeExecution() throws IOException { - // 2 TTs, 3 map and 3 reduce slots on each TT - taskTrackerManager = new FakeTaskTrackerManager(2, 3, 3); + public void testTTReservingWithHighMemoryJobs() + throws IOException { + // 3 taskTrackers, 2 map and 0 reduce slots on each TT + taskTrackerManager = new FakeTaskTrackerManager(3, 2, 0); taskTrackerManager.addQueues(new String[] { "default" }); ArrayList queues = new ArrayList(); @@ -2593,96 +2588,105 @@ public class TestCapacityScheduler exten resConf.setFakeQueues(queues); scheduler.setTaskTrackerManager(taskTrackerManager); // enabled memory-based scheduling - // 1GB for each map, 1GB for each reduce + // Normal job in the cluster would be 1GB maps/reduces scheduler.getConf().setLong( - JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, - 3 * 1024L); + JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024); scheduler.getConf().setLong( - JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024L); + JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024); scheduler.getConf().setLong( - JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, - 3 * 1024L); + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024); scheduler.getConf().setLong( - JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024L); + JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024); scheduler.setResourceManagerConf(resConf); scheduler.start(); - // Submit a normal job that should occupy a node + LOG.debug("Submit a regular memory(1GB vmem maps/reduces) job of " + + "3 map/red tasks"); JobConf jConf = new JobConf(conf); + jConf = new JobConf(conf); jConf.setMemoryForMapTask(1 * 1024); - jConf.setMemoryForReduceTask(0); - jConf.setNumMapTasks(2); - jConf.setNumReduceTasks(0); + jConf.setMemoryForReduceTask(1 * 
1024); + jConf.setNumMapTasks(3); + jConf.setNumReduceTasks(3); jConf.setQueueName("default"); jConf.setUser("u1"); - FakeJobInProgress job1 = submitJob(JobStatus.PREP, jConf); - - //Submit a high memory job with speculative tasks. - jConf = new JobConf(); + FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf); + + // assign one map task of job1 on all the TTs + checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); + checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2"); + checkAssignment("tt3", "attempt_test_0001_m_000003_0 on tt3"); + scheduler.updateQSIInfoForTests(); + + LOG.info(job1.getSchedulingInfo()); + assertEquals(String.format( + CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 3, 3, 0, 0, + 0, 0), (String) job1.getSchedulingInfo()); + + LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of " + + "2 map tasks"); jConf.setMemoryForMapTask(2 * 1024); jConf.setMemoryForReduceTask(0); - jConf.setNumMapTasks(1); + jConf.setNumMapTasks(2); jConf.setNumReduceTasks(0); jConf.setQueueName("default"); jConf.setUser("u1"); - jConf.setMapSpeculativeExecution(true); - jConf.setReduceSpeculativeExecution(false); - FakeJobInProgress job2 = - new FakeJobInProgress(new JobID("test", ++jobCounter), jConf, - taskTrackerManager, "u1"); - taskTrackerManager.submitJob(job2); + FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf); - //Submit normal job + LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of " + + "2 map/red tasks"); jConf = new JobConf(conf); jConf.setMemoryForMapTask(1 * 1024); - jConf.setMemoryForReduceTask(0); - jConf.setNumMapTasks(1); - jConf.setNumReduceTasks(0); + jConf.setMemoryForReduceTask(1 * 1024); + jConf.setNumMapTasks(2); + jConf.setNumReduceTasks(2); jConf.setQueueName("default"); jConf.setUser("u1"); - jConf.setMapSpeculativeExecution(false); - jConf.setReduceSpeculativeExecution(false); - FakeJobInProgress job3 = submitJob(JobStatus.PREP, jConf); - - 
controlledInitializationPoller.selectJobsToInitialize(); - raiseStatusChangeEvents(scheduler.jobQueuesManager); + FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf); - // Have one node on which all tasks of job1 are scheduled. - checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); - checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1"); + // Job2, a high memory job, cannot be accommodated on any TT. But with each + // trip to the scheduler, each of the TTs should be reserved by job2. + assertNull(scheduler.assignTasks(tracker("tt1"))); + scheduler.updateQSIInfoForTests(); + LOG.info(job2.getSchedulingInfo()); + assertEquals(String.format( + CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 2, 0, + 0, 0), (String) job2.getSchedulingInfo()); - // raise events to initialize the 3rd job - controlledInitializationPoller.selectJobsToInitialize(); - raiseStatusChangeEvents(scheduler.jobQueuesManager); + assertNull(scheduler.assignTasks(tracker("tt2"))); + scheduler.updateQSIInfoForTests(); + LOG.info(job2.getSchedulingInfo()); + assertEquals(String.format( + CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 4, 0, + 0, 0), (String) job2.getSchedulingInfo()); - // On the second node, one task of the high RAM job can be scheduled. - checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2"); - checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L); - assertEquals("pending maps greater than zero " , job2.pendingMaps(), 0); - // Total 4 map slots should be accounted for. - checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f); - - // now when the first node gets back, it cannot run any task - // because job2 has a speculative task that can run on this node. - // This is even though job3's tasks can run on this node. + // Job2 has only 2 pending tasks. So no more reservations. Job3 should get + // slots on tt3. tt1 and tt2 should not be assigned any slots with the + // reservation stats intact. 
assertNull(scheduler.assignTasks(tracker("tt1"))); - // Reservation will count for 2 more slots. - checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f); - - // finish one task from tt1. - taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", - job1); - - // now, we can schedule the speculative task on tt1 - checkAssignment("tt1", "attempt_test_0002_m_000001_1 on tt1"); - - // finish one more task from tt1. - taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", - job1); - - // now the new job's tasks can be scheduled. - checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1"); - } + scheduler.updateQSIInfoForTests(); + LOG.info(job2.getSchedulingInfo()); + assertEquals(String.format( + CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 4, 0, + 0, 0), (String) job2.getSchedulingInfo()); + + assertNull(scheduler.assignTasks(tracker("tt2"))); + scheduler.updateQSIInfoForTests(); + LOG.info(job2.getSchedulingInfo()); + assertEquals(String.format( + CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 4, 0, + 0, 0), (String) job2.getSchedulingInfo()); + + checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3"); + scheduler.updateQSIInfoForTests(); + LOG.info(job2.getSchedulingInfo()); + assertEquals(String.format( + CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 4, 0, + 0, 0), (String) job2.getSchedulingInfo()); + + // No more tasks there in job3 also + assertNull(scheduler.assignTasks(tracker("tt3"))); +} /** * Test to verify that queue ordering is based on the number of slots occupied