hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r824757 [2/3] - in /hadoop/mapreduce/branches/branch-0.21: ./ conf/ src/c++/ src/contrib/ src/contrib/capacity-scheduler/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/h...
Date Tue, 13 Oct 2009 13:44:14 GMT
Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=824757&r1=824756&r2=824757&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Oct 13 13:44:13 2009
@@ -108,21 +108,21 @@
 
     taskTrackerManager.initJob(fjob1);
 
+    //1 map and 1 reduce assigned
     List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
+    //2 maps are assigned; the max limit is reached
     List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
 
-    //Once the 2 tasks are running the third assigment should be reduce.
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt3",
-      "attempt_test_0001_r_000001_0 on tt3");
-    //This should fail.
-    List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
-    assertNull(task4);
+    //task3 is null as maxlimit is reached.
+    List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
+    assertNull(task3);
     //Now complete the task 1.
     // complete the job
+    for(Task task: task1) {
     taskTrackerManager.finishTask(
-      task1.get(0).getTaskID().toString(),
+      task.getTaskID().toString(),
       fjob1);
+    }
     //We have completed the tt1 task which was a map task so we expect one map
     //task to be picked up
     checkAssignment(
@@ -154,23 +154,29 @@
 
     taskTrackerManager.initJob(fjob1);
 
+    //1 map and 1 reduce
     List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
+
+    // 1 reduce assigned
     List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
+
+    // No tasks should be assigned, as we have reached the max cap.
     List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
+    assertNull(task3);
 
-    //This should fail. 1 map, 2 reduces , we have reached the limit.
-    List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
-    assertNull(task4);
     //Now complete the task 1 i.e map task.
-    // complete the job
-    taskTrackerManager.finishTask(
-      task1.get(0).getTaskID().toString(),
-      fjob1);
-
-    //This should still fail as only map task is done
-    task4 = scheduler.assignTasks(tracker("tt4"));
-    assertNull(task4);
+    for(Task task: task1) {
+      if (task.isMapTask()) {
+        taskTrackerManager.finishTask(
+          task.getTaskID().toString(),
+          fjob1);
+      }
+    }
 
+    //Still no slots available for reduce hence no tasks
+    //assigned
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    
     //Complete the reduce task
     taskTrackerManager.finishTask(
       task2.get(0).getTaskID().toString(), fjob1);
@@ -403,19 +409,27 @@
 
     // submit a job with no queue specified. It should be accepted
     // and given to the default queue. 
-    JobInProgress j = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+    JobInProgress j = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 
+                                                    10, 10, null, "u1");
+    // when we ask for tasks, we should get them for the job submitted
+    Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+    expectedTaskStrings.put(CapacityTestUtils.MAP, 
+                            "attempt_test_0001_m_000001_0 on tt1");
+    expectedTaskStrings.put(CapacityTestUtils.REDUCE, 
+                            "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
+                                      "tt1", expectedTaskStrings);
 
-    // when we ask for a task, we should get one, from the job submitted
-    Task t;
-    t = checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
     // submit another job, to a different queue
     j = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // now when we get a task, it should be from the second job
-    t = checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0002_m_000001_0 on tt2");
+    // now when we get tasks, it should be from the second job
+    expectedTaskStrings.clear();
+    expectedTaskStrings.put(CapacityTestUtils.MAP,
+                              "attempt_test_0002_m_000001_0 on tt2");
+    expectedTaskStrings.put(CapacityTestUtils.REDUCE,
+                              "attempt_test_0002_r_000001_0 on tt2");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
+                                  "tt2", expectedTaskStrings);
   }
 
   public void testGetJobs() throws Exception {
@@ -544,19 +558,30 @@
     // submit a job  
     taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // for queue 'q2', the capacity for maps is 2. Since we're the only user,
-    // we should get a task 
-    checkAssignment(
+    // we should get a task
+    Map<String,String> expectedStrings = new HashMap<String,String>();
+    expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
-    // I should get another map task. 
+      expectedStrings);
+
+    // I should get another map task.
+    //No reduces as there is only 1 slot for reduce on TT
     checkAssignment(
       taskTrackerManager, scheduler, "tt1",
       "attempt_test_0001_m_000002_0 on tt1");
+
     // Now we're at full capacity for maps. If I ask for another map task,
-    // I should get a map task from the default queue's capacity. 
-    checkAssignment(
+    // I should get a map task from the default queue's capacity.
+    //same with reduce
+    expectedStrings.put(MAP,"attempt_test_0001_m_000003_0 on tt2");
+    expectedStrings.put(REDUCE,"attempt_test_0001_r_000002_0 on tt2");
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0001_m_000003_0 on tt2");
+      expectedStrings);
+    
     // and another
     checkAssignment(
       taskTrackerManager, scheduler, "tt2",
@@ -608,7 +633,8 @@
     jConf.setNumReduceTasks(0);
     jConf.setQueueName("defaultXYZM");
     jConf.setUser("u1");
-    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, jConf);
 
     LOG.debug(
       "Submit another regular memory(1GB vmem maps/reduces) job of "
@@ -620,22 +646,18 @@
     jConf.setNumReduceTasks(2);
     jConf.setQueueName("defaultXYZM");
     jConf.setUser("u1");
-    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, jConf);
 
     // first, a map from j1 will run this is a high memory job so it would
-    // occupy the 2 slots
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
-
-    checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 100.0f, 1, 1);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
-
-    // at this point, the scheduler tries to schedule another map from j1.
-    // there isn't enough space. The second job's reduce should be scheduled.
-    checkAssignment(
+    // occupy the 2 slots and it would try to assign the reduce task from
+    //job2.
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_r_000001_0 on tt1");
+      expectedStrings);
 
     checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 100.0f, 1, 1);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
@@ -644,10 +666,11 @@
     //another task tracker.
     // This should not happen as all the map slots are taken
     //by the first task itself.hence reduce task from the second job is given
-
-    checkAssignment(
+    expectedStrings.clear();
+    expectedStrings.put(REDUCE, "attempt_test_0002_r_000002_0 on tt2");
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0002_r_000002_0 on tt2");
+      expectedStrings);
   }
 
   /**
@@ -664,60 +687,71 @@
 
     taskTrackerManager.setFakeQueues(queues);
     scheduler.start();
-    scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getMapTSC().setMaxTaskLimit(2);
-    scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getReduceTSC().setMaxTaskLimit(2);
+    scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
+      .getMapTSC().setMaxTaskLimit(2);
+    scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
+      .getReduceTSC().setMaxTaskLimit(2);
 
 
     // submit a job
     FakeJobInProgress fjob1 =
-      taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 10, 10, "default", "u1");
     FakeJobInProgress fjob2 =
-      taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2");
+      taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, 10, 10, "default", "u2");
 
     // for queue 'default', the capacity for maps is 2.
     // But the max map limit is 2
     // hence user should be getting not more than 1 as it is the 50%.
-    Task t1 = checkAssignment(
+    //same with reduce
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    populateExpectedStrings(expectedStrings, 
+                  "attempt_test_0001_m_000001_0 on tt1", 
+                  "attempt_test_0001_r_000001_0 on tt1");
+    List<Task> t1 = checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
+      expectedStrings);
 
     //Now we should get the task from the other job. As the
     //first user has reached his max map limit.
+    //same with reduce
+    populateExpectedStrings(expectedStrings, 
+                            "attempt_test_0002_m_000001_0 on tt2",
+                            "attempt_test_0002_r_000001_0 on tt2");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
+
+    //Now we have reached both the map and reduce limits,
+    // so if we ask for a task we should
+    // get null.
+    List<Task> t3 = scheduler.assignTasks(tracker("tt3"));
+    assertNull(t3);
+
+    //We completed 1 map and 1 reduce in here
+    for (Task task : t1) {
+      taskTrackerManager.finishTask(
+        task.getTaskID().toString(),
+        fjob1);
+    }
 
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0002_m_000001_0 on tt2");
-
-    //Now we are done with map limit , now if we ask for task we should
-    // get reduce from 1st job
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt3",
-      "attempt_test_0001_r_000001_0 on tt3");
-    // Now we're at full capacity for maps. 1 done with reduces for job 1 so
-    // now we should get 1 reduces for job 2
-    Task t4 = checkAssignment(
-      taskTrackerManager, scheduler, "tt4",
-      "attempt_test_0002_r_000001_0 on tt4");
-
-    taskTrackerManager.finishTask(
-      t1.getTaskID().toString(),
-      fjob1);
-
-    //tt1 completed the task so we have 1 map slot for u1
-    // we are assigning the 2nd map task from fjob1
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000002_0 on tt1");
-
-    taskTrackerManager.finishTask(
-      t4.getTaskID().toString(),
-      fjob2);
-    //tt4 completed the task , so we have 1 reduce slot for u2
-    //we are assigning the 2nd reduce from fjob2
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt4",
-      "attempt_test_0002_r_000002_0 on tt4");
-
+    //again we would assign 1 map and 1 reduce
+    populateExpectedStrings(expectedStrings, 
+                            "attempt_test_0001_m_000002_0 on tt1",
+                            "attempt_test_0001_r_000002_0 on tt1");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+  }
+
+  // Utility method to construct a map of expected strings
+  // with exactly one map task and one reduce task.
+  private void populateExpectedStrings(Map<String, String> expectedTaskStrings,
+                        String mapTask, String reduceTask) {
+    expectedTaskStrings.clear();
+    expectedTaskStrings.put(CapacityTestUtils.MAP, mapTask);
+    expectedTaskStrings.put(CapacityTestUtils.REDUCE, reduceTask);
   }
 
 
@@ -736,26 +770,31 @@
 
     // submit a job  
     taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // for queue 'q2', the capacity for maps is 2. Since we're the only user,
-    // we should get a task 
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
+    // for queue 'q2', the capacity is 2 for maps and 1 for reduce. 
+    // Since we're the only user, we should get tasks
+    Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+    populateExpectedStrings(expectedTaskStrings, 
+              "attempt_test_0001_m_000001_0 on tt1", 
+              "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
+                                  "tt1", expectedTaskStrings);
+
     // Submit another job, from a different user
     taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
-    // Now if I ask for a map task, it should come from the second job 
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_m_000001_0 on tt1");
-    // Now we're at full capacity for maps. If I ask for another map task,
-    // I should get a map task from the default queue's capacity. 
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0001_m_000002_0 on tt2");
+    // Now if I ask for a task, it should come from the second job
+    checkAssignment(taskTrackerManager, scheduler, 
+        "tt1", "attempt_test_0002_m_000001_0 on tt1");
+
+    // Now we're at full capacity. If I ask for another task,
+    // I should get tasks from the default queue's capacity.
+    populateExpectedStrings(expectedTaskStrings, 
+        "attempt_test_0001_m_000002_0 on tt2", 
+        "attempt_test_0002_r_000001_0 on tt2");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
+          "tt2", expectedTaskStrings);
     // and another
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0002_m_000002_0 on tt2");
+    checkAssignment(taskTrackerManager, scheduler, 
+            "tt2", "attempt_test_0002_m_000002_0 on tt2");
   }
 
   // test user limits when a 2nd job is submitted much after first job 
@@ -773,21 +812,28 @@
 
     // submit a job  
     taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // for queue 'q2', the capacity for maps is 2. Since we're the only user,
-    // we should get a task 
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
+    // for queue 'q2', the capacity for maps is 2 and reduce is 1. 
+    // Since we're the only user, we should get tasks
+    Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+    populateExpectedStrings(expectedTaskStrings, 
+        "attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
+        "tt1", expectedTaskStrings);
+
     // since we're the only job, we get another map
     checkAssignment(
       taskTrackerManager, scheduler, "tt1",
       "attempt_test_0001_m_000002_0 on tt1");
+
     // Submit another job, from a different user
     taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
-    // Now if I ask for a map task, it should come from the second job 
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0002_m_000001_0 on tt2");
+    // Now if I ask for a task, it should come from the second job
+    populateExpectedStrings(expectedTaskStrings, 
+        "attempt_test_0002_m_000001_0 on tt2",
+        "attempt_test_0002_r_000001_0 on tt2");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
+        "tt2", expectedTaskStrings);
     // and another
     checkAssignment(
       taskTrackerManager, scheduler, "tt2",
@@ -810,41 +856,56 @@
 
     // submit a job  
     FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // for queue 'q2', the capacity for maps is 2. Since we're the only user,
-    // we should get a task 
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
+    // for queue 'q2', the capacity for maps is 2 and reduces is 1. 
+    // Since we're the only user, we should get a task
+    Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+    populateExpectedStrings(expectedTaskStrings, 
+        "attempt_test_0001_m_000001_0 on tt1", 
+        "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
+        "tt1", expectedTaskStrings);
     // since we're the only job, we get another map
     checkAssignment(
       taskTrackerManager, scheduler, "tt1",
       "attempt_test_0001_m_000002_0 on tt1");
-    // we get two more maps from 'default queue'
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0001_m_000003_0 on tt2");
+    // we get more tasks from 'default queue'
+    populateExpectedStrings(expectedTaskStrings, 
+        "attempt_test_0001_m_000003_0 on tt2",
+        "attempt_test_0001_r_000002_0 on tt2");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+        "tt2", expectedTaskStrings);
     checkAssignment(
       taskTrackerManager, scheduler, "tt2",
       "attempt_test_0001_m_000004_0 on tt2");
+
     // Submit another job, from a different user
     FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
-    // one of the task finishes
+    // one task of each type finishes
     taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", j1);
-    // Now if I ask for a map task, it should come from the second job 
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_m_000001_0 on tt1");
+    taskTrackerManager.finishTask("attempt_test_0001_r_000001_0", j1);
+    
+    // Now if I ask for a task, it should come from the second job
+    populateExpectedStrings(expectedTaskStrings, 
+        "attempt_test_0002_m_000001_0 on tt1",
+        "attempt_test_0002_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+        "tt1", expectedTaskStrings);
+
     // another task from job1 finishes, another new task to job2
     taskTrackerManager.finishTask("attempt_test_0001_m_000002_0", j1);
     checkAssignment(
       taskTrackerManager, scheduler, "tt1",
       "attempt_test_0002_m_000002_0 on tt1");
+
     // now we have equal number of tasks from each job. Whichever job's
     // task finishes, that job gets a new task
     taskTrackerManager.finishTask("attempt_test_0001_m_000003_0", j1);
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0001_m_000005_0 on tt2");
+    taskTrackerManager.finishTask("attempt_test_0001_r_000002_0", j1);
+    populateExpectedStrings(expectedTaskStrings,
+        "attempt_test_0001_m_000005_0 on tt2",
+        "attempt_test_0001_r_000003_0 on tt2");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+        "tt2", expectedTaskStrings);
     taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", j2);
     checkAssignment(
       taskTrackerManager, scheduler, "tt1",
@@ -853,7 +914,7 @@
 
   // test user limits with many users, more slots
   public void testUserLimits4() throws Exception {
-    // set up one queue, with 10 slots
+    // set up one queue, with 10 map slots and 5 reduce slots
     String[] qs = {"default"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -870,42 +931,31 @@
     // u1 submits job
     FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
     // it gets the first 5 slots
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0001_m_000003_0 on tt2");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0001_m_000004_0 on tt2");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt3",
-      "attempt_test_0001_m_000005_0 on tt3");
+    Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+    for (int i=0; i<5; i++) {
+      String ttName = "tt"+(i+1);
+      populateExpectedStrings(expectedTaskStrings,
+          "attempt_test_0001_m_00000"+(i+1)+"_0 on " + ttName, 
+          "attempt_test_0001_r_00000"+(i+1)+"_0 on " + ttName);
+      checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
+          ttName, expectedTaskStrings);
+    }
+      
     // u2 submits job with 4 slots
     FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
     // u2 should get next 4 slots
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt3",
-      "attempt_test_0002_m_000001_0 on tt3");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt4",
-      "attempt_test_0002_m_000002_0 on tt4");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt4",
-      "attempt_test_0002_m_000003_0 on tt4");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt5",
-      "attempt_test_0002_m_000004_0 on tt5");
+    for (int i=0; i<4; i++) {
+      String ttName = "tt"+(i+1);
+      checkAssignment(taskTrackerManager, scheduler, ttName,
+          "attempt_test_0002_m_00000"+(i+1)+"_0 on " + ttName);
+    }
     // last slot should go to u1, since u2 has no more tasks
     checkAssignment(
       taskTrackerManager, scheduler, "tt5",
       "attempt_test_0001_m_000006_0 on tt5");
-    // u1 finishes a task
+    // u1 finishes tasks
     taskTrackerManager.finishTask("attempt_test_0001_m_000006_0", j1);
+    taskTrackerManager.finishTask("attempt_test_0001_r_000005_0", j1);
     // u1 submits a few more jobs 
     // All the jobs are inited when submitted
     // because of addition of Eager Job Initializer all jobs in this
@@ -917,23 +967,26 @@
     taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u2");
     // now u3 submits a job
     taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3");
-    // next slot should go to u3, even though u2 has an earlier job, since
+    // next map slot should go to u3, even though u2 has an earlier job, since
     // user limits have changed and u1/u2 are over limits
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt5",
-      "attempt_test_0007_m_000001_0 on tt5");
+    // reduce slot will go to job 2, as it is still under limit.
+    populateExpectedStrings(expectedTaskStrings,
+        "attempt_test_0007_m_000001_0 on tt5",
+        "attempt_test_0002_r_000001_0 on tt5");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
+        "tt5", expectedTaskStrings);
     // some other task finishes and u3 gets it
     taskTrackerManager.finishTask("attempt_test_0002_m_000004_0", j1);
     checkAssignment(
-      taskTrackerManager, scheduler, "tt5",
-      "attempt_test_0007_m_000002_0 on tt5");
+      taskTrackerManager, scheduler, "tt4",
+      "attempt_test_0007_m_000002_0 on tt4");
     // now, u2 finishes a task
     taskTrackerManager.finishTask("attempt_test_0002_m_000002_0", j1);
     // next slot will go to u1, since u3 has nothing to run and u1's job is 
     // first in the queue
     checkAssignment(
-      taskTrackerManager, scheduler, "tt4",
-      "attempt_test_0001_m_000007_0 on tt4");
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0001_m_000007_0 on tt2");
   }
 
   /**
@@ -955,13 +1008,13 @@
     // enabled memory-based scheduling
     // Normal job in the cluster would be 1GB maps/reduces
     scheduler.getConf().setLong(
-        JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
+      JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
     scheduler.getConf().setLong(
-        MRConfig.MAPMEMORY_MB, 1 * 1024);
+      MRConfig.MAPMEMORY_MB, 1 * 1024);
     scheduler.getConf().setLong(
-        JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
+      JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
     scheduler.getConf().setLong(
-        MRConfig.REDUCEMEMORY_MB, 1 * 1024);
+      MRConfig.REDUCEMEMORY_MB, 1 * 1024);
     taskTrackerManager.setFakeQueues(queues);
     scheduler.start();
 
@@ -973,7 +1026,8 @@
     jConf.setNumReduceTasks(6);
     jConf.setUser("u1");
     jConf.setQueueName("default");
-    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, jConf);
 
     LOG.debug(
       "Submit one high memory(2GB maps, 2GB reduces) job of "
@@ -985,60 +1039,41 @@
     jConf.setNumReduceTasks(6);
     jConf.setQueueName("default");
     jConf.setUser("u2");
-    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, jConf);
 
-    // Verify that normal job takes 3 task assignments to hit user limits
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_r_000001_0 on tt1");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_r_000002_0 on tt1");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000003_0 on tt1");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_r_000003_0 on tt1");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000004_0 on tt1");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_r_000004_0 on tt1");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000005_0 on tt1");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_r_000005_0 on tt1");
+    // Verify that normal job takes 5 task assignments to hit user limits
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    for (int i = 0; i < 5; i++) {
+      expectedStrings.clear();
+      expectedStrings.put(
+        CapacityTestUtils.MAP,
+        "attempt_test_0001_m_00000" + (i + 1) + "_0 on tt1");
+      expectedStrings.put(
+        CapacityTestUtils.REDUCE,
+        "attempt_test_0001_r_00000" + (i + 1) + "_0 on tt1");
+      checkMultipleTaskAssignment(
+        taskTrackerManager, scheduler, "tt1",
+        expectedStrings);
+    }
     // u1 has 5 map slots and 5 reduce slots. u2 has none. So u1's user limits
     // are hit. So u2 should get slots
 
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_r_000001_0 on tt1");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_m_000002_0 on tt1");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_r_000002_0 on tt1");
-
-    // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
+    for (int i = 0; i < 2; i++) {
+      expectedStrings.clear();
+      expectedStrings.put(
+        CapacityTestUtils.MAP,
+        "attempt_test_0002_m_00000" + (i + 1) + "_0 on tt1");
+      expectedStrings.put(
+        CapacityTestUtils.REDUCE,
+        "attempt_test_0002_r_00000" + (i + 1) + "_0 on tt1");
+      checkMultipleTaskAssignment(
+        taskTrackerManager, scheduler, "tt1",
+        expectedStrings);
+    }  // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
     // slots. Because of high memory tasks, giving u2 another task would
     // overflow limits. So, no more tasks should be given to anyone.
     assertNull(scheduler.assignTasks(tracker("tt1")));
-    assertNull(scheduler.assignTasks(tracker("tt1")));
   }
 
   /*
@@ -1051,9 +1086,7 @@
    * - Then run initializationPoller()
    * - Check once again the waiting queue, it should be 5 jobs again.
    * - Then raise status change events.
-   * - Assign one task to a task tracker. (Map)
-   * - Check waiting job count, it should be 4 now and used map (%) = 100
-   * - Assign another one task (Reduce)
+   * - Assign tasks to a task tracker.
    * - Check waiting job count, it should be 4 now and used map (%) = 100
    * and used reduce (%) = 100
    * - finish the job and then check the used percentage it should go
@@ -1066,9 +1099,9 @@
    * - Check the count, the waiting job count should be 2.
    * - Now raise status change events to move the initialized jobs which
    * should be two in count to running queue.
-   * - Then schedule a map of the job in running queue.
+   * - Then schedule a map and reduce of the job in running queue.
    * - Run the poller because the poller is responsible for waiting
-   * jobs count. Check the count, it should be using 100% map and one
+   * jobs count. Check the count, it should be using 100% map, reduce and one
    * waiting job
    * - fail the running job.
    * - Check the count, it should be now one waiting job and zero running
@@ -1161,9 +1194,12 @@
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
     raiseStatusChangeEvents(scheduler.jobQueuesManager, "q2");
     //assign one job
-    Task t1 = checkAssignment(
+    Map<String, String> strs = new HashMap<String, String>();
+    strs.put(CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+    strs.put(CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+    List<Task> t1 = checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
+      strs);
     //Initalize extra job.
     controlledInitializationPoller.selectJobsToInitialize();
 
@@ -1174,24 +1210,22 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 20);
+
+    assertEquals(infoStrings.length, 22);
     assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
     assertEquals(infoStrings[8], "Running tasks: 1");
     assertEquals(infoStrings[9], "Active users:");
     assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
-    assertEquals(infoStrings[14], "Used capacity: 0 (0.0% of Capacity)");
-    assertEquals(infoStrings[15], "Running tasks: 0");
-    assertEquals(infoStrings[18], "Number of Waiting Jobs: 4");
+    assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[15], "Running tasks: 1");
+    assertEquals(infoStrings[20], "Number of Waiting Jobs: 4");
 
-    //assign a reduce task
-    Task t2 = checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_r_000001_0 on tt1");
     // make sure we update our stats
     scheduler.updateContextInfoForTests();
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
+
     assertEquals(infoStrings.length, 22);
     assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
     assertEquals(infoStrings[8], "Running tasks: 1");
@@ -1205,8 +1239,9 @@
 
     //Complete the job and check the running tasks count
     FakeJobInProgress u1j1 = userJobs.get(0);
-    taskTrackerManager.finishTask(t1.getTaskID().toString(), u1j1);
-    taskTrackerManager.finishTask(t2.getTaskID().toString(), u1j1);
+    for (Task task : t1) {
+      taskTrackerManager.finishTask(task.getTaskID().toString(), u1j1);
+    }
     taskTrackerManager.finalizeJob(u1j1);
 
     // make sure we update our stats
@@ -1214,6 +1249,7 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
+
     assertEquals(infoStrings.length, 18);
     assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
     assertEquals(infoStrings[8], "Running tasks: 0");
@@ -1274,9 +1310,12 @@
 
     //Now schedule a map should be job3 of the user as job1 succeeded job2
     //failed and now job3 is running
-    t1 = checkAssignment(
+    strs.clear();
+    strs.put(CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt1");
+    strs.put(CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt1");
+    t1 = checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0003_m_000001_0 on tt1");
+      strs);
     FakeJobInProgress u1j3 = userJobs.get(2);
     assertTrue(
       "User Job 3 not running ",
@@ -1290,12 +1329,16 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 20);
+    assertEquals(infoStrings.length, 22);
     assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
     assertEquals(infoStrings[8], "Running tasks: 1");
     assertEquals(infoStrings[9], "Active users:");
     assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
-    assertEquals(infoStrings[18], "Number of Waiting Jobs: 1");
+    assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[15], "Running tasks: 1");
+    assertEquals(infoStrings[16], "Active users:");
+    assertEquals(infoStrings[17], "User 'u1': 1 (100.0% of used capacity)");
+    assertEquals(infoStrings[20], "Number of Waiting Jobs: 1");
 
     //Fail the executing job
     taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
@@ -1347,13 +1390,14 @@
 
     // assert that all tasks are launched even though they transgress the
     // scheduling limits.
-
-    checkAssignment(
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_r_000001_0 on tt1");
+      expectedStrings);
   }
 
   /**
@@ -1366,7 +1410,7 @@
     throws IOException {
 
     // 2 map and 1 reduce slots
-    taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1);
+    taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2);
 
     taskTrackerManager.addQueues(new String[]{"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -1399,7 +1443,8 @@
     jConf.setNumReduceTasks(0);
     jConf.setQueueName("default");
     jConf.setUser("u1");
-    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, jConf);
 
     LOG.debug(
       "Submit another regular memory(1GB vmem maps/reduces) job of "
@@ -1411,170 +1456,188 @@
     jConf.setNumReduceTasks(2);
     jConf.setQueueName("default");
     jConf.setUser("u1");
-    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, jConf);
 
-    // first, a map from j1 will run
-    checkAssignment(
+    // first, a map from j1 and a reduce from other job j2
+    Map<String,String> strs = new HashMap<String,String>();
+    strs.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+    strs.put(REDUCE,"attempt_test_0002_r_000001_0 on tt1");
+
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
+      strs);
     // Total 2 map slots should be accounted for.
     checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50.0f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
 
-    // at this point, the scheduler tries to schedule another map from j1. 
-    // there isn't enough space. The second job's reduce should be scheduled.
+    //The TT has 2 reduce slots, hence this call should get another reduce
+    //task from the other job (j2).
     checkAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_r_000001_0 on tt1");
-    // Total 1 reduce slot should be accounted for.
-    checkOccupiedSlots(
-      "default", TaskType.REDUCE, 1, 1,
-      100.0f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
+      "attempt_test_0002_r_000002_0 on tt1");
+    checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 2, 100.0f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
+
+    //Now that all the slots are occupied, no more tasks should be
+    //assigned.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
   }
 
-  /**
-   * Test blocking of cluster for lack of memory.
+    /**
+   * Tests that scheduler schedules normal jobs once high RAM jobs
+   * have been reserved to the limit.
+   *
+   * The test causes the scheduler to schedule a normal job on two
+   * trackers, and one task of the high RAM job on a third. Then it
+   * asserts that one of the first two trackers gets a reservation
+   * for the remaining task of the high RAM job. After this, it
+   * asserts that a normal job submitted later is allowed to run
+   * on a free slot, as all tasks of the high RAM job are either
+   * scheduled or reserved.
    *
    * @throws IOException
    */
   public void testClusterBlockingForLackOfMemory()
-    throws IOException {
-
-    LOG.debug("Starting the scheduler.");
-    taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
-
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    taskTrackerManager.addQueues(new String[]{"default"});
-
-
-    scheduler.setTaskTrackerManager(taskTrackerManager);
-    // enabled memory-based scheduling
-    // Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces
-    scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
-    scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024);
-    scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
-    scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024);
-    taskTrackerManager.setFakeQueues(queues);
-    scheduler.start();
-
-    LOG.debug(
-      "Submit one normal memory(1GB maps/reduces) job of "
-        + "1 map, 1 reduce tasks.");
-    JobConf jConf = new JobConf(conf);
-    jConf.setMemoryForMapTask(1 * 1024);
-    jConf.setMemoryForReduceTask(1 * 1024);
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(1);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+      throws IOException {
 
-    // Fill the second tt with this job.
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0001_m_000001_0 on tt2");
-    // Total 1 map slot should be accounted for.
-    checkOccupiedSlots("default", TaskType.MAP, 1, 1, 25.0f);
-    assertEquals(
-      String.format(
-        TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
-        1, 1, 0, 0, 0, 0),
-      (String) job1.getSchedulingInfo());
-    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0001_r_000001_0 on tt2");
-    // Total 1 map slot should be accounted for.
-    checkOccupiedSlots(
-      "default", TaskType.REDUCE, 1, 1,
-      25.0f);
-    assertEquals(
-      String.format(
-        TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
-        1, 1, 0, 1, 1, 0),
-      (String) job1.getSchedulingInfo());
-    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
-
-    LOG.debug(
-      "Submit one high memory(2GB maps/reduces) job of "
-        + "2 map, 2 reduce tasks.");
-    jConf = new JobConf(conf);
-    jConf.setMemoryForMapTask(2 * 1024);
-    jConf.setMemoryForReduceTask(2 * 1024);
-    jConf.setNumMapTasks(2);
-    jConf.setNumReduceTasks(2);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
-
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_m_000001_0 on tt1");
-    // Total 3 map slots should be accounted for.
-    checkOccupiedSlots("default", TaskType.MAP, 1, 3, 75.0f);
-    assertEquals(
-      String.format(
-        TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
-        1, 2, 0, 0, 0, 0),
-      (String) job2.getSchedulingInfo());
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+      LOG.debug("Starting the scheduler.");
+      taskTrackerManager = new FakeTaskTrackerManager(3, 2, 2);
 
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_r_000001_0 on tt1");
-    // Total 3 reduce slots should be accounted for.
-    checkOccupiedSlots(
-      "default", TaskType.REDUCE, 1, 3,
-      75.0f);
-    assertEquals(
-      String.format(
-        TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
-        1, 2, 0, 1, 2, 0),
-      (String) job2.getSchedulingInfo());
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
-
-    LOG.debug(
-      "Submit one normal memory(1GB maps/reduces) job of "
-        + "1 map, 0 reduce tasks.");
-    jConf = new JobConf(conf);
-    jConf.setMemoryForMapTask(1 * 1024);
-    jConf.setMemoryForReduceTask(1 * 1024);
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(1);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    FakeJobInProgress job3 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
-
-    // Job2 cannot fit on tt1. So tt1 is reserved for a map slot of job2
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-
-    // reserved tasktrackers contribute to occupied slots for maps.
-    checkOccupiedSlots("default", TaskType.MAP, 1, 5, 125.0f);
-    // occupied slots for reduces remain unchanged as tt1 is not reserved for
-    // reduces.
-    checkOccupiedSlots("default", TaskType.REDUCE, 1, 3, 75.0f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
-    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
-    LOG.info(job2.getSchedulingInfo());
-    assertEquals(
-      String.format(
-        TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
-        1, 2, 2, 1, 2, 0),
-      (String) job2.getSchedulingInfo());
-    assertEquals(
-      String.format(
-        TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
-        0, 0, 0, 0, 0, 0),
-      (String) job3.getSchedulingInfo());
-
-    // One reservation is already done for job2. So job3 should go ahead.
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0003_m_000001_0 on tt2");
-  }
+      ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+      queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
+      taskTrackerManager.addQueues(new String[]{"default"});
+      scheduler.setTaskTrackerManager(taskTrackerManager);
+      // enabled memory-based scheduling
+      // Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces
+      scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
+      scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024);
+      scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
+      scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024);
+      taskTrackerManager.setFakeQueues(queues);
+      scheduler.start();
+
+      LOG.debug(
+        "Submit one normal memory(1GB maps/reduces) job of "
+          + "2 map, 2 reduce tasks.");
+      JobConf jConf = new JobConf(conf);
+      jConf.setMemoryForMapTask(1 * 1024);
+      jConf.setMemoryForReduceTask(1 * 1024);
+      jConf.setNumMapTasks(2);
+      jConf.setNumReduceTasks(2);
+      jConf.setQueueName("default");
+      jConf.setUser("u1");
+      FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, jConf);
+
+      // Fill a tt with this job's tasks.
+      Map<String, String> expectedStrings = new HashMap<String, String>();
+      expectedStrings.put(
+        CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+      expectedStrings.put(
+        CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+      checkMultipleTaskAssignment(
+        taskTrackerManager, scheduler, "tt1",
+        expectedStrings);
+      // Total 1 map slot and 1 reduce slot should be accounted for.
+      checkOccupiedSlots("default", TaskType.MAP, 1, 1, 16.7f);
+      checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 16.7f);
+      assertEquals(
+        String.format(
+          TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+          1, 1, 0, 1, 1, 0),
+        (String) job1.getSchedulingInfo());
+      checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
+
+      expectedStrings.clear();
+      expectedStrings.put(
+        CapacityTestUtils.MAP, "attempt_test_0001_m_000002_0 on tt2");
+      expectedStrings.put(
+        CapacityTestUtils.REDUCE, "attempt_test_0001_r_000002_0 on tt2");
+
+      // fill another TT with the rest of the tasks of the job
+      checkMultipleTaskAssignment(
+        taskTrackerManager, scheduler, "tt2",
+        expectedStrings);
+
+      LOG.debug(
+        "Submit one high memory(2GB maps/reduces) job of "
+          + "2 map, 2 reduce tasks.");
+      jConf = new JobConf(conf);
+      jConf.setMemoryForMapTask(2 * 1024);
+      jConf.setMemoryForReduceTask(2 * 1024);
+      jConf.setNumMapTasks(2);
+      jConf.setNumReduceTasks(2);
+      jConf.setQueueName("default");
+      jConf.setUser("u1");
+      FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, jConf);
+
+      // Have another TT run one task of each type of the high RAM
+      // job. This will fill up the TT.
+      expectedStrings.clear();
+      expectedStrings.put(
+        CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt3");
+      expectedStrings.put(
+        CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt3");
+
+      checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+          "tt3", expectedStrings);
+      checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f);
+      checkOccupiedSlots("default", TaskType.REDUCE, 1, 4, 66.7f);
+      assertEquals(
+        String.format(
+          TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+          1, 2, 0, 1, 2, 0),
+        (String) job2.getSchedulingInfo());
+      checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 2 * 1024L);
+
+      LOG.debug(
+        "Submit one normal memory(1GB maps/reduces) job of "
+          + "1 map, 1 reduce tasks.");
+      jConf = new JobConf(conf);
+      jConf.setMemoryForMapTask(1 * 1024);
+      jConf.setMemoryForReduceTask(1 * 1024);
+      jConf.setNumMapTasks(1);
+      jConf.setNumReduceTasks(1);
+      jConf.setQueueName("default");
+      jConf.setUser("u1");
+      FakeJobInProgress job3 = taskTrackerManager.submitJobAndInit(
+        JobStatus.PREP, jConf);
+
+      // Send a TT with insufficient space for task assignment,
+      // This will cause a reservation for the high RAM job.
+      assertNull(scheduler.assignTasks(tracker("tt1")));
+
+      // reserved tasktrackers contribute to occupied slots for maps and reduces
+      checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f);
+      checkOccupiedSlots("default", TaskType.REDUCE, 1, 6, 100.0f);
+      checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
+      LOG.info(job2.getSchedulingInfo());
+      assertEquals(
+        String.format(
+          TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+          1, 2, 2, 1, 2, 2),
+        (String) job2.getSchedulingInfo());
+      assertEquals(
+        String.format(
+          TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+          0, 0, 0, 0, 0, 0),
+        (String) job3.getSchedulingInfo());
+
+      // Reservations are already done for job2. So job3 should go ahead.
+      expectedStrings.clear();
+      expectedStrings.put(
+        CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt2");
+      expectedStrings.put(
+        CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt2");
+
+      checkMultipleTaskAssignment(
+        taskTrackerManager, scheduler, "tt2",
+        expectedStrings);
+    }
 
   /**
    * Testcase to verify fix for a NPE (HADOOP-5641), when memory based
@@ -1613,24 +1676,21 @@
     jConf.setMemoryForReduceTask(512);
     jConf.setQueueName("default");
     jConf.setUser("u1");
-    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, jConf);
 
-    // 1st cycle - 1 map gets assigned.
-    Task t = checkAssignment(
+    // 1st cycle - 1 map and 1 reduce get assigned.
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+    List<Task> t = checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
-    // Total 1 map slot should be accounted for.
+      expectedStrings);
+    // Total 1 map slot and 1 reduce slot should be accounted for.
     checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50.0f);
-    checkMemReservedForTasksOnTT("tt1", 512L, 0L);
-
-    // 1st cycle of reduces - 1 reduce gets assigned.
-    Task t1 = checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_r_000001_0 on tt1");
-    // Total 1 reduce slot should be accounted for.
-    checkOccupiedSlots(
-      "default", TaskType.REDUCE, 1, 1,
-      50.0f);
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50.0f);
     checkMemReservedForTasksOnTT("tt1", 512L, 512L);
 
     // kill this job !
@@ -1653,20 +1713,21 @@
     jConf.setMemoryForReduceTask(512);
     jConf.setQueueName("default");
     jConf.setUser("u1");
-    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, jConf);
 
     // since with HADOOP-5964, we don't rely on a job conf to get
     // the memory occupied, scheduling should be able to work correctly.
-    t1 = checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_m_000001_0 on tt1");
-    checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50);
-    checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt1");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt1");
 
-    // assign a reduce now.
-    t1 = checkAssignment(
+    List<Task> t1 = checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_r_000001_0 on tt1");
+      expectedStrings);
+    checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50);
     checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50);
     checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
 
@@ -1674,22 +1735,22 @@
     assertNull(scheduler.assignTasks(tracker("tt1")));
 
     // finish the tasks on the tracker.
-    taskTrackerManager.finishTask(t.getTaskID().toString(), job1);
-    taskTrackerManager.finishTask(t1.getTaskID().toString(), job1);
+    for (Task task : t) {
+      taskTrackerManager.finishTask(task.getTaskID().toString(), job1);
+    }
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0002_m_000002_0 on tt1");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0002_r_000002_0 on tt1");
 
     // now a new task can be assigned.
-    t = checkAssignment(
+    t = checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_m_000002_0 on tt1");
+      expectedStrings);
     checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
-    // memory used will change because of the finished task above.
-    checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
-
-    // reduce can be assigned.
-    t = checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_r_000002_0 on tt1");
     checkOccupiedSlots("default", TaskType.REDUCE, 1, 2, 100.0f);
+    // memory used will change because of the finished task above.
     checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
   }
 
@@ -1721,9 +1782,10 @@
     JobInitializationPoller initPoller = scheduler.getInitializationPoller();
 
     // submit 4 jobs each for 3 users.
-    HashMap<String, ArrayList<FakeJobInProgress>> userJobs = taskTrackerManager.submitJobs(
-      3,
-      4, "default");
+    HashMap<String, ArrayList<FakeJobInProgress>> userJobs =
+      taskTrackerManager.submitJobs(
+        3,
+        4, "default");
 
     // get the jobs submitted.
     ArrayList<FakeJobInProgress> u1Jobs = userJobs.get("u1");
@@ -1782,31 +1844,37 @@
     raiseStatusChangeEvents(mgr);
 
     // get some tasks assigned.
-    Task t1 = checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
-    Task t2 = checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_r_000001_0 on tt1");
-    Task t3 = checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0002_m_000001_0 on tt2");
-    Task t4 = checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0002_r_000001_0 on tt2");
-    taskTrackerManager.finishTask(
-      t1.getTaskID().toString(), u1Jobs.get(
-        0));
-    taskTrackerManager.finishTask(
-      t2.getTaskID().toString(), u1Jobs.get(
-        0));
-    taskTrackerManager.finishTask(
-      t3.getTaskID().toString(), u1Jobs.get(
-        1));
-    taskTrackerManager.finishTask(
-      t4.getTaskID().toString(), u1Jobs.get(
-        1));
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+
+    List<Task> t1 = checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt2");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt2");
+
+    List<Task> t2 = checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
+
+    for (Task task : t1) {
+      taskTrackerManager.finishTask(
+        task.getTaskID().toString(), u1Jobs.get(
+          0));
+    }
 
+    for (Task task : t2) {
+      taskTrackerManager.finishTask(
+        task.getTaskID().toString(), u1Jobs.get(
+          0));
+    }
     // as some jobs have running tasks, the poller will now
     // pick up new jobs to initialize.
     controlledInitializationPoller.selectJobsToInitialize();
@@ -1827,19 +1895,21 @@
       "Initialized jobs contains the user1 job 2",
       initializedJobs.contains(u1Jobs.get(1).getJobID()));
 
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt1");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt1");
+
     // finish one more job
-    t1 = checkAssignment(
+    t1 = checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0003_m_000001_0 on tt1");
-    t2 = checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0003_r_000001_0 on tt1");
-    taskTrackerManager.finishTask(
-      t1.getTaskID().toString(), u1Jobs.get(
-        2));
-    taskTrackerManager.finishTask(
-      t2.getTaskID().toString(), u1Jobs.get(
-        2));
+      expectedStrings);
+    for (Task task : t1) {
+      taskTrackerManager.finishTask(
+        task.getTaskID().toString(), u1Jobs.get(
+          2));
+    }
 
     // no new jobs should be picked up, because max user limit
     // is still 3.
@@ -1847,19 +1917,21 @@
 
     assertEquals(initializedJobs.size(), 5);
 
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0004_m_000001_0 on tt1");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0004_r_000001_0 on tt1");
+
     // run 1 more jobs.. 
-    t1 = checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0004_m_000001_0 on tt1");
-    t1 = checkAssignment(
+    t1 = checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0004_r_000001_0 on tt1");
-    taskTrackerManager.finishTask(
-      t1.getTaskID().toString(), u1Jobs.get(
-        3));
-    taskTrackerManager.finishTask(
-      t2.getTaskID().toString(), u1Jobs.get(
-        3));
+      expectedStrings);
+    for (Task task : t1) {
+      taskTrackerManager.finishTask(
+        task.getTaskID().toString(), u1Jobs.get(
+          3));
+    }
 
     // Now initialised jobs should contain user 4's job, as
     // user 1's jobs are all done and the number of users is
@@ -1968,12 +2040,13 @@
       taskTrackerManager.submitJob(JobStatus.PREP, 1, 1, "q1", "u1");
     controlledInitializationPoller.selectJobsToInitialize();
     raiseStatusChangeEvents(scheduler.jobQueuesManager, "q1");
-    Task t = checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
-    t = checkAssignment(
+    Map<String,String> strs = new HashMap<String,String>();
+    strs.put(CapacityTestUtils.MAP,"attempt_test_0001_m_000001_0 on tt1");
+    strs.put(CapacityTestUtils.REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_r_000001_0 on tt1");
+      strs);
+
   }
 
   public void testFailedJobInitalizations() throws Exception {
@@ -2049,37 +2122,41 @@
     conf.setReduceSpeculativeExecution(true);
     //Submit a job which would have one speculative map and one speculative
     //reduce.
-    FakeJobInProgress fjob1 = taskTrackerManager.submitJob(JobStatus.PREP, conf);
+    FakeJobInProgress fjob1 = taskTrackerManager.submitJob(
+      JobStatus.PREP, conf);
 
     conf = new JobConf();
     conf.setNumMapTasks(1);
     conf.setNumReduceTasks(1);
     //Submit a job which has no speculative map or reduce.
-    FakeJobInProgress fjob2 = taskTrackerManager.submitJob(JobStatus.PREP, conf);
+    FakeJobInProgress fjob2 = taskTrackerManager.submitJob(
+      JobStatus.PREP, conf);
 
     //Ask the poller to initalize all the submitted job and raise status
     //change event.
     controlledInitializationPoller.selectJobsToInitialize();
     raiseStatusChangeEvents(mgr);
-
-    checkAssignment(
+    Map<String, String> strs = new HashMap<String, String>();
+    strs.put(CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+    strs.put(CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
+      strs);
     assertTrue(
       "Pending maps of job1 greater than zero",
       (fjob1.pendingMaps() == 0));
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0001_m_000001_1 on tt2");
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_r_000001_0 on tt1");
+
     assertTrue(
-      "Pending reduces of job2 greater than zero",
+      "Pending reduces of job1 greater than zero",
       (fjob1.pendingReduces() == 0));
-    checkAssignment(
+
+    Map<String, String> str = new HashMap<String, String>();
+    str.put(CapacityTestUtils.MAP, "attempt_test_0001_m_000001_1 on tt2");
+    str.put(CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_1 on tt2");
+
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0001_r_000001_1 on tt2");
+      str);
 
     taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", fjob1);
     taskTrackerManager.finishTask("attempt_test_0001_m_000001_1", fjob1);
@@ -2087,12 +2164,13 @@
     taskTrackerManager.finishTask("attempt_test_0001_r_000001_1", fjob1);
     taskTrackerManager.finalizeJob(fjob1);
 
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment(
+    str.clear();
+    str.put(CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt1");
+    str.put(CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_r_000001_0 on tt1");
+      str);
     taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", fjob2);
     taskTrackerManager.finishTask("attempt_test_0002_r_000001_0", fjob2);
     taskTrackerManager.finalizeJob(fjob2);
@@ -2267,7 +2345,8 @@
     jConf.setNumReduceTasks(6);
     jConf.setQueueName("default");
     jConf.setUser("u1");
-    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, jConf);
 
     // Submit a normal job to the other queue.
     jConf = new JobConf(conf);
@@ -2277,108 +2356,178 @@
     jConf.setNumReduceTasks(6);
     jConf.setUser("u1");
     jConf.setQueueName("q1");
-    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, jConf);
 
-    // Map 1 of high memory job
-    checkAssignment(
+    // Map and reduce of high memory job should be assigned
+    HashMap<String, String> expectedStrings = new HashMap<String, String>();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
+      expectedStrings);
+
     checkQueuesOrder(
       qs, scheduler
         .getOrderedQueues(TaskType.MAP));
 
-    // Reduce 1 of high memory job
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_r_000001_0 on tt1");
     checkQueuesOrder(
       qs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
 
-    // Map 1 of normal job
-    checkAssignment(
+    // 1st map and reduce of normal job should be assigned
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt1");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_m_000001_0 on tt1");
+      expectedStrings);
+
     checkQueuesOrder(
       reversedQs, scheduler
         .getOrderedQueues(TaskType.MAP));
-
-    // Reduce 1 of normal job
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_r_000001_0 on tt1");
     checkQueuesOrder(
       reversedQs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
 
-    // Map 2 of normal job
-    checkAssignment(
+    // 2nd map and reduce of normal job should be assigned
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0002_m_000002_0 on tt1");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0002_r_000002_0 on tt1");
+
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_m_000002_0 on tt1");
+      expectedStrings);
     checkQueuesOrder(
       reversedQs, scheduler
         .getOrderedQueues(TaskType.MAP));
-
-    // Reduce 2 of normal job
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0002_r_000002_0 on tt1");
     checkQueuesOrder(
       reversedQs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
 
     // Now both the queues are equally served. But the comparator doesn't change
     // the order if queues are equally served.
+    // Hence, 3rd map and reduce of normal job should be assigned
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0002_m_000003_0 on tt2");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0002_r_000003_0 on tt2");
 
-    // Map 3 of normal job
-    checkAssignment(
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0002_m_000003_0 on tt2");
+      expectedStrings);
+
     checkQueuesOrder(
       reversedQs, scheduler
         .getOrderedQueues(TaskType.MAP));
 
-    // Reduce 3 of normal job
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0002_r_000003_0 on tt2");
     checkQueuesOrder(
       reversedQs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
 
-    // Map 2 of high memory job
-    checkAssignment(
+    // 2nd map and reduce of high memory job should be assigned
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0001_m_000002_0 on tt2");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0001_r_000002_0 on tt2");
+
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0001_m_000002_0 on tt2");
+      expectedStrings);
     checkQueuesOrder(
       qs, scheduler
         .getOrderedQueues(TaskType.MAP));
 
-    // Reduce 2 of high memory job
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0001_r_000002_0 on tt2");
     checkQueuesOrder(
       qs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
 
-    // Map 4 of normal job
-    checkAssignment(
+    // 4th map and reduce of normal job should be assigned.
+    expectedStrings.clear();
+    expectedStrings.put(
+      CapacityTestUtils.MAP, "attempt_test_0002_m_000004_0 on tt2");
+    expectedStrings.put(
+      CapacityTestUtils.REDUCE, "attempt_test_0002_r_000004_0 on tt2");
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0002_m_000004_0 on tt2");
+      expectedStrings);
     checkQueuesOrder(
       reversedQs, scheduler
         .getOrderedQueues(TaskType.MAP));
 
-    // Reduce 4 of normal job
-    checkAssignment(
-      taskTrackerManager, scheduler, "tt2",
-      "attempt_test_0002_r_000004_0 on tt2");
     checkQueuesOrder(
       reversedQs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
   }
 
+  /**
+   * Tests whether 1 map and 1 reduce are assigned even if reduces span across
+   * multiple jobs or multiple queues.
+   *
+   * Creates a cluster with 6 map slots and 2 reduce slots.
+   * Submits 2 jobs:
+   * job1, with 6 maps and 1 reduce
+   * job2, with 2 maps and 1 reduce
+   *
+   *
+   * check that first assignment assigns a map and a reduce.
+   * check that second assignment assigns a map and a reduce
+   * (both from other job and other queue)
+   *
+   * the last 2 calls just check to make sure that we don't get further reduces
+   * 
+   * @throws Exception
+   */
+  public void testMultiTaskAssignmentInMultipleQueues() throws Exception {
+    setUp(1, 6, 2);
+    // set up some queues
+    String[] qs = {"default", "q1"};
+    taskTrackerManager.addQueues(qs);
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+    queues.add(new FakeQueueInfo("q1", 50.0f, true, 25));
+    taskTrackerManager.setFakeQueues(queues);
+    scheduler.start();
+
+    //Submit the first job with 6 maps and 1 reduce
+    taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, 6, 1, "default", "u1");
+
+    FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, 2, 1, "q1", "u2");
+
+    Map<String, String> str = new HashMap<String, String>();
+    str.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+    str.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
+
+    // next assignment will be for job in second queue.
+    str.clear();
+    str.put(MAP, "attempt_test_0002_m_000001_0 on tt1");
+    str.put(REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
+
+    //now both the reduce slots are being used, hence we should get only 1
+    //map task in this assignTasks call.
+    str.clear();
+    str.put(MAP, "attempt_test_0002_m_000002_0 on tt1");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
+
+    str.clear();
+    str.put(MAP, "attempt_test_0001_m_000002_0 on tt1");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
+  }
+
+
   private void checkRunningJobMovementAndCompletion() throws IOException {
 
     JobQueuesManager mgr = scheduler.jobQueuesManager;
@@ -2402,12 +2551,13 @@
       mgr.getJobQueue("default").getRunningJobs().contains(job));
 
     // assign a task
-    Task t = checkAssignment(
-      taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_m_000001_0 on tt1");
-    t = checkAssignment(
+    Map<String,String> strs = new HashMap<String,String>();
+    strs.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+    strs.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+
+    checkMultipleTaskAssignment(
       taskTrackerManager, scheduler, "tt1",
-      "attempt_test_0001_r_000001_0 on tt1");
+      strs);
 
     controlledInitializationPoller.selectJobsToInitialize();
 
@@ -2551,14 +2701,14 @@
         tracker(taskTracker).getStatus(),
         TaskType.REDUCE);
     if (expectedMemForMapsOnTT == null) {
-      assertTrue(observedMemForMapsOnTT == null);
+      assertEquals(observedMemForMapsOnTT,null);
     } else {
-      assertTrue(observedMemForMapsOnTT.equals(expectedMemForMapsOnTT));
+      assertEquals(observedMemForMapsOnTT,expectedMemForMapsOnTT);
     }
     if (expectedMemForReducesOnTT == null) {
-      assertTrue(observedMemForReducesOnTT == null);
+      assertEquals(observedMemForReducesOnTT,null);
     } else {
-      assertTrue(observedMemForReducesOnTT.equals(expectedMemForReducesOnTT));
+      assertEquals(observedMemForReducesOnTT,expectedMemForReducesOnTT);
     }
   }
 



Mime
View raw message