From mapreduce-commits-return-482-apmail-hadoop-mapreduce-commits-archive=hadoop.apache.org@hadoop.apache.org Fri Nov 20 20:06:06 2009 Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 62303 invoked from network); 20 Nov 2009 20:06:06 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.3) by minotaur.apache.org with SMTP; 20 Nov 2009 20:06:06 -0000 Received: (qmail 871 invoked by uid 500); 20 Nov 2009 20:06:06 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 819 invoked by uid 500); 20 Nov 2009 20:06:06 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 809 invoked by uid 99); 20 Nov 2009 20:06:05 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Nov 2009 20:06:05 +0000 X-ASF-Spam-Status: No, hits=-2.6 required=5.0 tests=BAYES_00 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Nov 2009 20:06:02 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 8F64123888C2; Fri, 20 Nov 2009 20:05:42 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r882700 - in /hadoop/mapreduce/trunk: CHANGES.txt src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Date: Fri, 20 Nov 2009 20:05:41 -0000 To: mapreduce-commits@hadoop.apache.org From: matei@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20091120200542.8F64123888C2@eris.apache.org> Author: matei Date: Fri Nov 20 20:05:38 2009 
New Revision: 882700 URL: http://svn.apache.org/viewvc?rev=882700&view=rev Log: MAPREDUCE-1198. Alternatively schedule different types of tasks in fair share scheduler. Contributed by Scott Chen. Modified: hadoop/mapreduce/trunk/CHANGES.txt hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Modified: hadoop/mapreduce/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=882700&r1=882699&r2=882700&view=diff ============================================================================== --- hadoop/mapreduce/trunk/CHANGES.txt (original) +++ hadoop/mapreduce/trunk/CHANGES.txt Fri Nov 20 20:05:38 2009 @@ -11,6 +11,9 @@ IMPROVEMENTS + MAPREDUCE-1198. Alternatively schedule different types of tasks in + fair share scheduler. (Scott Chen via matei) + MAPREDUCE-707. Provide a jobconf property for explicitly assigning a job to a pool in the Fair Scheduler. 
(Alan Heirich via matei) Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=882700&r1=882699&r2=882700&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original) +++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Fri Nov 20 20:05:38 2009 @@ -322,69 +322,116 @@ TaskTrackerStatus tts = tracker.getStatus(); - // Scan to see whether any job needs to run a map, then a reduce + int mapsAssigned = 0; // loop counter for map in the below while loop + int reducesAssigned = 0; // loop counter for reduce in the below while + int mapCapacity = maxTasksToAssign(TaskType.MAP, tts); + int reduceCapacity = maxTasksToAssign(TaskType.REDUCE, tts); + boolean mapRejected = false; // flag used for ending the loop + boolean reduceRejected = false; // flag used for ending the loop + + // Keep track of which jobs were visited for map tasks and which had tasks + // launched, so that we can later mark skipped jobs for delay scheduling + Set<JobInProgress> visitedForMap = new HashSet<JobInProgress>(); + Set<JobInProgress> visitedForReduce = new HashSet<JobInProgress>(); + Set<JobInProgress> launchedMap = new HashSet<JobInProgress>(); + ArrayList<Task> tasks = new ArrayList<Task>(); - for (TaskType taskType: MAP_AND_REDUCE) { - // Keep track of which jobs were visited and which had tasks launched, - // so that we can later mark skipped jobs for delay scheduling - Set<JobInProgress> visited = new HashSet<JobInProgress>(); - Set<JobInProgress> launched = new HashSet<JobInProgress>(); - // Compute a maximum number of tasks to assign on this task tracker - int cap = maxTasksToAssign(taskType, tts); - // Assign up to cap tasks - for (int i = 0; i < cap; i++) { - // Break if all runnable tasks of this type are already running - if (taskType == TaskType.MAP && 
runningMaps == runnableMaps || - taskType == TaskType.REDUCE && runningReduces == runnableReduces) - break; - // Break if the node can't support another task of this type - boolean canAssign = (taskType == TaskType.MAP) ? - loadMgr.canAssignMap(tts, runnableMaps, totalMapSlots) : - loadMgr.canAssignReduce(tts, runnableReduces, totalReduceSlots); - if (canAssign) { - // Get the map or reduce schedulables and sort them by fair sharing - List<Schedulable> scheds = getPoolSchedulables(taskType); - Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator()); - for (Schedulable sched: scheds) { - eventLog.log("INFO", "Checking for " + taskType + - " task in " + sched.getName()); - Task task = sched.assignTask(tts, currentTime, visited); - if (task != null) { - JobInProgress job = taskTrackerManager.getJob(task.getJobID()); - eventLog.log("ASSIGN", trackerName, taskType, - job.getJobID(), task.getTaskID()); - launched.add(job); - // Update running task counts, and the job's locality level - if (taskType == TaskType.MAP) { - runningMaps++; - updateLastMapLocalityLevel(job, task, tts); - } else { - runningReduces++; - } - // Add task to the list of assignments - tasks.add(task); - break; - } // end if(task != null) - } // end for(Schedulable sched: scheds) + // Scan jobs to assign tasks until neither maps nor reduces can be assigned + while (true) { + // Computing the ending conditions for the loop + // Reject a task type if one of the following condition happens + // 1. number of assigned task reaches per heatbeat limit + // 2. number of running tasks reaches runnable tasks + // 3. 
task is rejected by the LoadManager.canAssign + if (!mapRejected) { + if (mapsAssigned == mapCapacity || + runningMaps == runnableMaps || + !loadMgr.canAssignMap(tts, runnableMaps, totalMapSlots)) { + eventLog.log("INFO", "Can't assign another MAP to " + trackerName); + mapRejected = true; + } + } + if (!reduceRejected) { + if (reducesAssigned == reduceCapacity || + runningReduces == runnableReduces || + !loadMgr.canAssignReduce(tts, runnableReduces, totalReduceSlots)) { + eventLog.log("INFO", "Can't assign another REDUCE to " + trackerName); + reduceRejected = true; + } + } + // Exit while (true) loop if + // 1. neither maps nor reduces can be assigned + // 2. assignMultiple is off and we already assigned one task + if (mapRejected && reduceRejected || + !assignMultiple && tasks.size() > 0) { + break; // This is the only exit of the while (true) loop + } + + // Determine which task type to assign this time + // First try choosing a task type which is not rejected + TaskType taskType; + if (mapRejected) { + taskType = TaskType.REDUCE; + } else if (reduceRejected) { + taskType = TaskType.MAP; + } else { + // If both types are available, choose the task type with fewer running + // tasks on the task tracker to prevent that task type from starving + if (tts.countMapTasks() <= tts.countReduceTasks()) { + taskType = TaskType.MAP; } else { - eventLog.log("INFO", "Can't assign another " + taskType + - " to " + trackerName); - break; - } - } // end for(i = 0; i < cap; i++) - // If we were assigning maps, mark any jobs that were visited but - // did not launch a task as skipped on this heartbeat - if (taskType == TaskType.MAP) { - for (JobInProgress job: visited) { - if (!launched.contains(job)) { - infos.get(job).skippedAtLastHeartbeat = true; + taskType = TaskType.REDUCE; + } + } + + // Get the map or reduce schedulables and sort them by fair sharing + List<Schedulable> scheds = getPoolSchedulables(taskType); + Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator()); 
+ boolean foundTask = false; + for (Schedulable sched: scheds) { // This loop will assign only one task + eventLog.log("INFO", "Checking for " + taskType + + " task in " + sched.getName()); + Task task = taskType == TaskType.MAP ? + sched.assignTask(tts, currentTime, visitedForMap) : + sched.assignTask(tts, currentTime, visitedForReduce); + if (task != null) { + foundTask = true; + JobInProgress job = taskTrackerManager.getJob(task.getJobID()); + eventLog.log("ASSIGN", trackerName, taskType, + job.getJobID(), task.getTaskID()); + // Update running task counts, and the job's locality level + if (taskType == TaskType.MAP) { + launchedMap.add(job); + mapsAssigned++; + runningMaps++; + updateLastMapLocalityLevel(job, task, tts); + } else { + reducesAssigned++; + runningReduces++; } + // Add task to the list of assignments + tasks.add(task); + break; // This break makes this loop assign only one task + } // end if(task != null) + } // end for(Schedulable sched: scheds) + + // Reject the task type if we cannot find a task + if (!foundTask) { + if (taskType == TaskType.MAP) { + mapRejected = true; + } else { + reduceRejected = true; } } - // Return if assignMultiple was disabled and we found a task - if (!assignMultiple && tasks.size() > 0) - return tasks; - } // end for(TaskType taskType: MAP_AND_REDUCE) + } // end while (true) + + // Mark any jobs that were visited for map tasks but did not launch a task + // as skipped on this heartbeat + for (JobInProgress job: visitedForMap) { + if (!launchedMap.contains(job)) { + infos.get(job).skippedAtLastHeartbeat = true; + } + } // If no tasks were found, return null return tasks.isEmpty() ? 
null : tasks; Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=882700&r1=882699&r2=882700&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original) +++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Fri Nov 20 20:05:38 2009 @@ -683,8 +683,8 @@ // Assign tasks and check that jobs alternate in filling slots checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1"); checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2"); @@ -742,8 +742,8 @@ // Assign tasks and check that jobs alternate in filling slots checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1", - "attempt_test_0002_m_000000_0 on tt1", "attempt_test_0001_r_000000_0 on tt1", + "attempt_test_0002_m_000000_0 on tt1", "attempt_test_0002_r_000000_0 on tt1"); checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2", "attempt_test_0002_r_000001_0 on tt2"); @@ -803,12 +803,12 @@ // Check that tasks are filled alternately by the jobs checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1"); 
checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2"); - checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2"); + checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2"); // Check that no new tasks can be launched once the tasktrackers are full @@ -846,12 +846,12 @@ // Check that tasks are filled alternately by the jobs checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1"); - checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1"); + checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1"); checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1"); checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); - checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2"); + checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2"); checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2"); // Check scheduler variables; the demands should now be 8 because 2 tasks @@ -911,12 +911,12 @@ // Check that tasks are filled alternately by the jobs checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1", - "attempt_test_0002_m_000000_0 on tt1", "attempt_test_0001_r_000000_0 on tt1", + "attempt_test_0002_m_000000_0 on tt1", "attempt_test_0002_r_000000_0 on tt1"); checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2", - "attempt_test_0002_m_000001_0 on tt2", "attempt_test_0001_r_000001_0 on tt2", + "attempt_test_0002_m_000001_0 on tt2", "attempt_test_0002_r_000001_0 on tt2"); // Check that no new tasks can be launched once the tasktrackers are full @@ -954,12 +954,12 @@ // Check that tasks are filled alternately by the jobs checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1", - "attempt_test_0002_m_000002_0 on tt1", "attempt_test_0001_r_000002_0 on tt1", + 
"attempt_test_0002_m_000002_0 on tt1", "attempt_test_0002_r_000002_0 on tt1"); checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2", - "attempt_test_0002_m_000003_0 on tt2", "attempt_test_0001_r_000003_0 on tt2", + "attempt_test_0002_m_000003_0 on tt2", "attempt_test_0002_r_000003_0 on tt2"); // Check scheduler variables; the demands should now be 8 because 2 tasks @@ -1016,16 +1016,16 @@ // type should be handed out alternately to 1, 2, 2, 1, 2, 2, etc. System.out.println("HEREEEE"); checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1"); checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2"); - checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2"); + checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2"); checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3"); - checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3"); checkAssignment("tt3", "attempt_test_0002_r_000002_0 on tt3"); + checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3"); checkAssignment("tt3", "attempt_test_0002_r_000003_0 on tt3"); } @@ -1101,12 +1101,12 @@ // Assign tasks and check that slots are first given to needy jobs checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1"); checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2"); - checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2"); 
checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2"); + checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2"); } @@ -1181,12 +1181,12 @@ // Assign tasks and check that slots are first given to needy jobs, but // that job 1 gets two tasks after due to having a larger share. checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1"); checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2"); - checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2"); + checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2"); } @@ -1257,12 +1257,12 @@ // Assign tasks and check that slots are first given to needy jobs, but // that job 1 gets two tasks after due to having a larger share. 
checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1", - "attempt_test_0003_m_000000_0 on tt1", "attempt_test_0002_r_000000_0 on tt1", + "attempt_test_0003_m_000000_0 on tt1", "attempt_test_0003_r_000000_0 on tt1"); checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2", - "attempt_test_0001_m_000001_0 on tt2", "attempt_test_0001_r_000000_0 on tt2", + "attempt_test_0001_m_000001_0 on tt2", "attempt_test_0001_r_000001_0 on tt2"); } @@ -1309,12 +1309,12 @@ // Assign tasks and check that slots are first given to needy jobs checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2"); - checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2"); + checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2"); } @@ -1361,13 +1361,13 @@ // Assign tasks and check that only jobs 1 and 2 get them checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1"); advanceTime(100); checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2"); - checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2"); + checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2"); } @@ -1418,13 +1418,13 @@ // Assign tasks and check that slots are given only to 
jobs 1, 3 and 4 checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1"); advanceTime(100); checkAssignment("tt2", "attempt_test_0004_m_000000_0 on tt2"); - checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0004_r_000000_0 on tt2"); + checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2"); } @@ -1687,13 +1687,13 @@ // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first. JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10); checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1"); advanceTime(100); checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2"); - checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2"); + checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2"); // Ten seconds later, submit job 2. @@ -1765,13 +1765,13 @@ // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first. 
JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10); checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1"); advanceTime(100); checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2"); - checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2"); + checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2"); // Ten seconds later, submit job 2. @@ -1798,8 +1798,8 @@ scheduler.update(); assertEquals(3, job1.runningMaps()); assertEquals(2, job1.runningReduces()); - checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2"); checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2"); + checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2"); checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2"); assertNull(scheduler.assignTasks(tracker("tt1"))); assertNull(scheduler.assignTasks(tracker("tt2"))); @@ -1844,18 +1844,18 @@ JobInProgress job1 = submitJob(JobStatus.RUNNING, 6, 6, "pool1"); advanceTime(100); checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1"); advanceTime(100); checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2"); - checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2"); + checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2"); 
advanceTime(100); checkAssignment("tt3", "attempt_test_0001_m_000004_0 on tt3"); - checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3"); checkAssignment("tt3", "attempt_test_0001_r_000004_0 on tt3"); + checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3"); checkAssignment("tt3", "attempt_test_0001_r_000005_0 on tt3"); advanceTime(100); @@ -1863,8 +1863,8 @@ JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "pool2"); advanceTime(100); checkAssignment("tt4", "attempt_test_0002_m_000000_0 on tt4"); - checkAssignment("tt4", "attempt_test_0002_m_000001_0 on tt4"); checkAssignment("tt4", "attempt_test_0002_r_000000_0 on tt4"); + checkAssignment("tt4", "attempt_test_0002_m_000001_0 on tt4"); checkAssignment("tt4", "attempt_test_0002_r_000001_0 on tt4"); // Submit job 3. @@ -1900,8 +1900,8 @@ assertEquals(4, job1.runningMaps()); assertEquals(4, job1.runningReduces()); checkAssignment("tt3", "attempt_test_0003_m_000000_0 on tt3"); - checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3"); checkAssignment("tt3", "attempt_test_0003_r_000000_0 on tt3"); + checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3"); checkAssignment("tt3", "attempt_test_0003_r_000001_0 on tt3"); assertNull(scheduler.assignTasks(tracker("tt1"))); assertNull(scheduler.assignTasks(tracker("tt2"))); @@ -1941,13 +1941,13 @@ // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first. 
JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10); checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1"); advanceTime(100); checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2"); - checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2"); + checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2"); // Ten seconds later, submit job 2. @@ -1983,8 +1983,8 @@ assertEquals(2, job1.runningMaps()); assertEquals(2, job1.runningReduces()); checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2"); - checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2"); + checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2"); assertNull(scheduler.assignTasks(tracker("tt1"))); assertNull(scheduler.assignTasks(tracker("tt2"))); @@ -2015,13 +2015,13 @@ // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first. 
JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10); checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1"); advanceTime(100); checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2"); - checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2"); + checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2"); // Ten seconds later, submit job 2. @@ -2069,13 +2069,13 @@ // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first. JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10); checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1"); advanceTime(100); checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2"); - checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2"); + checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2"); // Ten seconds later, submit job 2. @@ -2385,12 +2385,12 @@ // Assign tasks and check that they're given first to job3 (because it is // high priority), then to job1, then to job2. 
checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2"); - checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2"); + checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2"); } @@ -2423,12 +2423,12 @@ // Assign tasks and check that they alternate between jobs 1 and 3, the // head-of-line jobs in their respective pools. checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1"); checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2"); - checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2"); + checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2"); checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2"); } @@ -2464,12 +2464,12 @@ // Assign tasks and check that only job 1 gets tasks in pool A, but // jobs 3 and 4 both get tasks in pool B. 
checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1"); - checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1"); + checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1"); checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1"); checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2"); - checkAssignment("tt2", "attempt_test_0004_m_000000_0 on tt2"); checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2"); + checkAssignment("tt2", "attempt_test_0004_m_000000_0 on tt2"); checkAssignment("tt2", "attempt_test_0004_r_000000_0 on tt2"); }