hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject svn commit: r882700 - in /hadoop/mapreduce/trunk: CHANGES.txt src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Date Fri, 20 Nov 2009 20:05:41 GMT
Author: matei
Date: Fri Nov 20 20:05:38 2009
New Revision: 882700

URL: http://svn.apache.org/viewvc?rev=882700&view=rev
Log:
MAPREDUCE-1198. Alternatively schedule different types of tasks in fair share
scheduler. Contributed by Scott Chen.


Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=882700&r1=882699&r2=882700&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Nov 20 20:05:38 2009
@@ -11,6 +11,9 @@
 
   IMPROVEMENTS
 
+    MAPREDUCE-1198. Alternatively schedule different types of tasks in
+    fair share scheduler. (Scott Chen via matei)
+
     MAPREDUCE-707. Provide a jobconf property for explicitly assigning a job to 
     a pool in the Fair Scheduler. (Alan Heirich via matei)
 

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=882700&r1=882699&r2=882700&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Fri Nov 20 20:05:38 2009
@@ -322,69 +322,116 @@
     
     TaskTrackerStatus tts = tracker.getStatus();
 
-    // Scan to see whether any job needs to run a map, then a reduce
+    int mapsAssigned = 0; // number of maps assigned in the while loop below
+    int reducesAssigned = 0; // number of reduces assigned in the while loop below
+    int mapCapacity = maxTasksToAssign(TaskType.MAP, tts);
+    int reduceCapacity = maxTasksToAssign(TaskType.REDUCE, tts);
+    boolean mapRejected = false; // flag used for ending the loop
+    boolean reduceRejected = false; // flag used for ending the loop
+
+    // Keep track of which jobs were visited for map tasks and which had tasks
+    // launched, so that we can later mark skipped jobs for delay scheduling
+    Set<JobInProgress> visitedForMap = new HashSet<JobInProgress>();
+    Set<JobInProgress> visitedForReduce = new HashSet<JobInProgress>();
+    Set<JobInProgress> launchedMap = new HashSet<JobInProgress>();
+
     ArrayList<Task> tasks = new ArrayList<Task>();
-    for (TaskType taskType: MAP_AND_REDUCE) {
-      // Keep track of which jobs were visited and which had tasks launched,
-      // so that we can later mark skipped jobs for delay scheduling
-      Set<JobInProgress> visited = new HashSet<JobInProgress>();
-      Set<JobInProgress> launched = new HashSet<JobInProgress>();
-      // Compute a maximum number of tasks to assign on this task tracker
-      int cap = maxTasksToAssign(taskType, tts);
-      // Assign up to cap tasks
-      for (int i = 0; i < cap; i++) {
-        // Break if all runnable tasks of this type are already running
-        if (taskType == TaskType.MAP && runningMaps == runnableMaps ||
-            taskType == TaskType.REDUCE && runningReduces == runnableReduces)
-          break;
-        // Break if the node can't support another task of this type
-        boolean canAssign = (taskType == TaskType.MAP) ? 
-            loadMgr.canAssignMap(tts, runnableMaps, totalMapSlots) :
-            loadMgr.canAssignReduce(tts, runnableReduces, totalReduceSlots);
-        if (canAssign) {
-          // Get the map or reduce schedulables and sort them by fair sharing
-          List<PoolSchedulable> scheds = getPoolSchedulables(taskType);
-          Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
-          for (Schedulable sched: scheds) {
-            eventLog.log("INFO", "Checking for " + taskType + 
-                " task in " + sched.getName());
-            Task task = sched.assignTask(tts, currentTime, visited);
-            if (task != null) {
-              JobInProgress job = taskTrackerManager.getJob(task.getJobID());
-              eventLog.log("ASSIGN", trackerName, taskType,
-                  job.getJobID(), task.getTaskID());
-              launched.add(job);
-              // Update running task counts, and the job's locality level
-              if (taskType == TaskType.MAP) {
-                runningMaps++;
-                updateLastMapLocalityLevel(job, task, tts);
-              } else {
-                runningReduces++;
-              }
-              // Add task to the list of assignments
-              tasks.add(task);
-              break;
-            } // end if(task != null)
-          } // end for(Schedulable sched: scheds)
+    // Scan jobs to assign tasks until neither maps nor reduces can be assigned
+    while (true) {
+      // Computing the ending conditions for the loop
+      // Reject a task type if any one of the following conditions holds:
+      // 1. number of assigned tasks reaches the per-heartbeat limit
+      // 2. number of running tasks reaches runnable tasks
+      // 3. task is rejected by the LoadManager.canAssign
+      if (!mapRejected) {
+        if (mapsAssigned == mapCapacity ||
+            runningMaps == runnableMaps ||
+            !loadMgr.canAssignMap(tts, runnableMaps, totalMapSlots)) {
+          eventLog.log("INFO", "Can't assign another MAP to " + trackerName);
+          mapRejected = true;
+        }
+      }
+      if (!reduceRejected) {
+        if (reducesAssigned == reduceCapacity ||
+            runningReduces == runnableReduces ||
+            !loadMgr.canAssignReduce(tts, runnableReduces, totalReduceSlots)) {
+          eventLog.log("INFO", "Can't assign another REDUCE to " + trackerName);
+          reduceRejected = true;
+        }
+      }
+      // Exit while (true) loop if
+      // 1. neither maps nor reduces can be assigned
+      // 2. assignMultiple is off and we already assigned one task
+      if (mapRejected && reduceRejected ||
+          !assignMultiple && tasks.size() > 0) {
+        break; // This is the only exit of the while (true) loop
+      }
+
+      // Determine which task type to assign this time
+      // First try choosing a task type which is not rejected
+      TaskType taskType;
+      if (mapRejected) {
+        taskType = TaskType.REDUCE;
+      } else if (reduceRejected) {
+        taskType = TaskType.MAP;
+      } else {
+        // If both types are available, choose the task type with fewer running
+        // tasks on the task tracker to prevent that task type from starving
+        if (tts.countMapTasks() <= tts.countReduceTasks()) {
+          taskType = TaskType.MAP;
         } else {
-          eventLog.log("INFO", "Can't assign another " + taskType +
-              " to " + trackerName);
-          break;
-        }
-      } // end for(i = 0; i < cap; i++)
-      // If we were assigning maps, mark any jobs that were visited but
-      // did not launch a task as skipped on this heartbeat
-      if (taskType == TaskType.MAP) {
-        for (JobInProgress job: visited) {
-          if (!launched.contains(job)) {
-            infos.get(job).skippedAtLastHeartbeat = true;
+          taskType = TaskType.REDUCE;
+        }
+      }
+
+      // Get the map or reduce schedulables and sort them by fair sharing
+      List<PoolSchedulable> scheds = getPoolSchedulables(taskType);
+      Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
+      boolean foundTask = false;
+      for (Schedulable sched: scheds) { // This loop will assign only one task
+        eventLog.log("INFO", "Checking for " + taskType +
+            " task in " + sched.getName());
+        Task task = taskType == TaskType.MAP ? 
+                    sched.assignTask(tts, currentTime, visitedForMap) : 
+                    sched.assignTask(tts, currentTime, visitedForReduce);
+        if (task != null) {
+          foundTask = true;
+          JobInProgress job = taskTrackerManager.getJob(task.getJobID());
+          eventLog.log("ASSIGN", trackerName, taskType,
+              job.getJobID(), task.getTaskID());
+          // Update running task counts, and the job's locality level
+          if (taskType == TaskType.MAP) {
+            launchedMap.add(job);
+            mapsAssigned++;
+            runningMaps++;
+            updateLastMapLocalityLevel(job, task, tts);
+          } else {
+            reducesAssigned++;
+            runningReduces++;
           }
+          // Add task to the list of assignments
+          tasks.add(task);
+          break; // This break makes this loop assign only one task
+        } // end if(task != null)
+      } // end for(Schedulable sched: scheds)
+
+      // Reject the task type if we cannot find a task
+      if (!foundTask) {
+        if (taskType == TaskType.MAP) {
+          mapRejected = true;
+        } else {
+          reduceRejected = true;
         }
       }
-      // Return if assignMultiple was disabled and we found a task
-      if (!assignMultiple && tasks.size() > 0)
-        return tasks;
-    } // end for(TaskType taskType: MAP_AND_REDUCE)
+    } // end while (true)
+
+    // Mark any jobs that were visited for map tasks but did not launch a task
+    // as skipped on this heartbeat
+    for (JobInProgress job: visitedForMap) {
+      if (!launchedMap.contains(job)) {
+        infos.get(job).skippedAtLastHeartbeat = true;
+      }
+    }
     
     // If no tasks were found, return null
     return tasks.isEmpty() ? null : tasks;

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=882700&r1=882699&r2=882700&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Fri Nov 20 20:05:38 2009
@@ -683,8 +683,8 @@
     
     // Assign tasks and check that jobs alternate in filling slots
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
@@ -742,8 +742,8 @@
     
     // Assign tasks and check that jobs alternate in filling slots
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1",
-                           "attempt_test_0002_m_000000_0 on tt1",
                            "attempt_test_0001_r_000000_0 on tt1",
+                           "attempt_test_0002_m_000000_0 on tt1",
                            "attempt_test_0002_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2",
                            "attempt_test_0002_r_000001_0 on tt2");
@@ -803,12 +803,12 @@
     
     // Check that tasks are filled alternately by the jobs
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
     
     // Check that no new tasks can be launched once the tasktrackers are full
@@ -846,12 +846,12 @@
 
     // Check that tasks are filled alternately by the jobs
     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
     
     // Check scheduler variables; the demands should now be 8 because 2 tasks
@@ -911,12 +911,12 @@
     
     // Check that tasks are filled alternately by the jobs
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1",
-                           "attempt_test_0002_m_000000_0 on tt1",
                            "attempt_test_0001_r_000000_0 on tt1",
+                           "attempt_test_0002_m_000000_0 on tt1",
                            "attempt_test_0002_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2",
-                           "attempt_test_0002_m_000001_0 on tt2",
                            "attempt_test_0001_r_000001_0 on tt2",
+                           "attempt_test_0002_m_000001_0 on tt2",
                            "attempt_test_0002_r_000001_0 on tt2");
     
     // Check that no new tasks can be launched once the tasktrackers are full
@@ -954,12 +954,12 @@
 
     // Check that tasks are filled alternately by the jobs
     checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1",
-                           "attempt_test_0002_m_000002_0 on tt1",
                            "attempt_test_0001_r_000002_0 on tt1",
+                           "attempt_test_0002_m_000002_0 on tt1",
                            "attempt_test_0002_r_000002_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2",
-                           "attempt_test_0002_m_000003_0 on tt2",
                            "attempt_test_0001_r_000003_0 on tt2",
+                           "attempt_test_0002_m_000003_0 on tt2",
                            "attempt_test_0002_r_000003_0 on tt2");
     
     // Check scheduler variables; the demands should now be 8 because 2 tasks
@@ -1016,16 +1016,16 @@
     // type should be handed out alternately to 1, 2, 2, 1, 2, 2, etc.
     System.out.println("HEREEEE");
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
     checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
-    checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
     checkAssignment("tt3", "attempt_test_0002_r_000002_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
     checkAssignment("tt3", "attempt_test_0002_r_000003_0 on tt3");
   }
   
@@ -1101,12 +1101,12 @@
     
     // Assign tasks and check that slots are first given to needy jobs
     checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2");
   }
 
@@ -1181,12 +1181,12 @@
     // Assign tasks and check that slots are first given to needy jobs, but
     // that job 1 gets two tasks after due to having a larger share.
     checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
   }
   
@@ -1257,12 +1257,12 @@
     // Assign tasks and check that slots are first given to needy jobs, but
     // that job 1 gets two tasks after due to having a larger share.
     checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
-                           "attempt_test_0003_m_000000_0 on tt1",
                            "attempt_test_0002_r_000000_0 on tt1",
+                           "attempt_test_0003_m_000000_0 on tt1",
                            "attempt_test_0003_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2",
-                           "attempt_test_0001_m_000001_0 on tt2",
                            "attempt_test_0001_r_000000_0 on tt2",
+                           "attempt_test_0001_m_000001_0 on tt2",
                            "attempt_test_0001_r_000001_0 on tt2");
   }
   
@@ -1309,12 +1309,12 @@
     
     // Assign tasks and check that slots are first given to needy jobs
     checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
   }
   
@@ -1361,13 +1361,13 @@
     
     // Assign tasks and check that only jobs 1 and 2 get them
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
     advanceTime(100);
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
   }
 
@@ -1418,13 +1418,13 @@
     
     // Assign tasks and check that slots are given only to jobs 1, 3 and 4
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
     advanceTime(100);
     checkAssignment("tt2", "attempt_test_0004_m_000000_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0004_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
   }
   
@@ -1687,13 +1687,13 @@
     // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     advanceTime(100);
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
     
     // Ten seconds later, submit job 2.
@@ -1765,13 +1765,13 @@
     // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     advanceTime(100);
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
     
     // Ten seconds later, submit job 2.
@@ -1798,8 +1798,8 @@
     scheduler.update();
     assertEquals(3, job1.runningMaps());
     assertEquals(2, job1.runningReduces());
-    checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt2")));
@@ -1844,18 +1844,18 @@
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 6, 6, "pool1");
     advanceTime(100);
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     advanceTime(100);
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
     advanceTime(100);
     checkAssignment("tt3", "attempt_test_0001_m_000004_0 on tt3");
-    checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
     checkAssignment("tt3", "attempt_test_0001_r_000004_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
     checkAssignment("tt3", "attempt_test_0001_r_000005_0 on tt3");
     advanceTime(100);
     
@@ -1863,8 +1863,8 @@
     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "pool2");
     advanceTime(100);
     checkAssignment("tt4", "attempt_test_0002_m_000000_0 on tt4");
-    checkAssignment("tt4", "attempt_test_0002_m_000001_0 on tt4");
     checkAssignment("tt4", "attempt_test_0002_r_000000_0 on tt4");
+    checkAssignment("tt4", "attempt_test_0002_m_000001_0 on tt4");
     checkAssignment("tt4", "attempt_test_0002_r_000001_0 on tt4");
     
     // Submit job 3.
@@ -1900,8 +1900,8 @@
     assertEquals(4, job1.runningMaps());
     assertEquals(4, job1.runningReduces());
     checkAssignment("tt3", "attempt_test_0003_m_000000_0 on tt3");
-    checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3");
     checkAssignment("tt3", "attempt_test_0003_r_000000_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3");
     checkAssignment("tt3", "attempt_test_0003_r_000001_0 on tt3");
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt2")));
@@ -1941,13 +1941,13 @@
     // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     advanceTime(100);
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
     
     // Ten seconds later, submit job 2.
@@ -1983,8 +1983,8 @@
     assertEquals(2, job1.runningMaps());
     assertEquals(2, job1.runningReduces());
     checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt2")));
@@ -2015,13 +2015,13 @@
     // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     advanceTime(100);
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
     
     // Ten seconds later, submit job 2.
@@ -2069,13 +2069,13 @@
     // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     advanceTime(100);
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
     
     // Ten seconds later, submit job 2.
@@ -2385,12 +2385,12 @@
     // Assign tasks and check that they're given first to job3 (because it is
     // high priority), then to job1, then to job2.
     checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
   }
   
@@ -2423,12 +2423,12 @@
     // Assign tasks and check that they alternate between jobs 1 and 3, the
     // head-of-line jobs in their respective pools.
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
   }
   
@@ -2464,12 +2464,12 @@
     // Assign tasks and check that only job 1 gets tasks in pool A, but
     // jobs 3 and 4 both get tasks in pool B.
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0004_m_000000_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0004_m_000000_0 on tt2");
     checkAssignment("tt2", "attempt_test_0004_r_000000_0 on tt2");
   }
 



Mime
View raw message