Subject: svn commit: r824750 [1/2] - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
Date: Tue, 13 Oct 2009 13:28:30 -0000
To: mapreduce-commits@hadoop.apache.org
From: yhemanth@apache.org

Author: yhemanth
Date: Tue Oct 13 13:28:29 2009
New Revision: 824750

URL: http://svn.apache.org/viewvc?rev=824750&view=rev
Log:
MAPREDUCE-1030.
Modified scheduling algorithm to return a map and reduce task per heartbeat in the capacity scheduler. Contributed by Rahul Kumar Singh.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=824750&r1=824749&r2=824750&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Oct 13 13:28:29 2009
@@ -748,4 +748,8 @@
     MAPREDUCE-979. Fixed JobConf APIs related to memory parameters to return
     values of new configuration variables when deprecated variables are
     disabled. (Sreekanth Ramakrishnan via yhemanth)
-
+
+    MAPREDUCE-1030. Modified scheduling algorithm to return a map and reduce
+    task per heartbeat in the capacity scheduler.
+    (Rahul Kumar Singh via yhemanth)
+

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=824750&r1=824749&r2=824750&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue Oct 13 13:28:29 2009
@@ -906,8 +906,10 @@
 
   /*
    * The grand plan for assigning a task.
-   * First, decide whether a Map or Reduce task should be given to a TT
-   * (if the TT can accept either).
+   * Always assigns 1 reduce and 1 map , if sufficient slots are
+   * available for each of types.
+   * If not , then which ever type of slots are available , that type of task is
+   * assigned.
    * Next, pick a queue. We only look at queues that need a slot. Among these,
    * we first look at queues whose (# of running tasks)/capacity is the least.
    * Next, pick a job in a queue. we pick the job at the front of the queue
@@ -921,12 +923,12 @@
 
     TaskLookupResult tlr;
     TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
+    List<Task> result = new ArrayList<Task>();
 
     /*
-     * If TT has Map and Reduce slot free, we need to figure out whether to
-     * give it a Map or Reduce task.
-     * Number of ways to do this. For now, base decision on how much is needed
-     * versus how much is used (default to Map, if equal).
+     * If TT has Map and Reduce slot free, we assign 1 map and 1 reduce
+     * We base decision on how much is needed
+     * versus how much is used
      */
     ClusterStatus c = taskTrackerManager.getClusterStatus();
     int mapClusterCapacity = c.getMaxMapTasks();
@@ -953,51 +955,26 @@
     // make sure we get our map or reduce scheduling object to update its
     // collection of QSC objects too.
 
-    if ((maxReduceSlots - currentReduceSlots) >
-        (maxMapSlots - currentMapSlots)) {
-      // get a reduce task first
+    if (maxReduceSlots > currentReduceSlots) {
+      //reduce slot available , try to get a
+      //reduce task
       tlr = reduceScheduler.assignTasks(taskTracker);
       if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
         tlr.getLookUpStatus()) {
-        // found a task; return
-        return Collections.singletonList(tlr.getTask());
-      }
-      // if we didn't get any, look at map tasks, if TT has space
-      else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
-                == tlr.getLookUpStatus() ||
-                TaskLookupResult.LookUpStatus.NO_TASK_FOUND
-                == tlr.getLookUpStatus())
-               && (maxMapSlots > currentMapSlots)) {
-        tlr = mapScheduler.assignTasks(taskTracker);
-        if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
-          tlr.getLookUpStatus()) {
-          return Collections.singletonList(tlr.getTask());
-        }
+        result.add(tlr.getTask());
       }
     }
-    else {
-      // get a map task first
+
+    if(maxMapSlots > currentMapSlots) {
+      //map slot available , try to get a map task
       tlr = mapScheduler.assignTasks(taskTracker);
       if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
         tlr.getLookUpStatus()) {
-        // found a task; return
-        return Collections.singletonList(tlr.getTask());
-      }
-      // if we didn't get any, look at reduce tasks, if TT has space
-      else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
-                == tlr.getLookUpStatus()
-                || TaskLookupResult.LookUpStatus.NO_TASK_FOUND
-                == tlr.getLookUpStatus())
-               && (maxReduceSlots > currentReduceSlots)) {
-        tlr = reduceScheduler.assignTasks(taskTracker);
-        if (TaskLookupResult.LookUpStatus.TASK_FOUND ==
-          tlr.getLookUpStatus()) {
-          return Collections.singletonList(tlr.getTask());
-        }
+        result.add(tlr.getTask());
       }
     }
-
-    return null;
+
+    return (result.isEmpty()) ? null : result;
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java?rev=824750&r1=824749&r2=824750&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java Tue Oct 13 13:28:29 2009
@@ -20,13 +20,13 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -48,6 +48,8 @@
 public class CapacityTestUtils {
   static final Log LOG =
     LogFactory.getLog(org.apache.hadoop.mapred.CapacityTestUtils.class);
+  static final String MAP = "map";
+  static final String REDUCE = "reduce";
 
 
   /**
@@ -160,18 +162,116 @@
     }
   }
-
+  /**
+   * The method accepts a attempt string and checks for validity of
+   * assignTask w.r.t attempt string.
+   *
+   * @param taskTrackerManager
+   * @param scheduler
+   * @param taskTrackerName
+   * @param expectedTaskString
+   * @return
+   * @throws IOException
+   */
   static Task checkAssignment(
     CapacityTestUtils.FakeTaskTrackerManager taskTrackerManager,
     CapacityTaskScheduler scheduler, String taskTrackerName,
     String expectedTaskString) throws IOException {
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    if (expectedTaskString.contains("_m_")) {
+      expectedStrings.put(MAP, expectedTaskString);
+    } else if (expectedTaskString.contains("_r_")) {
+      expectedStrings.put(REDUCE, expectedTaskString);
+    }
+    List<Task> tasks = checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, taskTrackerName, expectedStrings);
+    for (Task task : tasks) {
+      if (task.toString().equals(expectedTaskString)) {
+        return task;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Checks the validity of tasks assigned by scheduler's assignTasks method
+   * According to JIRA:1030 every assignTasks call in CapacityScheduler
+   * would result in either MAP or REDUCE or BOTH.
+   *
+   * This method accepts a Map<String, String>.
+   * The map should always have <=2 entried in hashMap.
+   *
+   * sample calling code .
+   *
+   *  Map<String, String> expectedStrings = new HashMap<String, String>();
+   *  ......
+   *  .......
+   *  expectedStrings.clear();
+   *  expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+   *  expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+   *  checkMultipleTaskAssignment(
+   *    taskTrackerManager, scheduler, "tt1",
+   *    expectedStrings);
+   *
+   * @param taskTrackerManager
+   * @param scheduler
+   * @param taskTrackerName
+   * @param expectedTaskStrings
+   * @return
+   * @throws IOException
+   */
+  static List<Task> checkMultipleTaskAssignment(
+    CapacityTestUtils.FakeTaskTrackerManager taskTrackerManager,
+    CapacityTaskScheduler scheduler, String taskTrackerName,
+    Map<String, String> expectedTaskStrings) throws IOException {
+    //Call assign task
     List<Task> tasks = scheduler.assignTasks(
       taskTrackerManager.getTaskTracker(
         taskTrackerName));
-    assertNotNull(expectedTaskString, tasks);
-    assertEquals(expectedTaskString, 1, tasks.size());
-    assertEquals(expectedTaskString, tasks.get(0).toString());
-    return tasks.get(0);
+
+    if (tasks==null) {
+      if (expectedTaskStrings.size() > 0) {
+        fail("Expected some tasks to be assigned, but got none.");
+      } else {
+        return null;
+      }
+    }
+
+    if (expectedTaskStrings.size() > tasks.size()) {
+      StringBuffer sb = new StringBuffer();
+      sb.append("Expected strings different from actual strings.");
+      sb.append(" Expected string count=").append(expectedTaskStrings.size());
+      sb.append(" Actual string count=").append(tasks.size());
+      sb.append(" Expected strings=");
+      for (String expectedTask : expectedTaskStrings.values()) {
+        sb.append(expectedTask).append(",");
+      }
+      sb.append("Actual strings=");
+      for (Task actualTask : tasks) {
+        sb.append(actualTask.toString()).append(",");
+      }
+      fail(sb.toString());
+    }
+
+    for (Task task : tasks) {
+      LOG.info("tasks are : " + tasks.toString());
+      if (task.isMapTask()) {
+        //check if expected string is set for map or not.
+        if (expectedTaskStrings.get(MAP) != null) {
+          assertEquals(expectedTaskStrings.get(MAP), task.toString());
+        } else {
+          fail("No map task is expected, but got " + task.toString());
+        }
+      } else {
+        //check if expectedStrings is set for reduce or not.
+        if (expectedTaskStrings.get(REDUCE) != null) {
+          assertEquals(expectedTaskStrings.get(REDUCE), task.toString());
+        } else {
+          fail("No reduce task is expected, but got " + task.toString());
+        }
+      }
+    }
+    return tasks;
   }
 
   static void verifyCapacity(

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=824750&r1=824749&r2=824750&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Oct 13 13:28:29 2009
@@ -108,21 +108,21 @@
     taskTrackerManager.initJob(fjob1);
 
+    //1 map and 1 reduce assigned
     List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
+    //2 map are assigned reached the maxlimit
     List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
-    //Once the 2 tasks are running the third assigment should be reduce.
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt3",
-      "attempt_test_0001_r_000001_0 on tt3");
-    //This should fail.
-    List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
-    assertNull(task4);
+    //task3 is null as maxlimit is reached.
+    List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
+    assertNull(task3);
 
     //Now complete the task 1.
// complete the job + for(Task task: task1) { taskTrackerManager.finishTask( - task1.get(0).getTaskID().toString(), + task.getTaskID().toString(), fjob1); + } //We have completed the tt1 task which was a map task so we expect one map //task to be picked up checkAssignment( @@ -154,23 +154,29 @@ taskTrackerManager.initJob(fjob1); + //1 map and 1 reduce List task1 = scheduler.assignTasks(tracker("tt1")); + + // 1 reduce assigned List task2 = scheduler.assignTasks(tracker("tt2")); + + // No tasks should be assigned, as we have reached the max cap. List task3 = scheduler.assignTasks(tracker("tt3")); + assertNull(task3); - //This should fail. 1 map, 2 reduces , we have reached the limit. - List task4 = scheduler.assignTasks(tracker("tt4")); - assertNull(task4); //Now complete the task 1 i.e map task. - // complete the job - taskTrackerManager.finishTask( - task1.get(0).getTaskID().toString(), - fjob1); - - //This should still fail as only map task is done - task4 = scheduler.assignTasks(tracker("tt4")); - assertNull(task4); + for(Task task: task1) { + if (task.isMapTask()) { + taskTrackerManager.finishTask( + task.getTaskID().toString(), + fjob1); + } + } + //Still no slots available for reduce hence no tasks + //assigned + assertNull(scheduler.assignTasks(tracker("tt1"))); + //Complete the reduce task taskTrackerManager.finishTask( task2.get(0).getTaskID().toString(), fjob1); @@ -403,19 +409,27 @@ // submit a job with no queue specified. It should be accepted // and given to the default queue. 
- JobInProgress j = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1"); + JobInProgress j = taskTrackerManager.submitJobAndInit(JobStatus.PREP, + 10, 10, null, "u1"); + // when we ask for tasks, we should get them for the job submitted + Map expectedTaskStrings = new HashMap(); + expectedTaskStrings.put(CapacityTestUtils.MAP, + "attempt_test_0001_m_000001_0 on tt1"); + expectedTaskStrings.put(CapacityTestUtils.REDUCE, + "attempt_test_0001_r_000001_0 on tt1"); + checkMultipleTaskAssignment(taskTrackerManager, scheduler, + "tt1", expectedTaskStrings); - // when we ask for a task, we should get one, from the job submitted - Task t; - t = checkAssignment( - taskTrackerManager, scheduler, "tt1", - "attempt_test_0001_m_000001_0 on tt1"); // submit another job, to a different queue j = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1"); - // now when we get a task, it should be from the second job - t = checkAssignment( - taskTrackerManager, scheduler, "tt2", - "attempt_test_0002_m_000001_0 on tt2"); + // now when we get tasks, it should be from the second job + expectedTaskStrings.clear(); + expectedTaskStrings.put(CapacityTestUtils.MAP, + "attempt_test_0002_m_000001_0 on tt2"); + expectedTaskStrings.put(CapacityTestUtils.REDUCE, + "attempt_test_0002_r_000001_0 on tt2"); + checkMultipleTaskAssignment(taskTrackerManager, scheduler, + "tt2", expectedTaskStrings); } public void testGetJobs() throws Exception { @@ -544,19 +558,30 @@ // submit a job taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1"); // for queue 'q2', the capacity for maps is 2. 
Since we're the only user, - // we should get a task - checkAssignment( + // we should get a task + Map expectedStrings = new HashMap(); + expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1"); + expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1"); + + checkMultipleTaskAssignment( taskTrackerManager, scheduler, "tt1", - "attempt_test_0001_m_000001_0 on tt1"); - // I should get another map task. + expectedStrings); + + // I should get another map task. + //No redduces as there is 1 slot only for reduce on TT checkAssignment( taskTrackerManager, scheduler, "tt1", "attempt_test_0001_m_000002_0 on tt1"); + // Now we're at full capacity for maps. If I ask for another map task, - // I should get a map task from the default queue's capacity. - checkAssignment( + // I should get a map task from the default queue's capacity. + //same with reduce + expectedStrings.put(MAP,"attempt_test_0001_m_000003_0 on tt2"); + expectedStrings.put(REDUCE,"attempt_test_0001_r_000002_0 on tt2"); + checkMultipleTaskAssignment( taskTrackerManager, scheduler, "tt2", - "attempt_test_0001_m_000003_0 on tt2"); + expectedStrings); + // and another checkAssignment( taskTrackerManager, scheduler, "tt2", @@ -608,7 +633,8 @@ jConf.setNumReduceTasks(0); jConf.setQueueName("defaultXYZM"); jConf.setUser("u1"); - FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf); + FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit( + JobStatus.PREP, jConf); LOG.debug( "Submit another regular memory(1GB vmem maps/reduces) job of " @@ -620,22 +646,18 @@ jConf.setNumReduceTasks(2); jConf.setQueueName("defaultXYZM"); jConf.setUser("u1"); - FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf); + FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit( + JobStatus.PREP, jConf); // first, a map from j1 will run this is a high memory job so it would - // occupy the 2 slots - checkAssignment( - taskTrackerManager, scheduler, 
"tt1", - "attempt_test_0001_m_000001_0 on tt1"); - - checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 100.0f, 1, 1); - checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L); - - // at this point, the scheduler tries to schedule another map from j1. - // there isn't enough space. The second job's reduce should be scheduled. - checkAssignment( + // occupy the 2 slots and it would try to assign the reduce task from + //job2. + Map expectedStrings = new HashMap(); + expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1"); + expectedStrings.put(REDUCE, "attempt_test_0002_r_000001_0 on tt1"); + checkMultipleTaskAssignment( taskTrackerManager, scheduler, "tt1", - "attempt_test_0002_r_000001_0 on tt1"); + expectedStrings); checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 100.0f, 1, 1); checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L); @@ -644,10 +666,11 @@ //another task tracker. // This should not happen as all the map slots are taken //by the first task itself.hence reduce task from the second job is given - - checkAssignment( + expectedStrings.clear(); + expectedStrings.put(REDUCE, "attempt_test_0002_r_000002_0 on tt2"); + checkMultipleTaskAssignment( taskTrackerManager, scheduler, "tt2", - "attempt_test_0002_r_000002_0 on tt2"); + expectedStrings); } /** @@ -664,60 +687,71 @@ taskTrackerManager.setFakeQueues(queues); scheduler.start(); - scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getMapTSC().setMaxTaskLimit(2); - scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getReduceTSC().setMaxTaskLimit(2); + scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext() + .getMapTSC().setMaxTaskLimit(2); + scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext() + .getReduceTSC().setMaxTaskLimit(2); // submit a job FakeJobInProgress fjob1 = - taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1"); + taskTrackerManager.submitJobAndInit( + JobStatus.PREP, 10, 10, "default", 
"u1"); FakeJobInProgress fjob2 = - taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2"); + taskTrackerManager.submitJobAndInit( + JobStatus.PREP, 10, 10, "default", "u2"); // for queue 'default', the capacity for maps is 2. // But the max map limit is 2 // hence user should be getting not more than 1 as it is the 50%. - Task t1 = checkAssignment( + //same with reduce + Map expectedStrings = new HashMap(); + populateExpectedStrings(expectedStrings, + "attempt_test_0001_m_000001_0 on tt1", + "attempt_test_0001_r_000001_0 on tt1"); + List t1 = checkMultipleTaskAssignment( taskTrackerManager, scheduler, "tt1", - "attempt_test_0001_m_000001_0 on tt1"); + expectedStrings); //Now we should get the task from the other job. As the //first user has reached his max map limit. + //same with reduce + populateExpectedStrings(expectedStrings, + "attempt_test_0002_m_000001_0 on tt2", + "attempt_test_0002_r_000001_0 on tt2"); + checkMultipleTaskAssignment( + taskTrackerManager, scheduler, "tt2", + expectedStrings); + + //Now we are done with map and reduce limit , + // now if we ask for task we should + // get null. + List t3 = scheduler.assignTasks(tracker("tt3")); + assertNull(t3); + + //We completed 1 map and 1 reduce in here + for (Task task : t1) { + taskTrackerManager.finishTask( + task.getTaskID().toString(), + fjob1); + } - checkAssignment( - taskTrackerManager, scheduler, "tt2", - "attempt_test_0002_m_000001_0 on tt2"); - - //Now we are done with map limit , now if we ask for task we should - // get reduce from 1st job - checkAssignment( - taskTrackerManager, scheduler, "tt3", - "attempt_test_0001_r_000001_0 on tt3"); - // Now we're at full capacity for maps. 
1 done with reduces for job 1 so - // now we should get 1 reduces for job 2 - Task t4 = checkAssignment( - taskTrackerManager, scheduler, "tt4", - "attempt_test_0002_r_000001_0 on tt4"); - - taskTrackerManager.finishTask( - t1.getTaskID().toString(), - fjob1); - - //tt1 completed the task so we have 1 map slot for u1 - // we are assigning the 2nd map task from fjob1 - checkAssignment( - taskTrackerManager, scheduler, "tt1", - "attempt_test_0001_m_000002_0 on tt1"); - - taskTrackerManager.finishTask( - t4.getTaskID().toString(), - fjob2); - //tt4 completed the task , so we have 1 reduce slot for u2 - //we are assigning the 2nd reduce from fjob2 - checkAssignment( - taskTrackerManager, scheduler, "tt4", - "attempt_test_0002_r_000002_0 on tt4"); - + //again we would assign 1 map and 1 reduce + populateExpectedStrings(expectedStrings, + "attempt_test_0001_m_000002_0 on tt1", + "attempt_test_0001_r_000002_0 on tt1"); + checkMultipleTaskAssignment( + taskTrackerManager, scheduler, "tt1", + expectedStrings); + } + + // Utility method to construct a map of expected strings + // with exactly one map task and one reduce task. + private void populateExpectedStrings(Map expectedTaskStrings, + String mapTask, String reduceTask) { + expectedTaskStrings.clear(); + expectedTaskStrings.put(CapacityTestUtils.MAP, mapTask); + expectedTaskStrings.put(CapacityTestUtils.REDUCE, reduceTask); } @@ -736,26 +770,31 @@ // submit a job taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1"); - // for queue 'q2', the capacity for maps is 2. Since we're the only user, - // we should get a task - checkAssignment( - taskTrackerManager, scheduler, "tt1", - "attempt_test_0001_m_000001_0 on tt1"); + // for queue 'q2', the capacity is 2 for maps and 1 for reduce. 
+ // Since we're the only user, we should get tasks + Map expectedTaskStrings = new HashMap(); + populateExpectedStrings(expectedTaskStrings, + "attempt_test_0001_m_000001_0 on tt1", + "attempt_test_0001_r_000001_0 on tt1"); + checkMultipleTaskAssignment(taskTrackerManager, scheduler, + "tt1", expectedTaskStrings); + // Submit another job, from a different user taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2"); - // Now if I ask for a map task, it should come from the second job - checkAssignment( - taskTrackerManager, scheduler, "tt1", - "attempt_test_0002_m_000001_0 on tt1"); - // Now we're at full capacity for maps. If I ask for another map task, - // I should get a map task from the default queue's capacity. - checkAssignment( - taskTrackerManager, scheduler, "tt2", - "attempt_test_0001_m_000002_0 on tt2"); + // Now if I ask for a task, it should come from the second job + checkAssignment(taskTrackerManager, scheduler, + "tt1", "attempt_test_0002_m_000001_0 on tt1"); + + // Now we're at full capacity. If I ask for another task, + // I should get tasks from the default queue's capacity. + populateExpectedStrings(expectedTaskStrings, + "attempt_test_0001_m_000002_0 on tt2", + "attempt_test_0002_r_000001_0 on tt2"); + checkMultipleTaskAssignment(taskTrackerManager, scheduler, + "tt2", expectedTaskStrings); // and another - checkAssignment( - taskTrackerManager, scheduler, "tt2", - "attempt_test_0002_m_000002_0 on tt2"); + checkAssignment(taskTrackerManager, scheduler, + "tt2", "attempt_test_0002_m_000002_0 on tt2"); } // test user limits when a 2nd job is submitted much after first job @@ -773,21 +812,28 @@ // submit a job taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1"); - // for queue 'q2', the capacity for maps is 2. 
Since we're the only user, - // we should get a task - checkAssignment( - taskTrackerManager, scheduler, "tt1", - "attempt_test_0001_m_000001_0 on tt1"); + // for queue 'q2', the capacity for maps is 2 and reduce is 1. + // Since we're the only user, we should get tasks + Map expectedTaskStrings = new HashMap(); + populateExpectedStrings(expectedTaskStrings, + "attempt_test_0001_m_000001_0 on tt1", + "attempt_test_0001_r_000001_0 on tt1"); + checkMultipleTaskAssignment(taskTrackerManager, scheduler, + "tt1", expectedTaskStrings); + // since we're the only job, we get another map checkAssignment( taskTrackerManager, scheduler, "tt1", "attempt_test_0001_m_000002_0 on tt1"); + // Submit another job, from a different user taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2"); - // Now if I ask for a map task, it should come from the second job - checkAssignment( - taskTrackerManager, scheduler, "tt2", - "attempt_test_0002_m_000001_0 on tt2"); + // Now if I ask for a task, it should come from the second job + populateExpectedStrings(expectedTaskStrings, + "attempt_test_0002_m_000001_0 on tt2", + "attempt_test_0002_r_000001_0 on tt2"); + checkMultipleTaskAssignment(taskTrackerManager, scheduler, + "tt2", expectedTaskStrings); // and another checkAssignment( taskTrackerManager, scheduler, "tt2", @@ -810,41 +856,56 @@ // submit a job FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1"); - // for queue 'q2', the capacity for maps is 2. Since we're the only user, - // we should get a task - checkAssignment( - taskTrackerManager, scheduler, "tt1", - "attempt_test_0001_m_000001_0 on tt1"); + // for queue 'q2', the capacity for maps is 2 and reduces is 1. 
+ // Since we're the only user, we should get a task + Map expectedTaskStrings = new HashMap(); + populateExpectedStrings(expectedTaskStrings, + "attempt_test_0001_m_000001_0 on tt1", + "attempt_test_0001_r_000001_0 on tt1"); + checkMultipleTaskAssignment(taskTrackerManager, scheduler, + "tt1", expectedTaskStrings); // since we're the only job, we get another map checkAssignment( taskTrackerManager, scheduler, "tt1", "attempt_test_0001_m_000002_0 on tt1"); - // we get two more maps from 'default queue' - checkAssignment( - taskTrackerManager, scheduler, "tt2", - "attempt_test_0001_m_000003_0 on tt2"); + // we get more tasks from 'default queue' + populateExpectedStrings(expectedTaskStrings, + "attempt_test_0001_m_000003_0 on tt2", + "attempt_test_0001_r_000002_0 on tt2"); + checkMultipleTaskAssignment(taskTrackerManager, scheduler, + "tt2", expectedTaskStrings); checkAssignment( taskTrackerManager, scheduler, "tt2", "attempt_test_0001_m_000004_0 on tt2"); + // Submit another job, from a different user FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2"); - // one of the task finishes + // one of the task finishes of each type taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", j1); - // Now if I ask for a map task, it should come from the second job - checkAssignment( - taskTrackerManager, scheduler, "tt1", - "attempt_test_0002_m_000001_0 on tt1"); + taskTrackerManager.finishTask("attempt_test_0001_r_000001_0", j1); + + // Now if I ask for a task, it should come from the second job + populateExpectedStrings(expectedTaskStrings, + "attempt_test_0002_m_000001_0 on tt1", + "attempt_test_0002_r_000001_0 on tt1"); + checkMultipleTaskAssignment(taskTrackerManager, scheduler, + "tt1", expectedTaskStrings); + // another task from job1 finishes, another new task to job2 taskTrackerManager.finishTask("attempt_test_0001_m_000002_0", j1); checkAssignment( taskTrackerManager, scheduler, "tt1", "attempt_test_0002_m_000002_0 on 
tt1"); + // now we have equal number of tasks from each job. Whichever job's // task finishes, that job gets a new task taskTrackerManager.finishTask("attempt_test_0001_m_000003_0", j1); - checkAssignment( - taskTrackerManager, scheduler, "tt2", - "attempt_test_0001_m_000005_0 on tt2"); + taskTrackerManager.finishTask("attempt_test_0001_r_000002_0", j1); + populateExpectedStrings(expectedTaskStrings, + "attempt_test_0001_m_000005_0 on tt2", + "attempt_test_0001_r_000003_0 on tt2"); + checkMultipleTaskAssignment(taskTrackerManager, scheduler, + "tt2", expectedTaskStrings); taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", j2); checkAssignment( taskTrackerManager, scheduler, "tt1", @@ -853,7 +914,7 @@ // test user limits with many users, more slots public void testUserLimits4() throws Exception { - // set up one queue, with 10 slots + // set up one queue, with 10 map slots and 5 reduce slots String[] qs = {"default"}; taskTrackerManager.addQueues(qs); ArrayList queues = new ArrayList(); @@ -870,42 +931,31 @@ // u1 submits job FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1"); // it gets the first 5 slots - checkAssignment( - taskTrackerManager, scheduler, "tt1", - "attempt_test_0001_m_000001_0 on tt1"); - checkAssignment( - taskTrackerManager, scheduler, "tt1", - "attempt_test_0001_m_000002_0 on tt1"); - checkAssignment( - taskTrackerManager, scheduler, "tt2", - "attempt_test_0001_m_000003_0 on tt2"); - checkAssignment( - taskTrackerManager, scheduler, "tt2", - "attempt_test_0001_m_000004_0 on tt2"); - checkAssignment( - taskTrackerManager, scheduler, "tt3", - "attempt_test_0001_m_000005_0 on tt3"); + Map expectedTaskStrings = new HashMap(); + for (int i=0; i<5; i++) { + String ttName = "tt"+(i+1); + populateExpectedStrings(expectedTaskStrings, + "attempt_test_0001_m_00000"+(i+1)+"_0 on " + ttName, + "attempt_test_0001_r_00000"+(i+1)+"_0 on " + ttName); + checkMultipleTaskAssignment(taskTrackerManager, 
scheduler, + ttName, expectedTaskStrings); + } + // u2 submits job with 4 slots FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2"); // u2 should get next 4 slots - checkAssignment( - taskTrackerManager, scheduler, "tt3", - "attempt_test_0002_m_000001_0 on tt3"); - checkAssignment( - taskTrackerManager, scheduler, "tt4", - "attempt_test_0002_m_000002_0 on tt4"); - checkAssignment( - taskTrackerManager, scheduler, "tt4", - "attempt_test_0002_m_000003_0 on tt4"); - checkAssignment( - taskTrackerManager, scheduler, "tt5", - "attempt_test_0002_m_000004_0 on tt5"); + for (int i=0; i<4; i++) { + String ttName = "tt"+(i+1); + checkAssignment(taskTrackerManager, scheduler, ttName, + "attempt_test_0002_m_00000"+(i+1)+"_0 on " + ttName); + } // last slot should go to u1, since u2 has no more tasks checkAssignment( taskTrackerManager, scheduler, "tt5", "attempt_test_0001_m_000006_0 on tt5"); - // u1 finishes a task + // u1 finishes tasks taskTrackerManager.finishTask("attempt_test_0001_m_000006_0", j1); + taskTrackerManager.finishTask("attempt_test_0001_r_000005_0", j1); // u1 submits a few more jobs // All the jobs are inited when submitted // because of addition of Eager Job Initializer all jobs in this @@ -917,23 +967,26 @@ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u2"); // now u3 submits a job taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3"); - // next slot should go to u3, even though u2 has an earlier job, since + // next map slot should go to u3, even though u2 has an earlier job, since // user limits have changed and u1/u2 are over limits - checkAssignment( - taskTrackerManager, scheduler, "tt5", - "attempt_test_0007_m_000001_0 on tt5"); + // reduce slot will go to job 2, as it is still under limit. 
+    populateExpectedStrings(expectedTaskStrings,
+        "attempt_test_0007_m_000001_0 on tt5",
+        "attempt_test_0002_r_000001_0 on tt5");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+        "tt5", expectedTaskStrings);
     // some other task finishes and u3 gets it
     taskTrackerManager.finishTask("attempt_test_0002_m_000004_0", j1);
     checkAssignment(
-        taskTrackerManager, scheduler, "tt5",
-        "attempt_test_0007_m_000002_0 on tt5");
+        taskTrackerManager, scheduler, "tt4",
+        "attempt_test_0007_m_000002_0 on tt4");
     // now, u2 finishes a task
     taskTrackerManager.finishTask("attempt_test_0002_m_000002_0", j1);
     // next slot will go to u1, since u3 has nothing to run and u1's job is
     // first in the queue
     checkAssignment(
-        taskTrackerManager, scheduler, "tt4",
-        "attempt_test_0001_m_000007_0 on tt4");
+        taskTrackerManager, scheduler, "tt2",
+        "attempt_test_0001_m_000007_0 on tt2");
   }
   /**
@@ -955,13 +1008,13 @@
     // enabled memory-based scheduling
     // Normal job in the cluster would be 1GB maps/reduces
     scheduler.getConf().setLong(
-        JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
+        JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
     scheduler.getConf().setLong(
-        MRConfig.MAPMEMORY_MB, 1 * 1024);
+        MRConfig.MAPMEMORY_MB, 1 * 1024);
     scheduler.getConf().setLong(
-        JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
+        JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
     scheduler.getConf().setLong(
-        MRConfig.REDUCEMEMORY_MB, 1 * 1024);
+        MRConfig.REDUCEMEMORY_MB, 1 * 1024);
     taskTrackerManager.setFakeQueues(queues);
     scheduler.start();
@@ -973,7 +1026,8 @@
     jConf.setNumReduceTasks(6);
     jConf.setUser("u1");
     jConf.setQueueName("default");
-    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, jConf);
     LOG.debug(
         "Submit one high memory(2GB maps, 2GB reduces) job of "
@@ -985,60 +1039,41 @@
     jConf.setNumReduceTasks(6);
     jConf.setQueueName("default");
     jConf.setUser("u2");
-    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, jConf);
-    // Verify that normal job takes 3 task assignments to hit user limits
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_r_000001_0 on tt1");
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_r_000002_0 on tt1");
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000003_0 on tt1");
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_r_000003_0 on tt1");
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000004_0 on tt1");
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_r_000004_0 on tt1");
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000005_0 on tt1");
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_r_000005_0 on tt1");
+    // Verify that normal job takes 5 task assignments to hit user limits
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    for (int i = 0; i < 5; i++) {
+      expectedStrings.clear();
+      expectedStrings.put(
+          CapacityTestUtils.MAP,
+          "attempt_test_0001_m_00000" + (i + 1) + "_0 on tt1");
+      expectedStrings.put(
+          CapacityTestUtils.REDUCE,
+          "attempt_test_0001_r_00000" + (i + 1) + "_0 on tt1");
+      checkMultipleTaskAssignment(
+          taskTrackerManager, scheduler, "tt1",
+          expectedStrings);
+    }
     // u1 has 5 map slots and 5 reduce slots. u2 has none. So u1's user limits
     // are hit. So u2 should get slots
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0002_r_000001_0 on tt1");
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0002_m_000002_0 on tt1");
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0002_r_000002_0 on tt1");
-
-    // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
+    for (int i = 0; i < 2; i++) {
+      expectedStrings.clear();
+      expectedStrings.put(
+          CapacityTestUtils.MAP,
+          "attempt_test_0002_m_00000" + (i + 1) + "_0 on tt1");
+      expectedStrings.put(
+          CapacityTestUtils.REDUCE,
+          "attempt_test_0002_r_00000" + (i + 1) + "_0 on tt1");
+      checkMultipleTaskAssignment(
+          taskTrackerManager, scheduler, "tt1",
+          expectedStrings);
+    }    // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
     // slots. Because of high memory tasks, giving u2 another task would
     // overflow limits. So, no more tasks should be given to anyone.
     assertNull(scheduler.assignTasks(tracker("tt1")));
-    assertNull(scheduler.assignTasks(tracker("tt1")));
   }
   /*
@@ -1051,9 +1086,7 @@
    * - Then run initializationPoller()
    * - Check once again the waiting queue, it should be 5 jobs again.
    * - Then raise status change events.
-   * - Assign one task to a task tracker. (Map)
-   * - Check waiting job count, it should be 4 now and used map (%) = 100
-   * - Assign another one task (Reduce)
+   * - Assign tasks to a task tracker.
    * - Check waiting job count, it should be 4 now and used map (%) = 100
    * and used reduce (%) = 100
    * - finish the job and then check the used percentage it should go
@@ -1066,9 +1099,9 @@
    * - Check the count, the waiting job count should be 2.
    * - Now raise status change events to move the initialized jobs which
    * should be two in count to running queue.
-   * - Then schedule a map of the job in running queue.
+   * - Then schedule a map and reduce of the job in running queue.
    * - Run the poller because the poller is responsible for waiting
-   * jobs count. Check the count, it should be using 100% map and one
+   * jobs count. Check the count, it should be using 100% map, reduce and one
    * waiting job
    * - fail the running job.
    * - Check the count, it should be now one waiting job and zero running
@@ -1161,9 +1194,12 @@
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
     raiseStatusChangeEvents(scheduler.jobQueuesManager, "q2");
     //assign one job
-    Task t1 = checkAssignment(
+    Map<String, String> strs = new HashMap<String, String>();
+    strs.put(CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+    strs.put(CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+    List<Task> t1 = checkMultipleTaskAssignment(
         taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000001_0 on tt1");
+        strs);
     //Initalize extra job.
     controlledInitializationPoller.selectJobsToInitialize();
@@ -1174,24 +1210,22 @@
     schedulingInfo =
         queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 20);
+
+    assertEquals(infoStrings.length, 22);
     assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
     assertEquals(infoStrings[8], "Running tasks: 1");
     assertEquals(infoStrings[9], "Active users:");
     assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
-    assertEquals(infoStrings[14], "Used capacity: 0 (0.0% of Capacity)");
-    assertEquals(infoStrings[15], "Running tasks: 0");
-    assertEquals(infoStrings[18], "Number of Waiting Jobs: 4");
+    assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[15], "Running tasks: 1");
+    assertEquals(infoStrings[20], "Number of Waiting Jobs: 4");
-    //assign a reduce task
-    Task t2 = checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_r_000001_0 on tt1");
     // make sure we update our stats
     scheduler.updateContextInfoForTests();
     schedulingInfo =
         queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
+
     assertEquals(infoStrings.length, 22);
     assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
     assertEquals(infoStrings[8], "Running tasks: 1");
@@ -1205,8 +1239,9 @@
     //Complete the job and check the running tasks count
     FakeJobInProgress u1j1 = userJobs.get(0);
-    taskTrackerManager.finishTask(t1.getTaskID().toString(), u1j1);
-    taskTrackerManager.finishTask(t2.getTaskID().toString(), u1j1);
+    for (Task task : t1) {
+      taskTrackerManager.finishTask(task.getTaskID().toString(), u1j1);
+    }
     taskTrackerManager.finalizeJob(u1j1);
     // make sure we update our stats
@@ -1214,6 +1249,7 @@
     schedulingInfo =
         queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
+
     assertEquals(infoStrings.length, 18);
     assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
     assertEquals(infoStrings[8], "Running tasks: 0");
@@ -1274,9 +1310,12 @@
     //Now schedule a map should be job3 of the user as job1 succeeded job2
     //failed and now job3 is running
-    t1 = checkAssignment(
+    strs.clear();
+    strs.put(CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt1");
+    strs.put(CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt1");
+    t1 = checkMultipleTaskAssignment(
         taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0003_m_000001_0 on tt1");
+        strs);
     FakeJobInProgress u1j3 = userJobs.get(2);
     assertTrue(
         "User Job 3 not running ",
@@ -1290,12 +1329,16 @@
     schedulingInfo =
         queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 20);
+    assertEquals(infoStrings.length, 22);
     assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
     assertEquals(infoStrings[8], "Running tasks: 1");
     assertEquals(infoStrings[9], "Active users:");
     assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
-    assertEquals(infoStrings[18], "Number of Waiting Jobs: 1");
+    assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[15], "Running tasks: 1");
+    assertEquals(infoStrings[16], "Active users:");
+    assertEquals(infoStrings[17], "User 'u1': 1 (100.0% of used capacity)");
+    assertEquals(infoStrings[20], "Number of Waiting Jobs: 1");
     //Fail the executing job
     taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
@@ -1347,13 +1390,14 @@
     // assert that all tasks are launched even though they transgress the
     // scheduling limits.
-
-    checkAssignment(
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.put(
+        CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(
+        CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
         taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_r_000001_0 on tt1");
+        expectedStrings);
   }
   /**
@@ -1366,7 +1410,7 @@
       throws IOException {
     // 2 map and 1 reduce slots
-    taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1);
+    taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2);
     taskTrackerManager.addQueues(new String[]{"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -1399,7 +1443,8 @@
     jConf.setNumReduceTasks(0);
     jConf.setQueueName("default");
     jConf.setUser("u1");
-    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, jConf);
     LOG.debug(
         "Submit another regular memory(1GB vmem maps/reduces) job of "
@@ -1411,170 +1456,188 @@
     jConf.setNumReduceTasks(2);
     jConf.setQueueName("default");
     jConf.setUser("u1");
-    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, jConf);
-    // first, a map from j1 will run
-    checkAssignment(
+    // first, a map from j1 and a reduce from other job j2
+    Map<String, String> strs = new HashMap<String, String>();
+    strs.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+    strs.put(REDUCE,"attempt_test_0002_r_000001_0 on tt1");
+
+    checkMultipleTaskAssignment(
         taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000001_0 on tt1");
+        strs);
     // Total 2 map slots should be accounted for.
     checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50.0f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
-    // at this point, the scheduler tries to schedule another map from j1.
-    // there isn't enough space. The second job's reduce should be scheduled.
+    //TT has 2 slots for reduces hence this call should get a reduce task
+    //from other job
     checkAssignment(
         taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0002_r_000001_0 on tt1");
-    // Total 1 reduce slot should be accounted for.
-    checkOccupiedSlots(
-        "default", TaskType.REDUCE, 1, 1,
-        100.0f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
+        "attempt_test_0002_r_000002_0 on tt1");
+    checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 2, 100.0f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
+
+    //now as all the slots are occupied hence no more tasks would be
+    //assigned.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
   }
-  /**
-   * Test blocking of cluster for lack of memory.
+  /**
+   * Tests that scheduler schedules normal jobs once high RAM jobs
+   * have been reserved to the limit.
+   *
+   * The test causes the scheduler to schedule a normal job on two
+   * trackers, and one task of the high RAM job on a third. Then it
+   * asserts that one of the first two trackers gets a reservation
+   * for the remaining task of the high RAM job. After this, it
After this, it + * asserts that a normal job submitted later is allowed to run + * on a free slot, as all tasks of the high RAM job are either + * scheduled or reserved. * * @throws IOException */ public void testClusterBlockingForLackOfMemory() - throws IOException { - - LOG.debug("Starting the scheduler."); - taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2); - - ArrayList queues = new ArrayList(); - queues.add(new FakeQueueInfo("default", 100.0f, true, 25)); - taskTrackerManager.addQueues(new String[]{"default"}); - - - scheduler.setTaskTrackerManager(taskTrackerManager); - // enabled memory-based scheduling - // Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces - scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024); - scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024); - scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024); - scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024); - taskTrackerManager.setFakeQueues(queues); - scheduler.start(); - - LOG.debug( - "Submit one normal memory(1GB maps/reduces) job of " - + "1 map, 1 reduce tasks."); - JobConf jConf = new JobConf(conf); - jConf.setMemoryForMapTask(1 * 1024); - jConf.setMemoryForReduceTask(1 * 1024); - jConf.setNumMapTasks(1); - jConf.setNumReduceTasks(1); - jConf.setQueueName("default"); - jConf.setUser("u1"); - FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf); + throws IOException { - // Fill the second tt with this job. - checkAssignment( - taskTrackerManager, scheduler, "tt2", - "attempt_test_0001_m_000001_0 on tt2"); - // Total 1 map slot should be accounted for. 
-    checkOccupiedSlots("default", TaskType.MAP, 1, 1, 25.0f);
-    assertEquals(
-        String.format(
-            TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
-            1, 1, 0, 0, 0, 0),
-        (String) job1.getSchedulingInfo());
-    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt2",
-        "attempt_test_0001_r_000001_0 on tt2");
-    // Total 1 map slot should be accounted for.
-    checkOccupiedSlots(
-        "default", TaskType.REDUCE, 1, 1,
-        25.0f);
-    assertEquals(
-        String.format(
-            TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
-            1, 1, 0, 1, 1, 0),
-        (String) job1.getSchedulingInfo());
-    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
-
-    LOG.debug(
-        "Submit one high memory(2GB maps/reduces) job of "
-        + "2 map, 2 reduce tasks.");
-    jConf = new JobConf(conf);
-    jConf.setMemoryForMapTask(2 * 1024);
-    jConf.setMemoryForReduceTask(2 * 1024);
-    jConf.setNumMapTasks(2);
-    jConf.setNumReduceTasks(2);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
-
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0002_m_000001_0 on tt1");
-    // Total 3 map slots should be accounted for.
-    checkOccupiedSlots("default", TaskType.MAP, 1, 3, 75.0f);
-    assertEquals(
-        String.format(
-            TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
-            1, 2, 0, 0, 0, 0),
-        (String) job2.getSchedulingInfo());
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+    LOG.debug("Starting the scheduler.");
+    taskTrackerManager = new FakeTaskTrackerManager(3, 2, 2);
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0002_r_000001_0 on tt1");
-    // Total 3 reduce slots should be accounted for.
-    checkOccupiedSlots(
-        "default", TaskType.REDUCE, 1, 3,
-        75.0f);
-    assertEquals(
-        String.format(
-            TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
-            1, 2, 0, 1, 2, 0),
-        (String) job2.getSchedulingInfo());
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
-
-    LOG.debug(
-        "Submit one normal memory(1GB maps/reduces) job of "
-        + "1 map, 0 reduce tasks.");
-    jConf = new JobConf(conf);
-    jConf.setMemoryForMapTask(1 * 1024);
-    jConf.setMemoryForReduceTask(1 * 1024);
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(1);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    FakeJobInProgress job3 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
-
-    // Job2 cannot fit on tt1. So tt1 is reserved for a map slot of job2
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-
-    // reserved tasktrackers contribute to occupied slots for maps.
-    checkOccupiedSlots("default", TaskType.MAP, 1, 5, 125.0f);
-    // occupied slots for reduces remain unchanged as tt1 is not reserved for
-    // reduces.
-    checkOccupiedSlots("default", TaskType.REDUCE, 1, 3, 75.0f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
-    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
-    LOG.info(job2.getSchedulingInfo());
-    assertEquals(
-        String.format(
-            TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
-            1, 2, 2, 1, 2, 0),
-        (String) job2.getSchedulingInfo());
-    assertEquals(
-        String.format(
-            TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
-            0, 0, 0, 0, 0, 0),
-        (String) job3.getSchedulingInfo());
-
-    // One reservation is already done for job2. So job3 should go ahead.
-    checkAssignment(
-        taskTrackerManager, scheduler, "tt2",
-        "attempt_test_0003_m_000001_0 on tt2");
-  }
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
+    taskTrackerManager.addQueues(new String[]{"default"});
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // enabled memory-based scheduling
+    // Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces
+    scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
+    scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024);
+    scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
+    scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024);
+    taskTrackerManager.setFakeQueues(queues);
+    scheduler.start();
+
+    LOG.debug(
+        "Submit one normal memory(1GB maps/reduces) job of "
+        + "2 map, 2 reduce tasks.");
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(2);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, jConf);
+
+    // Fill a tt with this job's tasks.
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.put(
+        CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(
+        CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
+        taskTrackerManager, scheduler, "tt1",
+        expectedStrings);
+    // Total 1 map slot should be accounted for.
+    checkOccupiedSlots("default", TaskType.MAP, 1, 1, 16.7f);
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 16.7f);
+    assertEquals(
+        String.format(
+            TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+            1, 1, 0, 1, 1, 0),
+        (String) job1.getSchedulingInfo());
+    checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
+
+    expectedStrings.clear();
+    expectedStrings.put(
+        CapacityTestUtils.MAP, "attempt_test_0001_m_000002_0 on tt2");
+    expectedStrings.put(
+        CapacityTestUtils.REDUCE, "attempt_test_0001_r_000002_0 on tt2");
+
+    // fill another TT with the rest of the tasks of the job
+    checkMultipleTaskAssignment(
+        taskTrackerManager, scheduler, "tt2",
+        expectedStrings);
+
+    LOG.debug(
+        "Submit one high memory(2GB maps/reduces) job of "
+        + "2 map, 2 reduce tasks.");
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(2 * 1024);
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(2);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, jConf);
+
+    // Have another TT run one task of each type of the high RAM
+    // job. This will fill up the TT.
+    expectedStrings.clear();
+    expectedStrings.put(
+        CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt3");
+    expectedStrings.put(
+        CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt3");
+
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+        "tt3", expectedStrings);
+    checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f);
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 4, 66.7f);
+    assertEquals(
+        String.format(
+            TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+            1, 2, 0, 1, 2, 0),
+        (String) job2.getSchedulingInfo());
+    checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 2 * 1024L);
+
+    LOG.debug(
+        "Submit one normal memory(1GB maps/reduces) job of "
+        + "1 map, 1 reduce tasks.");
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job3 = taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, jConf);
+
+    // Send a TT with insufficient space for task assignment,
+    // This will cause a reservation for the high RAM job.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+
+    // reserved tasktrackers contribute to occupied slots for maps and reduces
+    checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f);
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 6, 100.0f);
+    checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
+    LOG.info(job2.getSchedulingInfo());
+    assertEquals(
+        String.format(
+            TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+            1, 2, 2, 1, 2, 2),
+        (String) job2.getSchedulingInfo());
+    assertEquals(
+        String.format(
+            TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+            0, 0, 0, 0, 0, 0),
+        (String) job3.getSchedulingInfo());
+
+    // Reservations are already done for job2. So job3 should go ahead.
+    expectedStrings.clear();
+    expectedStrings.put(
+        CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt2");
+    expectedStrings.put(
+        CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt2");
+
+    checkMultipleTaskAssignment(
+        taskTrackerManager, scheduler, "tt2",
+        expectedStrings);
+  }
   /**
    * Testcase to verify fix for a NPE (HADOOP-5641), when memory based
@@ -1613,24 +1676,21 @@
     jConf.setMemoryForReduceTask(512);
     jConf.setQueueName("default");
     jConf.setUser("u1");
-    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, jConf);
-    // 1st cycle - 1 map gets assigned.
-    Task t = checkAssignment(
+    // 1st cycle - 1 map and reduce gets assigned.
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.put(
+        CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(
+        CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+    List<Task> t = checkMultipleTaskAssignment(
         taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000001_0 on tt1");
-    // Total 1 map slot should be accounted for.
+        expectedStrings);
+    // Total 1 map slot and 1 reduce slot should be accounted for.
     checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50.0f);
-    checkMemReservedForTasksOnTT("tt1", 512L, 0L);
-
-    // 1st cycle of reduces - 1 reduce gets assigned.
-    Task t1 = checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_r_000001_0 on tt1");
-    // Total 1 reduce slot should be accounted for.
-    checkOccupiedSlots(
-        "default", TaskType.REDUCE, 1, 1,
-        50.0f);
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50.0f);
     checkMemReservedForTasksOnTT("tt1", 512L, 512L);
     // kill this job !
@@ -1653,20 +1713,21 @@
     jConf.setMemoryForReduceTask(512);
     jConf.setQueueName("default");
     jConf.setUser("u1");
-    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, jConf);
     // since with HADOOP-5964, we don't rely on a job conf to get
     // the memory occupied, scheduling should be able to work correctly.
-    t1 = checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0002_m_000001_0 on tt1");
-    checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50);
-    checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
+    expectedStrings.clear();
+    expectedStrings.put(
+        CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt1");
+    expectedStrings.put(
+        CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt1");
-    // assign a reduce now.
-    t1 = checkAssignment(
+    List<Task> t1 = checkMultipleTaskAssignment(
         taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0002_r_000001_0 on tt1");
+        expectedStrings);
+    checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50);
     checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50);
     checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
@@ -1674,22 +1735,22 @@
     assertNull(scheduler.assignTasks(tracker("tt1")));
     // finish the tasks on the tracker.
-    taskTrackerManager.finishTask(t.getTaskID().toString(), job1);
-    taskTrackerManager.finishTask(t1.getTaskID().toString(), job1);
+    for (Task task : t) {
+      taskTrackerManager.finishTask(task.getTaskID().toString(), job1);
+    }
+    expectedStrings.clear();
+    expectedStrings.put(
+        CapacityTestUtils.MAP, "attempt_test_0002_m_000002_0 on tt1");
+    expectedStrings.put(
+        CapacityTestUtils.REDUCE, "attempt_test_0002_r_000002_0 on tt1");
     // now a new task can be assigned.
-    t = checkAssignment(
+    t = checkMultipleTaskAssignment(
         taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0002_m_000002_0 on tt1");
+        expectedStrings);
     checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
-    // memory used will change because of the finished task above.
-    checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
-
-    // reduce can be assigned.
-    t = checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0002_r_000002_0 on tt1");
     checkOccupiedSlots("default", TaskType.REDUCE, 1, 2, 100.0f);
+    // memory used will change because of the finished task above.
     checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
   }
@@ -1721,9 +1782,10 @@
     JobInitializationPoller initPoller = scheduler.getInitializationPoller();
     // submit 4 jobs each for 3 users.
-    HashMap<String, ArrayList<FakeJobInProgress>> userJobs = taskTrackerManager.submitJobs(
-        3,
-        4, "default");
+    HashMap<String, ArrayList<FakeJobInProgress>> userJobs =
+        taskTrackerManager.submitJobs(
+            3,
+            4, "default");
     // get the jobs submitted.
     ArrayList<FakeJobInProgress> u1Jobs = userJobs.get("u1");
@@ -1782,31 +1844,37 @@
     raiseStatusChangeEvents(mgr);
     // get some tasks assigned.
-    Task t1 = checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000001_0 on tt1");
-    Task t2 = checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_r_000001_0 on tt1");
-    Task t3 = checkAssignment(
-        taskTrackerManager, scheduler, "tt2",
-        "attempt_test_0002_m_000001_0 on tt2");
-    Task t4 = checkAssignment(
-        taskTrackerManager, scheduler, "tt2",
-        "attempt_test_0002_r_000001_0 on tt2");
-    taskTrackerManager.finishTask(
-        t1.getTaskID().toString(), u1Jobs.get(
-        0));
-    taskTrackerManager.finishTask(
-        t2.getTaskID().toString(), u1Jobs.get(
-        0));
-    taskTrackerManager.finishTask(
-        t3.getTaskID().toString(), u1Jobs.get(
-        1));
-    taskTrackerManager.finishTask(
-        t4.getTaskID().toString(), u1Jobs.get(
-        1));
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.put(
+        CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(
+        CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+
+    List<Task> t1 = checkMultipleTaskAssignment(
+        taskTrackerManager, scheduler, "tt1",
+        expectedStrings);
+
+    expectedStrings.clear();
+    expectedStrings.put(
+        CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt2");
+    expectedStrings.put(
+        CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt2");
+
+    List<Task> t2 = checkMultipleTaskAssignment(
+        taskTrackerManager, scheduler, "tt2",
+        expectedStrings);
+
+    for (Task task : t1) {
+      taskTrackerManager.finishTask(
+          task.getTaskID().toString(), u1Jobs.get(
+          0));
+    }
+    for (Task task : t2) {
+      taskTrackerManager.finishTask(
+          task.getTaskID().toString(), u1Jobs.get(
+          0));
+    }
     // as some jobs have running tasks, the poller will now
     // pick up new jobs to initialize.
     controlledInitializationPoller.selectJobsToInitialize();
@@ -1827,19 +1895,21 @@
         "Initialized jobs contains the user1 job 2",
         initializedJobs.contains(u1Jobs.get(1).getJobID()));
+    expectedStrings.clear();
+    expectedStrings.put(
+        CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt1");
+    expectedStrings.put(
+        CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt1");
+
     // finish one more job
-    t1 = checkAssignment(
+    t1 = checkMultipleTaskAssignment(
         taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0003_m_000001_0 on tt1");
-    t2 = checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0003_r_000001_0 on tt1");
-    taskTrackerManager.finishTask(
-        t1.getTaskID().toString(), u1Jobs.get(
-        2));
-    taskTrackerManager.finishTask(
-        t2.getTaskID().toString(), u1Jobs.get(
-        2));
+        expectedStrings);
+    for (Task task : t1) {
+      taskTrackerManager.finishTask(
+          task.getTaskID().toString(), u1Jobs.get(
+          2));
+    }
     // no new jobs should be picked up, because max user limit
     // is still 3.
@@ -1847,19 +1917,21 @@
     assertEquals(initializedJobs.size(), 5);
+    expectedStrings.clear();
+    expectedStrings.put(
+        CapacityTestUtils.MAP, "attempt_test_0004_m_000001_0 on tt1");
+    expectedStrings.put(
+        CapacityTestUtils.REDUCE, "attempt_test_0004_r_000001_0 on tt1");
+
     // run 1 more jobs..
-    t1 = checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0004_m_000001_0 on tt1");
-    t1 = checkAssignment(
+    t1 = checkMultipleTaskAssignment(
         taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0004_r_000001_0 on tt1");
-    taskTrackerManager.finishTask(
-        t1.getTaskID().toString(), u1Jobs.get(
-        3));
-    taskTrackerManager.finishTask(
-        t2.getTaskID().toString(), u1Jobs.get(
-        3));
+        expectedStrings);
+    for (Task task : t1) {
+      taskTrackerManager.finishTask(
+          task.getTaskID().toString(), u1Jobs.get(
+          3));
+    }
     // Now initialised jobs should contain user 4's job, as
     // user 1's jobs are all done and the number of users is
@@ -1968,12 +2040,13 @@
     taskTrackerManager.submitJob(JobStatus.PREP, 1, 1, "q1", "u1");
     controlledInitializationPoller.selectJobsToInitialize();
     raiseStatusChangeEvents(scheduler.jobQueuesManager, "q1");
-    Task t = checkAssignment(
-        taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_m_000001_0 on tt1");
-    t = checkAssignment(
+    Map<String, String> strs = new HashMap<String, String>();
+    strs.put(CapacityTestUtils.MAP,"attempt_test_0001_m_000001_0 on tt1");
+    strs.put(CapacityTestUtils.REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
         taskTrackerManager, scheduler, "tt1",
-        "attempt_test_0001_r_000001_0 on tt1");
+        strs);
+
   }
   public void testFailedJobInitalizations() throws Exception {
@@ -2049,37 +2122,41 @@
     conf.setReduceSpeculativeExecution(true);
     //Submit a job which would have one speculative map and one speculative
     //reduce.
-    FakeJobInProgress fjob1 = taskTrackerManager.submitJob(JobStatus.PREP, conf);
+    FakeJobInProgress fjob1 = taskTrackerManager.submitJob(
+        JobStatus.PREP, conf);
     conf = new JobConf();
     conf.setNumMapTasks(1);
     conf.setNumReduceTasks(1);
     //Submit a job which has no speculative map or reduce.
- FakeJobInProgress fjob2 = taskTrackerManager.submitJob(JobStatus.PREP, conf);
+ FakeJobInProgress fjob2 = taskTrackerManager.submitJob(
+ JobStatus.PREP, conf);
 //Ask the poller to initalize all the submitted job and raise status
 //change event.
 controlledInitializationPoller.selectJobsToInitialize();
 raiseStatusChangeEvents(mgr);
-
- checkAssignment(
+ Map strs = new HashMap();
+ strs.put(CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ strs.put(CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
 taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ strs);
 assertTrue(
 "Pending maps of job1 greater than zero",
 (fjob1.pendingMaps() == 0));
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000001_1 on tt2");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
+
 assertTrue(
- "Pending reduces of job2 greater than zero",
+ "Pending reduces of job1 greater than zero",
 (fjob1.pendingReduces() == 0));
- checkAssignment(
+
+ Map str = new HashMap();
+ str.put(CapacityTestUtils.MAP, "attempt_test_0001_m_000001_1 on tt2");
+ str.put(CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_1 on tt2");
+
+ checkMultipleTaskAssignment(
 taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_r_000001_1 on tt2");
+ str);
 taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", fjob1);
 taskTrackerManager.finishTask("attempt_test_0001_m_000001_1", fjob1);
@@ -2087,12 +2164,13 @@
 taskTrackerManager.finishTask("attempt_test_0001_r_000001_1", fjob1);
 taskTrackerManager.finalizeJob(fjob1);
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
- checkAssignment(
+ str.clear();
+ str.put(CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt1");
+ str.put(CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+
+ checkMultipleTaskAssignment(
 taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000001_0 on tt1");
+ str);
 taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", fjob2);
 taskTrackerManager.finishTask("attempt_test_0002_r_000001_0", fjob2);
 taskTrackerManager.finalizeJob(fjob2);
@@ -2267,7 +2345,8 @@
 jConf.setNumReduceTasks(6);
 jConf.setQueueName("default");
 jConf.setUser("u1");
- FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
 // Submit a normal job to the other queue.
 jConf = new JobConf(conf);
@@ -2277,108 +2356,178 @@
 jConf.setNumReduceTasks(6);
 jConf.setUser("u1");
 jConf.setQueueName("q1");
- FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
- // Map 1 of high memory job
- checkAssignment(
+ // Map and reduce of high memory job should be assigned
+ HashMap expectedStrings = new HashMap();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+
+ checkMultipleTaskAssignment(
 taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings);
+
 checkQueuesOrder(
 qs, scheduler
 .getOrderedQueues(TaskType.MAP));
- // Reduce 1 of high memory job
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
 checkQueuesOrder(
 qs, scheduler
 .getOrderedQueues(TaskType.REDUCE));
- // Map 1 of normal job
- checkAssignment(
+ // 1st map and reduce of normal job should be assigned
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
 taskTrackerManager, scheduler,
 "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
+ expectedStrings);
+
 checkQueuesOrder(
 reversedQs, scheduler
 .getOrderedQueues(TaskType.MAP));
-
- // Reduce 1 of normal job
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000001_0 on tt1");
 checkQueuesOrder(
 reversedQs, scheduler
 .getOrderedQueues(TaskType.REDUCE));
- // Map 2 of normal job
- checkAssignment(
+ // 2nd map and reduce of normal job should be assigned
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000002_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000002_0 on tt1");
+
+ checkMultipleTaskAssignment(
 taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000002_0 on tt1");
+ expectedStrings);
 checkQueuesOrder(
 reversedQs, scheduler
 .getOrderedQueues(TaskType.MAP));
-
- // Reduce 2 of normal job
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000002_0 on tt1");
 checkQueuesOrder(
 reversedQs, scheduler
 .getOrderedQueues(TaskType.REDUCE));
 // Now both the queues are equally served. But the comparator doesn't change
 // the order if queues are equally served.
+ // Hence, 3rd map and reduce of normal job should be assigned
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000003_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000003_0 on tt2");
- // Map 3 of normal job
- checkAssignment(
+ checkMultipleTaskAssignment(
 taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000003_0 on tt2");
+ expectedStrings);
+
 checkQueuesOrder(
 reversedQs, scheduler
 .getOrderedQueues(TaskType.MAP));
- // Reduce 3 of normal job
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_r_000003_0 on tt2");
 checkQueuesOrder(
 reversedQs, scheduler
 .getOrderedQueues(TaskType.REDUCE));
- // Map 2 of high memory job
- checkAssignment(
+ // 2nd map and reduce of high memory job should be assigned
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000002_0 on tt2");
+
+ checkMultipleTaskAssignment(
 taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings);
 checkQueuesOrder(
 qs, scheduler
 .getOrderedQueues(TaskType.MAP));
- // Reduce 2 of high memory job
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_r_000002_0 on tt2");
 checkQueuesOrder(
 qs, scheduler
 .getOrderedQueues(TaskType.REDUCE));
- // Map 4 of normal job
- checkAssignment(
+ // 4th map and reduce of normal job should be assigned.
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000004_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000004_0 on tt2");
+ checkMultipleTaskAssignment(
 taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000004_0 on tt2");
+ expectedStrings);
 checkQueuesOrder(
 reversedQs, scheduler
 .getOrderedQueues(TaskType.MAP));
- // Reduce 4 of normal job
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_r_000004_0 on tt2");
 checkQueuesOrder(
 reversedQs, scheduler
 .getOrderedQueues(TaskType.REDUCE));
 }
+ /**
+ * Tests whether 1 map and 1 reduce are assigned even if reduces span across
+ * multiple jobs or multiple queues.
+ *
+ * creates a cluster of 6 maps and 2 reduces.
+ * Submits 2 jobs:
+ * job1 , with 6 map and 1 reduces
+ * job2 with 2 map and 1 reduces
+ *
+ *
+ * check that first assignment assigns a map and a reduce.
+ * check that second assignment assigns a map and a reduce
+ * (both from other job and other queue)
+ *
+ * the last 2 calls just checks to make sure that we dont get further reduces
+ *
+ * @throws Exception
+ */
+ public void testMultiTaskAssignmentInMultipleQueues() throws Exception {
+ setUp(1, 6, 2);
+ // set up some queues
+ String[] qs = {"default", "q1"};
+ taskTrackerManager.addQueues(qs);
+ ArrayList queues = new ArrayList();
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+ queues.add(new FakeQueueInfo("q1", 50.0f, true, 25));
+ taskTrackerManager.setFakeQueues(queues);
+ scheduler.start();
+
+ //Submit the job with 6 maps and 2 reduces
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 6, 1, "default", "u1");
+
+ FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 1, "q1", "u2");
+
+ Map str = new HashMap();
+ str.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+ str.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
+
+ // next assignment will be for job in second queue.
+ str.clear();
+ str.put(MAP, "attempt_test_0002_m_000001_0 on tt1");
+ str.put(REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
+
+ //now both the reduce slots are being used , hence we sholdnot get only 1
+ //map task in this assignTasks call.
+ str.clear();
+ str.put(MAP, "attempt_test_0002_m_000002_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
+
+ str.clear();
+ str.put(MAP, "attempt_test_0001_m_000002_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
+ }
+
+
 private void checkRunningJobMovementAndCompletion() throws IOException {
 JobQueuesManager mgr = scheduler.jobQueuesManager;
@@ -2402,12 +2551,13 @@
 mgr.getJobQueue("default").getRunningJobs().contains(job));
 // assign a task
- Task t = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- t = checkAssignment(
+ Map strs = new HashMap();
+ strs.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+ strs.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+
+ checkMultipleTaskAssignment(
 taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
+ strs);
 controlledInitializationPoller.selectJobsToInitialize();
@@ -2551,14 +2701,14 @@
 tracker(taskTracker).getStatus(), TaskType.REDUCE);
 if (expectedMemForMapsOnTT == null) {
- assertTrue(observedMemForMapsOnTT == null);
+ assertEquals(observedMemForMapsOnTT,null);
 } else {
- assertTrue(observedMemForMapsOnTT.equals(expectedMemForMapsOnTT));
+ assertEquals(observedMemForMapsOnTT,expectedMemForMapsOnTT);
 }
 if (expectedMemForReducesOnTT == null) {
- assertTrue(observedMemForReducesOnTT == null);
+ assertEquals(observedMemForReducesOnTT,null);
 } else {
- assertTrue(observedMemForReducesOnTT.equals(expectedMemForReducesOnTT));
+ assertEquals(observedMemForReducesOnTT,expectedMemForReducesOnTT);
 }
 }