hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cdoug...@apache.org
Subject svn commit: r786377 [2/2] - in /hadoop/core/branches/HADOOP-4687/mapred/src/contrib: ./ capacity-scheduler/ capacity-scheduler/src/java/org/apache/hadoop/mapred/ capacity-scheduler/src/test/org/apache/hadoop/mapred/ data_join/ dynamic-scheduler/ fairsc...
Date Fri, 19 Jun 2009 05:42:54 GMT
Modified: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=786377&r1=786376&r2=786377&view=diff
==============================================================================
--- hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/branches/HADOOP-4687/mapred/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Jun 19 05:42:53 2009
@@ -40,7 +40,6 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SecurityUtil.AccessControlList;
 
-
 public class TestCapacityScheduler extends TestCase {
 
   static final Log LOG =
@@ -146,17 +145,24 @@
   }
 
   private ControlledInitializationPoller controlledInitializationPoller;
-
+  /*
+   * Fake job in progress object used for testing the scheduler's scheduling
+   * decisions. The JobInProgress object hands out FakeTaskInProgress
+   * objects when assignTasks is called. If speculative maps and reduces
+   * are configured, then JobInProgress returns exactly one speculative
+   * map and reduce task.
+   */
   static class FakeJobInProgress extends JobInProgress {
     
-    private FakeTaskTrackerManager taskTrackerManager;
+    protected FakeTaskTrackerManager taskTrackerManager;
     private int mapTaskCtr;
     private int redTaskCtr;
     private Set<TaskInProgress> mapTips = 
       new HashSet<TaskInProgress>();
     private Set<TaskInProgress> reduceTips = 
       new HashSet<TaskInProgress>();
-    
+    private int speculativeMapTaskCounter = 0;
+    private int speculativeReduceTaskCounter = 0;
     public FakeJobInProgress(JobID jId, JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager, String user) {
       super(jId, jobConf, null);
@@ -175,8 +181,6 @@
       }
       mapTaskCtr = 0;
       redTaskCtr = 0;
-      super.setMaxVirtualMemoryForTask(jobConf.getMaxVirtualMemoryForTask());
-      super.setMaxPhysicalMemoryForTask(jobConf.getMaxPhysicalMemoryForTask());
     }
     
     @Override
@@ -187,8 +191,14 @@
     @Override
     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
         int ignored) throws IOException {
-      if (mapTaskCtr == numMapTasks) return null;
-      TaskAttemptID attemptId = getTaskAttemptID(true);
+      boolean areAllMapsRunning = (mapTaskCtr == numMapTasks);
+      if (areAllMapsRunning){
+        if(!getJobConf().getMapSpeculativeExecution() || 
+            speculativeMapTasks > 0) {
+          return null;
+        }
+      }
+      TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
       Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
         @Override
         public String toString() {
@@ -198,16 +208,39 @@
       taskTrackerManager.startTask(tts.getTrackerName(), task);
       runningMapTasks++;
       // create a fake TIP and keep track of it
-      mapTips.add(new FakeTaskInProgress(getJobID(), 
-          getJobConf(), task, true, this));
+      FakeTaskInProgress mapTip = new FakeTaskInProgress(getJobID(), 
+          getJobConf(), task, true, this);
+      mapTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
+      if(areAllMapsRunning) {
+        speculativeMapTasks++;
+        //you have scheduled a speculative map. Now set all tips in the
+        //map tips not to have a speculative task.
+        for(TaskInProgress t : mapTips) {
+          if (t instanceof FakeTaskInProgress) {
+            FakeTaskInProgress mt = (FakeTaskInProgress) t;
+            mt.hasSpeculativeMap = false;
+          }
+        }
+      } else {
+        //add only non-speculative tips.
+        mapTips.add(mapTip);
+        //add the tips to the JobInProgress TIPS
+        maps = mapTips.toArray(new TaskInProgress[mapTips.size()]);
+      }
       return task;
     }
-    
+
     @Override
     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
         int clusterSize, int ignored) throws IOException {
-      if (redTaskCtr == numReduceTasks) return null;
-      TaskAttemptID attemptId = getTaskAttemptID(false);
+      boolean areAllReducesRunning = (redTaskCtr == numReduceTasks);
+      if (areAllReducesRunning){
+        if(!getJobConf().getReduceSpeculativeExecution() || 
+            speculativeReduceTasks > 0) {
+          return null;
+        }
+      }
+      TaskAttemptID attemptId = getTaskAttemptID(false, areAllReducesRunning);
       Task task = new ReduceTask("", attemptId, 0, 10) {
         @Override
         public String toString() {
@@ -217,8 +250,25 @@
       taskTrackerManager.startTask(tts.getTrackerName(), task);
       runningReduceTasks++;
       // create a fake TIP and keep track of it
-      reduceTips.add(new FakeTaskInProgress(getJobID(), 
-          getJobConf(), task, false, this));
+      FakeTaskInProgress reduceTip = new FakeTaskInProgress(getJobID(), 
+          getJobConf(), task, false, this);
+      reduceTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
+      if(areAllReducesRunning) {
+        speculativeReduceTasks++;
+        //you have scheduled a speculative reduce. Now set all tips in the
+        //reduce tips not to have a speculative task.
+        for(TaskInProgress t : reduceTips) {
+          if (t instanceof FakeTaskInProgress) {
+            FakeTaskInProgress rt = (FakeTaskInProgress) t;
+            rt.hasSpeculativeReduce = false;
+          }
+        }
+      } else {
+        //add only non-speculative tips.
+        reduceTips.add(reduceTip);
+        //add the tips to the JobInProgress TIPS
+        reduces = reduceTips.toArray(new TaskInProgress[reduceTips.size()]);
+      }
       return task;
     }
     
@@ -232,14 +282,19 @@
       finishedReduceTasks++;
     }
     
-    private TaskAttemptID getTaskAttemptID(boolean isMap) {
+    private TaskAttemptID getTaskAttemptID(boolean isMap, boolean isSpeculative) {
       JobID jobId = getJobID();
       TaskType t = TaskType.REDUCE;
       if (isMap) {
         t = TaskType.MAP;
       }
-      return new TaskAttemptID(jobId.getJtIdentifier(),
-          jobId.getId(), t, (isMap)?++mapTaskCtr: ++redTaskCtr, 0);
+      if (!isSpeculative) {
+        return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), t,
+            (isMap) ? ++mapTaskCtr : ++redTaskCtr, 0);
+      } else  {
+        return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), t,
+            (isMap) ? mapTaskCtr : redTaskCtr, 1);
+      }
     }
     
     @Override
@@ -270,12 +325,15 @@
       this.status.setRunState(JobStatus.FAILED);
     }
   }
-  
+ 
   static class FakeTaskInProgress extends TaskInProgress {
     private boolean isMap;
     private FakeJobInProgress fakeJob;
     private TreeMap<TaskAttemptID, String> activeTasks;
     private TaskStatus taskStatus;
+    boolean hasSpeculativeMap;
+    boolean hasSpeculativeReduce;
+    
     FakeTaskInProgress(JobID jId, JobConf jobConf, Task t, 
         boolean isMap, FakeJobInProgress job) {
       super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0);
@@ -287,6 +345,16 @@
       this.taskStatus = TaskStatus.createTaskStatus(isMap);
       taskStatus.setProgress(0.5f);
       taskStatus.setRunState(TaskStatus.State.RUNNING);
+      if (jobConf.getMapSpeculativeExecution()) {
+        //resetting of the hasSpeculativeMap is done
+        //when speculative map is scheduled by the job.
+        hasSpeculativeMap = true;
+      } 
+      if (jobConf.getReduceSpeculativeExecution()) {
+        //resetting of the hasSpeculativeReduce is done
+        //when speculative reduce is scheduled by the job.
+        hasSpeculativeReduce = true;
+      }
     }
     
     @Override
@@ -561,13 +629,13 @@
   // represents a fake queue configuration info
   static class FakeQueueInfo {
     String queueName;
-    float gc;
+    float capacity;
     boolean supportsPrio;
     int ulMin;
 
-    public FakeQueueInfo(String queueName, float gc, boolean supportsPrio, int ulMin) {
+    public FakeQueueInfo(String queueName, float capacity, boolean supportsPrio, int ulMin) {
       this.queueName = queueName;
-      this.gc = gc;
+      this.capacity = capacity;
       this.supportsPrio = supportsPrio;
       this.ulMin = ulMin;
     }
@@ -597,10 +665,10 @@
     }*/
     
     public float getCapacity(String queue) {
-      if(queueMap.get(queue).gc == -1) {
+      if(queueMap.get(queue).capacity == -1) {
         return super.getCapacity(queue);
       }
-      return queueMap.get(queue).gc;
+      return queueMap.get(queue).capacity;
     }
     
     public int getMinimumUserLimitPercent(String queue) {
@@ -666,6 +734,9 @@
         resConf.getQueues());
     scheduler.setInitializationPoller(controlledInitializationPoller);
     scheduler.setConf(conf);
+    //by default disable speculative execution.
+    conf.setMapSpeculativeExecution(false);
+    conf.setReduceSpeculativeExecution(false);
   }
   
   @Override
@@ -678,7 +749,7 @@
   private FakeJobInProgress submitJob(int state, JobConf jobConf) throws IOException {
     FakeJobInProgress job =
         new FakeJobInProgress(new JobID("test", ++jobCounter),
-            (jobConf == null ? new JobConf() : jobConf), taskTrackerManager,
+            (jobConf == null ? new JobConf(conf) : jobConf), taskTrackerManager,
             jobConf.getUser());
     job.getStatus().setRunState(state);
     taskTrackerManager.submitJob(job);
@@ -852,13 +923,6 @@
     return queue.toArray(new JobInProgress[0]);
   }
   
-  /*protected void submitJobs(int number, int state, int maps, int reduces)
-    throws IOException {
-    for (int i = 0; i < number; i++) {
-      submitJob(state, maps, reduces);
-    }
-  }*/
-  
  // tests if tasks can be assigned when there are multiple jobs from the same
  // user
   public void testJobFinished() throws Exception {
@@ -995,7 +1059,7 @@
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    // set the gc % as 10%, so that gc will be zero initially as 
+    // set the capacity % as 10%, so that capacity will be zero initially as 
     // the cluster capacity increases slowly.
     queues.add(new FakeQueueInfo("default", 10.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 90.0f, true, 25));
@@ -1029,7 +1093,7 @@
     // add another tt to increase tt slots
     taskTrackerManager.addTaskTracker("tt5");
     // now job from default should run, as it is furthest away
-    // in terms of runningMaps / gc.
+    // in terms of runningMaps / capacity.
     checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
     verifyCapacity("1", "default");
     verifyCapacity("9", "q2");
@@ -1040,7 +1104,7 @@
     String schedInfo = taskTrackerManager.getQueueManager().
                           getSchedulerInfo(queue).toString();    
     assertTrue(schedInfo.contains("Map tasks\nCapacity: " 
-        + expectedCapacity));
+        + expectedCapacity + " slots"));
   }
   
   // test capacity transfer
@@ -1221,7 +1285,82 @@
     // first in the queue
     checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4");
   }
-  
+
+  /**
+   * Test to verify that high memory jobs hit user limits faster than any normal
+   * job.
+   * 
+   * @throws IOException
+   */
+  public void testUserLimitsForHighMemoryJobs()
+      throws IOException {
+    taskTrackerManager = new FakeTaskTrackerManager(1, 10, 10);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    String[] qs = { "default" };
+    taskTrackerManager.addQueues(qs);
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
+    resConf.setFakeQueues(queues);
+    // enabled memory-based scheduling
+    // Normal job in the cluster would be 1GB maps/reduces
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // Submit one normal job to the other queue.
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(6);
+    jConf.setNumReduceTasks(6);
+    jConf.setUser("u1");
+    jConf.setQueueName("default");
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    LOG.debug("Submit one high memory(2GB maps, 2GB reduces) job of "
+        + "6 map and 6 reduce tasks");
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(2 * 1024);
+    jConf.setNumMapTasks(6);
+    jConf.setNumReduceTasks(6);
+    jConf.setQueueName("default");
+    jConf.setUser("u2");
+    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    // Verify that normal job takes 3 task assignments to hit user limits
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000004_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000005_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000005_0 on tt1");
+    // u1 has 5 map slots and 5 reduce slots. u2 has none. So u1's user limits
+    // are hit. So u2 should get slots
+
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+
+    // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
+    // slots. Because of high memory tasks, giving u2 another task would
+    // overflow limits. So, no more tasks should be given to anyone.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+  }
+
   /*
    * Following is the testing strategy for testing scheduling information.
    * - start capacity scheduler with two queues.
@@ -1271,21 +1410,35 @@
     scheduler.assignTasks(tracker("tt1")); // heartbeat
     scheduler.assignTasks(tracker("tt2")); // heartbeat
     int totalMaps = taskTrackerManager.getClusterStatus().getMaxMapTasks();
-    int totalReduces = taskTrackerManager.getClusterStatus().getMaxReduceTasks();
+    int totalReduces =
+        taskTrackerManager.getClusterStatus().getMaxReduceTasks();
     QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
-    String schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo();
-    String schedulingInfo2 = queueManager.getJobQueueInfo("q2").getSchedulingInfo();
+    String schedulingInfo =
+        queueManager.getJobQueueInfo("default").getSchedulingInfo();
+    String schedulingInfo2 =
+        queueManager.getJobQueueInfo("q2").getSchedulingInfo();
     String[] infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[1] , "Capacity Percentage: 50.0%");
-    assertEquals(infoStrings[6] , "Capacity: " + totalMaps * 50/100);
-    assertEquals(infoStrings[10] , "Capacity: " + totalReduces * 50/100);
-    assertEquals(infoStrings[2] , "User Limit: 25%");
-    assertEquals(infoStrings[3] , "Priority Supported: YES");
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 0");
-    assertEquals(infoStrings[15] , "Number of users who have submitted jobs: 0");
+    assertEquals(infoStrings.length, 18);
+    assertEquals(infoStrings[0], "Queue configuration");
+    assertEquals(infoStrings[1], "Capacity Percentage: 50.0%");
+    assertEquals(infoStrings[2], "User Limit: 25%");
+    assertEquals(infoStrings[3], "Priority Supported: YES");
+    assertEquals(infoStrings[4], "-------------");
+    assertEquals(infoStrings[5], "Map tasks");
+    assertEquals(infoStrings[6], "Capacity: " + totalMaps * 50 / 100
+        + " slots");
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[9], "-------------");
+    assertEquals(infoStrings[10], "Reduce tasks");
+    assertEquals(infoStrings[11], "Capacity: " + totalReduces * 50 / 100
+        + " slots");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[14], "-------------");
+    assertEquals(infoStrings[15], "Job info");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 0");
+    assertEquals(infoStrings[17], "Number of users who have submitted jobs: 0");
     assertEquals(schedulingInfo, schedulingInfo2);
 
     //Testing with actual job submission.
@@ -1296,10 +1449,13 @@
     infoStrings = schedulingInfo.split("\n");
 
     //waiting job should be equal to number of jobs submitted.
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5");
+    assertEquals(infoStrings.length, 18);
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 5");
+    assertEquals(infoStrings[17], "Number of users who have submitted jobs: 1");
 
     //Initalize the jobs but don't raise events
     controlledInitializationPoller.selectJobsToInitialize();
@@ -1307,12 +1463,14 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
+    assertEquals(infoStrings.length, 18);
     //should be previous value as nothing is scheduled because no events
     //has been raised after initialization.
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5");
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 5");
 
     //Raise status change event so that jobs can move to running queue.
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
@@ -1329,10 +1487,14 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 18);
-    assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
-    assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[16] , "Number of Waiting Jobs: 4");
+    assertEquals(infoStrings.length, 20);
+    assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 1");
+    assertEquals(infoStrings[9], "Active users:");
+    assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+    assertEquals(infoStrings[14], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[15], "Running tasks: 0");
+    assertEquals(infoStrings[18], "Number of Waiting Jobs: 4");
 
     //assign a reduce task
     Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
@@ -1341,10 +1503,16 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 20);
-    assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
-    assertEquals(infoStrings[13],"Running tasks: 100.0% of Capacity");
-    assertEquals(infoStrings[18] , "Number of Waiting Jobs: 4");
+    assertEquals(infoStrings.length, 22);
+    assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 1");
+    assertEquals(infoStrings[9], "Active users:");
+    assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+    assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[15], "Running tasks: 1");
+    assertEquals(infoStrings[16], "Active users:");
+    assertEquals(infoStrings[17], "User 'u1': 1 (100.0% of used capacity)");
+    assertEquals(infoStrings[20], "Number of Waiting Jobs: 4");
 
     //Complete the job and check the running tasks count
     FakeJobInProgress u1j1 = userJobs.get(0);
@@ -1357,10 +1525,12 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 4");
+    assertEquals(infoStrings.length, 18);
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 4");
 
     //Fail a job which is initialized but not scheduled and check the count.
     FakeJobInProgress u1j2 = userJobs.get(1);
@@ -1374,10 +1544,14 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 3");
+    assertEquals(infoStrings.length, 18);
+    //should be previous value as nothing is scheduled because no events
+    //has been raised after initialization.
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 3");
 
     //Fail a job which is not initialized but is in the waiting queue.
     FakeJobInProgress u1j5 = userJobs.get(4);
@@ -1392,10 +1566,14 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 2");
+    assertEquals(infoStrings.length, 18);
+    //should be previous value as nothing is scheduled because no events
+    //has been raised after initialization.
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 2");
 
     //Raise status change events as none of the intialized jobs would be
     //in running queue as we just failed the second job which was initialized
@@ -1418,10 +1596,12 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 18);
-    assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
-    assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[16] , "Number of Waiting Jobs: 1");
+    assertEquals(infoStrings.length, 20);
+    assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 1");
+    assertEquals(infoStrings[9], "Active users:");
+    assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+    assertEquals(infoStrings[18], "Number of Waiting Jobs: 1");
 
     //Fail the executing job
     taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
@@ -1431,11 +1611,10 @@
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 16);
-    assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
-    assertEquals(infoStrings[14] , "Number of Waiting Jobs: 1");
-
+    assertEquals(infoStrings.length, 18);
+    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[16], "Number of Waiting Jobs: 1");
   }
 
   /**
@@ -1449,12 +1628,6 @@
     LOG.debug("Starting the scheduler.");
     taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
 
-    // Limited TT - 1GB vmem and 512MB pmem
-    taskTrackerManager.getTaskTracker("tt1").getResourceStatus()
-        .setTotalVirtualMemory(1 * 1024 * 1024 * 1024L);
-    taskTrackerManager.getTaskTracker("tt1").getResourceStatus()
-        .setTotalPhysicalMemory(512 * 1024 * 1024L);
-
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
@@ -1464,11 +1637,11 @@
     // memory-based scheduling disabled by default.
     scheduler.start();
 
-    LOG.debug("Submit one high memory(3GB vmem, 1GBpmem) job of 1 map task "
-        + "and 1 reduce task.");
+    LOG.debug("Submit one high memory job of 1 3GB map task "
+        + "and 1 1GB reduce task.");
     JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem
-    jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem
+    jConf.setMemoryForMapTask(3 * 1024L); // 3GB
+    jConf.setMemoryForReduceTask(1 * 1024L); // 1 GB
     jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(1);
     jConf.setQueueName("default");
@@ -1483,193 +1656,59 @@
   }
 
   /**
-   * Test to verify that highPmemJobs are scheduled like all other jobs when
-   * physical-memory based scheduling is not enabled.
-   * @throws IOException
-   */
-  public void testDisabledPmemBasedScheduling()
-      throws IOException {
-
-    LOG.debug("Starting the scheduler.");
-    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
-
-    // Limited TT - 100GB vmem and 500MB pmem
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(100 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(500 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
-
-    taskTrackerManager.addQueues(new String[] { "default" });
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.setTaskTrackerManager(taskTrackerManager);
-    // enable vmem-based scheduling. pmem based scheduling disabled by default.
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        1536 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        3 * 1024 * 1024 * 1024L);
-    scheduler.start();
-
-    LOG.debug("Submit one high pmem(3GB vmem, 1GBpmem) job of 1 map task "
-        + "and 1 reduce task.");
-    JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(3 * 1024 * 1024 * 1024L); // 3GB vmem
-    jConf.setMaxPhysicalMemoryForTask(1 * 1024 * 1024 * 1024L); // 1 GB pmem
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(1);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    submitJobAndInit(JobStatus.RUNNING, jConf);
-
-    // assert that all tasks are launched even though they transgress the
-    // scheduling limits.
-
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-  }
-
-  /**
-   * Test HighMemoryJobs.
-   * @throws IOException
-   */
-  public void testHighMemoryJobs()
-      throws IOException {
-
-    LOG.debug("Starting the scheduler.");
-    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
-
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
-    // Normal job on this TT would be 1.5GB vmem, 0.5GB pmem
-
-    taskTrackerManager.addQueues(new String[] { "default" });
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    scheduler.setTaskTrackerManager(taskTrackerManager);
-    // enabled memory-based scheduling
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        1536 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        3 * 1024 * 1024 * 1024L);
-    resConf.setDefaultPercentOfPmemInVmem(33.3f);
-    resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start();
-
-    LOG.debug("Submit one high memory(1600MB vmem, 400MB pmem) job of "
-        + "1 map task and 1 reduce task.");
-    JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(1600 * 1024 * 1024L); // 1.6GB vmem
-    jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(1);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-
-    // No more tasks of this job can run on the TT because of lack of vmem
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-
-    // Let attempt_test_0001_m_000001_0 finish, task assignment should succeed.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job1);
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-
-    LOG.debug("Submit another high memory(1200MB vmem, 800MB pmem) job of "
-        + "1 map task and 0 reduces.");
-    jConf.setMaxVirtualMemoryForTask(1200 * 1024 * 1024L);
-    jConf.setMaxPhysicalMemoryForTask(800 * 1024 * 1024L);
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(0);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    submitJobAndInit(JobStatus.PREP, jConf); // job2
-
-    // This job shouldn't run the TT now because of lack of pmem
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-
-    // Let attempt_test_0001_m_000002_0 finish, task assignment should succeed.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", job1);
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-
-    LOG.debug("Submit a normal memory(200MB vmem, 100MB pmem) job of "
-        + "0 maps and 1 reduce task.");
-    jConf.setMaxVirtualMemoryForTask(200 * 1024 * 1024L);
-    jConf.setMaxPhysicalMemoryForTask(100 * 1024 * 1024L);
-    jConf.setNumMapTasks(0);
-    jConf.setNumReduceTasks(1);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    submitJobAndInit(JobStatus.PREP, jConf); // job3
-
-    checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
-  }
-
-  /**
-   * Test HADOOP-4979. 
-   * Bug fix for making sure we always return null to TT if there is a 
-   * high-mem job, and not look at reduce jobs (if map tasks are high-mem)
-   * or vice-versa.
+   * Test reverting HADOOP-4979. If there is a high-mem job, we should now look
+   * at reduce jobs (if map tasks are high-mem) or vice-versa.
+   * 
    * @throws IOException
    */
-  public void testHighMemoryBlocking()
+  public void testHighMemoryBlockingAcrossTaskTypes()
       throws IOException {
 
     // 2 map and 1 reduce slots
     taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1);
 
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
-    // Normal job on this TT would be 1GB vmem, 0.5GB pmem
-
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        1 * 1024 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        3 * 1024 * 1024 * 1024L);
-    resConf.setDefaultPercentOfPmemInVmem(33.3f);
-    resConf.setLimitMaxPmemForTasks(1536 * 1024 * 1024L);
+    // Normal job in the cluster would be 1GB maps/reduces
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
-    // We need a situation where the scheduler needs to run a map task, 
-    // but the available one has a high-mem requirement. There should
-    // be another job whose maps or reduces can run, but they shouldn't 
-    // be scheduled.
+    // The situation : Two jobs in the queue. First job with only maps and no
+    // reduces and is a high memory job. Second job is a normal job with both
+    // maps and reduces.
+    // First job cannot run for want of memory for maps. In this case, second
+    // job's reduces should run.
     
-    LOG.debug("Submit one high memory(2GB vmem, 400MB pmem) job of "
+    LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
         + "2 map tasks");
-    JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
-    jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(0);
     jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(0);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
-    LOG.debug("Submit another regular memory(900MB vmem, 200MB pmem) job of "
+
+    LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
         + "2 map/red tasks");
-    jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(900 * 1024 * 1024L); // 900MB vmem
-    jConf.setMaxPhysicalMemoryForTask(200 * 1024 * 1024L); // 200MB pmem
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
     jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(2);
     jConf.setQueueName("default");
@@ -1678,76 +1717,17 @@
     
     // first, a map from j1 will run
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    // at this point, the scheduler tries to schedule another map from j1. 
-    // there isn't enough space. There is space to run the second job's
-    // map or reduce task, but they shouldn't be scheduled
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-  }
-  
-  /**
-   * test invalid highMemoryJobs
-   * @throws IOException
-   */
-  public void testHighMemoryJobWithInvalidRequirements()
-      throws IOException {
-    LOG.debug("Starting the scheduler.");
-    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024);
-    ttStatus.setReservedPhysicalMemory(0);
+    // Total 2 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 100.0f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
 
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    taskTrackerManager.addQueues(new String[] { "default" });
-    resConf.setFakeQueues(queues);
-    scheduler.setTaskTrackerManager(taskTrackerManager);
-    // enabled memory-based scheduling
-    long vmemUpperLimit = 1 * 1024 * 1024 * 1024L;
-    long vmemDefault = 1536 * 1024 * 1024L;
-    long pmemUpperLimit = vmemUpperLimit;
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        vmemDefault);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        vmemUpperLimit);
-    resConf.setDefaultPercentOfPmemInVmem(33.3f);
-    resConf.setLimitMaxPmemForTasks(pmemUpperLimit);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start();
-
-    LOG.debug("Submit one invalid high ram(5GB vmem, 3GB pmem) job of "
-        + "1 map, 0 reduce tasks.");
-    long jobMaxVmem = 5 * 1024 * 1024 * 1024L;
-    long jobMaxPmem = 3 * 1024 * 1024 * 1024L;
-    JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(jobMaxVmem);
-    jConf.setMaxPhysicalMemoryForTask(jobMaxPmem);
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(0);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-
-    boolean throwsException = false;
-    String msg = null;
-    FakeJobInProgress job;
-    try {
-      job = submitJob(JobStatus.PREP, jConf);
-    } catch (IOException ioe) {
-      // job has to fail
-      throwsException = true;
-      msg = ioe.getMessage();
-    }
-
-    assertTrue(throwsException);
-    job = (FakeJobInProgress) taskTrackerManager.getJobs().toArray()[0];
-    assertTrue(msg.matches(job.getJobID() + " \\(" + jobMaxVmem + "vmem, "
-        + jobMaxPmem + "pmem\\) exceeds the cluster's max-memory-limits \\("
-        + vmemUpperLimit + "vmem, " + pmemUpperLimit
-        + "pmem\\). Cannot run in this cluster, so killing it."));
-    // For job, no cleanup task needed so gets killed immediately.
-    assertTrue(job.getStatus().getRunState() == JobStatus.KILLED);
+    // at this point, the scheduler tries to schedule another map from j1. 
+    // there isn't enough space. The second job's reduce should be scheduled.
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    // Total 1 reduce slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+        100.0f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
   }
 
   /**
@@ -1758,13 +1738,7 @@
       throws IOException {
 
     LOG.debug("Starting the scheduler.");
-    taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
+    taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
 
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
@@ -1772,68 +1746,103 @@
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        1536 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        4 * 1024 * 1024 * 1024L);
-    resConf.setDefaultPercentOfPmemInVmem(33.3f);
-    resConf.setLimitMaxPmemForTasks(2 * 1024 * 1024 * 1024L);
+    // Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
-    LOG.debug("Submit one high memory(4GB vmem, 512MB pmem) job of "
-        + "1 map, 0 reduce tasks.");
-    JobConf jConf = new JobConf();
-    jConf.setMaxVirtualMemoryForTask(4 * 1024 * 1024 * 1024L);
-    jConf.setMaxPhysicalMemoryForTask(512 * 1024 * 1024L);
+    LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
+        + "1 map, 1 reduce tasks.");
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
     jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(0);
+    jConf.setNumReduceTasks(1);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
-    // TTs should not run these jobs i.e. cluster blocked because of lack of
-    // vmem
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-
-    // Job should still be alive
-    assertTrue(job1.getStatus().getRunState() == JobStatus.RUNNING);
 
-    LOG.debug("Submit a normal job of 1 map, 0 reduce tasks.");
-    // Use cluster-wide defaults
-    jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
-    jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
+    // Fill the second tt with this job.
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    // Total 1 map slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 25.0f);
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 0, 0),
+        (String) job1.getSchedulingInfo());
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
+    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    // Total 1 reduce slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+        25.0f);
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 1, 1),
+        (String) job1.getSchedulingInfo());
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
+
+    LOG.debug("Submit one high memory(2GB maps/reduces) job of "
+        + "2 map, 2 reduce tasks.");
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(2 * 1024);
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(2);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
 
-    // cluster should still be blocked for job1 and so even job2 should not run
-    // even though it is a normal job
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-
-    scheduler.taskTrackerManager.killJob(job2.getJobID());
-    scheduler.taskTrackerManager.killJob(job1.getJobID());
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    // Total 3 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 0, 0),
+        (String) job2.getSchedulingInfo());
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    // Total 3 reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
+        75.0f);
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
+        (String) job2.getSchedulingInfo());
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
 
-    LOG.debug("Submit one high memory(2GB vmem, 2GB pmem) job of "
+    LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
         + "1 map, 0 reduce tasks.");
-    jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L);
-    jConf.setMaxPhysicalMemoryForTask(2 * 1024 * 1024 * 1024L);
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
     FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
-    // TTs should not run these jobs i.e. cluster blocked because of lack of
-    // pmem now.
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    
-    // Job should still be alive
-    assertTrue(job3.getStatus().getRunState() == JobStatus.RUNNING);
-
-    LOG.debug("Submit a normal job of 1 map, 0 reduce tasks.");
-    // Use cluster-wide defaults
-    jConf.setMaxVirtualMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
-    jConf.setMaxPhysicalMemoryForTask(JobConf.DISABLED_MEMORY_LIMIT);
-    submitJobAndInit(JobStatus.PREP, jConf); // job4
 
-    // cluster should still be blocked for job3 and so even job4 should not run
-    // even though it is a normal job
+    // Job2 cannot fit on tt2 or tt1. Blocking. Job3 also will not run.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
     assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
+        75.0f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
+        (String) job2.getSchedulingInfo());
+    assertEquals(String.format(
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 0, 0),
+        (String) job3.getSchedulingInfo());
   }
 
   /**
@@ -1847,13 +1856,6 @@
     // create a cluster with a single node.
     LOG.debug("Starting cluster with 1 tasktracker, 2 map and 2 reduce slots");
     taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2);
-    TaskTrackerStatus.ResourceStatus ttStatus =
-        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
-    LOG.debug("Assume TT has 4 GB virtual mem and 2 GB RAM");
-    ttStatus.setTotalVirtualMemory(4 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedVirtualMemory(0);
-    ttStatus.setTotalPhysicalMemory(2 * 1024 * 1024 * 1024L);
-    ttStatus.setReservedPhysicalMemory(0);
 
     // create scheduler
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -1862,14 +1864,17 @@
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
-    LOG.debug("By default, jobs get 0.5 GB per task vmem" +
-        " and 2 GB max vmem, with 50% of it for RAM");
-    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
-        512 * 1024 * 1024L);
-    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-        2 * 1024 * 1024 * 1024L);
-    resConf.setDefaultPercentOfPmemInVmem(50.0f);
-    resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L);
+    LOG.debug("Assume TT has 2GB for maps and 2GB for reduces");
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 512);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        2 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 512);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
     
@@ -1878,15 +1883,31 @@
     JobConf jConf = new JobConf();
     jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(2);
+    jConf.setMemoryForMapTask(512);
+    jConf.setMemoryForReduceTask(512);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
 
     // 1st cycle - 1 map gets assigned.
     Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // Total 1 map slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
+    checkMemReservedForTasksOnTT("tt1",  512L, 0L);
+
+    // 1st cycle of reduces - 1 reduce gets assigned.
+    Task t1 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    // Total 1 reduce slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+        50.0f);
+    checkMemReservedForTasksOnTT("tt1",  512L, 512L);
     
     // kill this job !
     taskTrackerManager.killJob(job1.getJobID());
+    // No more map/reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 0, 0, 0.0f);
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 0, 0,
+        0.0f);
     
     // retire the job
     taskTrackerManager.removeJob(job1.getJobID());
@@ -1896,6 +1917,8 @@
     jConf = new JobConf();
     jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(1);
+    jConf.setMemoryForMapTask(512);
+    jConf.setMemoryForReduceTask(512);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
@@ -1903,30 +1926,30 @@
     // 2nd cycle - nothing should get assigned. Memory matching code
     // will see the job is missing and fail memory requirements.
     assertNull(scheduler.assignTasks(tracker("tt1")));
+    checkMemReservedForTasksOnTT("tt1", null, null);
+
     // calling again should not make a difference, as the task is still running
     assertNull(scheduler.assignTasks(tracker("tt1")));
+    checkMemReservedForTasksOnTT("tt1", null, null);
     
-    // finish the task on the tracker.
+    // finish the tasks on the tracker.
     taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
+    taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), job1);
+
     // now a new task can be assigned.
     t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    // Total 1 map slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
+    checkMemReservedForTasksOnTT("tt1", 512L, 0L);
+
     // reduce can be assigned.
     t = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    // Total 1 reduce slot should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+        50.0f);
+    checkMemReservedForTasksOnTT("tt1", 512L, 512L);
   }
-  
-  protected TaskTrackerStatus tracker(String taskTrackerName) {
-    return taskTrackerManager.getTaskTracker(taskTrackerName);
-  }
-  
-  protected Task checkAssignment(String taskTrackerName,
-      String expectedTaskString) throws IOException {
-    List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
-    assertNotNull(expectedTaskString, tasks);
-    assertEquals(expectedTaskString, 1, tasks.size());
-    assertEquals(expectedTaskString, tasks.get(0).toString());
-    return tasks.get(0);
-  }
-  
+
   /*
    * Test cases for Job Initialization poller.
    */
@@ -2133,10 +2156,9 @@
     checkFailedInitializedJobMovement();
 
     // Check failed waiting job movement
-    checkFailedWaitingJobMovement();
-    
+    checkFailedWaitingJobMovement(); 
   }
-  
+
   public void testStartWithoutDefaultQueueConfigured() throws Exception {
     //configure a single queue which is not default queue
     String[] qs = {"q1"};
@@ -2188,6 +2210,382 @@
     assertFalse("Waiting job contains submitted job", 
         mgr.getRunningJobQueue("default").contains(job));
   }
+  
+  /**
+   * Test case deals with normal jobs which have speculative maps and reduce.
+   * Following is test executed
+   * <ol>
+   * <li>Submit one job with speculative maps and reduce.</li>
+   * <li>Submit another job with no speculative execution.</li>
+   * <li>Observe that all tasks from first job get scheduled, speculative
+   * and normal tasks</li>
+   * <li>Finish all the first jobs tasks second jobs tasks get scheduled.</li>
+   * </ol>
+   * @throws IOException
+   */
+  public void testSpeculativeTaskScheduling() throws IOException {
+    String[] qs = {"default"};
+    taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    taskTrackerManager.addQueues(qs);
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    JobQueuesManager mgr = scheduler.jobQueuesManager;
+    JobConf conf = new JobConf();
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    conf.setMapSpeculativeExecution(true);
+    conf.setReduceSpeculativeExecution(true);
+    //Submit a job which would have one speculative map and one speculative
+    //reduce.
+    FakeJobInProgress fjob1 = submitJob(JobStatus.PREP, conf);
+    
+    conf = new JobConf();
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    //Submit a job which has no speculative map or reduce.
+    FakeJobInProgress fjob2 = submitJob(JobStatus.PREP, conf);    
+
+    //Ask the poller to initalize all the submitted job and raise status
+    //change event.
+    controlledInitializationPoller.selectJobsToInitialize();
+    raiseStatusChangeEvents(mgr);
+
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    assertTrue("Pending maps of job1 greater than zero", 
+        (fjob1.pendingMaps() == 0));
+    checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    assertTrue("Pending reduces of job2 greater than zero", 
+        (fjob1.pendingReduces() == 0));
+    checkAssignment("tt2", "attempt_test_0001_r_000001_1 on tt2");
+
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", fjob1);
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1", fjob1);
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", fjob1);
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_1", fjob1);
+    taskTrackerManager.finalizeJob(fjob1);
+    
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", fjob2);
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0", fjob2);
+    taskTrackerManager.finalizeJob(fjob2);    
+  }
+
+  /**
+   * Test case to test scheduling of
+   * <ol> 
+   * <li>High ram job with speculative map execution.
+   * <ul>
+   * <li>Submit one high ram job which has speculative map.</li>
+   * <li>Submit a normal job which has no speculative map.</li>
+   * <li>Scheduler should schedule first all map tasks from first job and block
+   * the cluster till both maps from first job get completed.
+   * </ul>
+   * </li>
+   * <li>High ram job with speculative reduce execution.
+   * <ul>
+   * <li>Submit one high ram job which has speculative reduce.</li>
+   * <li>Submit a normal job which has no speculative reduce.</li>
+   * <li>Scheduler should schedule first all reduce tasks from first job and
+   * block the cluster till both reduces are completed.</li>
+   * </ul>
+   * </li>
+   * </ol>
+   * @throws IOException
+   */
+  public void testHighRamJobWithSpeculativeExecution() throws IOException {
+    // 2 TTs, 3 map and 3 reduce slots on each TT
+    taskTrackerManager = new FakeTaskTrackerManager(2, 3, 3);
+
+    taskTrackerManager.addQueues(new String[] { "default" });
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // enabled memory-based scheduling
+    // 1GB for each map, 1GB for each reduce
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        3 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        3 * 1024L);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024L);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    //Submit a high memory job with speculative tasks.
+    JobConf jConf = new JobConf();
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(0);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    jConf.setMapSpeculativeExecution(true);
+    jConf.setReduceSpeculativeExecution(false);
+    FakeJobInProgress job1 =
+        new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
+            taskTrackerManager, "u1");
+    taskTrackerManager.submitJob(job1);
+
+    //Submit normal job
+    jConf = new JobConf();
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(0);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    jConf.setMapSpeculativeExecution(false);
+    jConf.setReduceSpeculativeExecution(false);
+    FakeJobInProgress job2 = submitJob(JobStatus.PREP, jConf);
+
+    controlledInitializationPoller.selectJobsToInitialize();
+    raiseStatusChangeEvents(scheduler.jobQueuesManager);
+
+    // first, a map from j1 will run
+    // at this point, there is a speculative task for the same job to be
+    //scheduled. This task would be scheduled. Till the tasks from job1 gets
+    //complete none of the tasks from other jobs would be scheduled.
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+    assertEquals("pending maps greater than zero " , job1.pendingMaps(), 0);
+    // Total 2 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
+
+    //make same tracker get back, check if you are blocking. Your job
+    //has speculative map task so tracker should be blocked even tho' it
+    //can run job2's map.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    // Total 2 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
+    //TT2 now gets speculative map of the job1
+    checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
+    // Total 4 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 4, 66.7f);
+    checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L);
+
+    // Now since the first job has no more speculative maps, it can schedule
+    // the second job.
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    // Total 5 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 5, 83.3f);
+    checkMemReservedForTasksOnTT("tt1", 3 * 1024L, 0L);
+
+    //finish everything
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", 
+        job1);
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1", 
+        job1);
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", 
+        job2);
+    taskTrackerManager.finalizeJob(job1);
+    taskTrackerManager.finalizeJob(job2);
+    
+    //Now submit high ram job with speculative reduce and check.
+    jConf = new JobConf();
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(2 * 1024L);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    jConf.setMapSpeculativeExecution(false);
+    jConf.setReduceSpeculativeExecution(true);
+    FakeJobInProgress job3 =
+        new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
+            taskTrackerManager, "u1");
+    taskTrackerManager.submitJob(job3);
+
+    //Submit normal job w.r.t reduces
+    jConf = new JobConf();
+    jConf.setMemoryForMapTask(1 * 1024L);
+    jConf.setMemoryForReduceTask(1 * 1024L);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    jConf.setMapSpeculativeExecution(false);
+    jConf.setReduceSpeculativeExecution(false);
+    FakeJobInProgress job4 = submitJob(JobStatus.PREP, jConf);
+    
+    controlledInitializationPoller.selectJobsToInitialize();
+    raiseStatusChangeEvents(scheduler.jobQueuesManager);
+
+    // Finish up the map scheduler
+    checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+    // Total 2 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
+    checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
+    // Total 3 map slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 50.0f);
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
+
+    // first, a reduce from j3 will run
+    // at this point, there is a speculative task for the same job to be
+    //scheduled. This task would be scheduled. Till the tasks from job3 gets
+    //complete none of the tasks from other jobs would be scheduled.
+    checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
+    assertEquals("pending reduces greater than zero ", job3.pendingReduces(),
+        0);
+    // Total 2 reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
+        33.3f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2*1024L);
+
+    //make same tracker get back, check if you are blocking. Your job
+    //has speculative reduce task so tracker should be blocked even tho' it
+    //can run job4's reduce.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    // Total 2 reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
+        33.3f);
+
+    //TT2 now gets speculative reduce of the job3
+    checkAssignment("tt2", "attempt_test_0003_r_000001_1 on tt2");
+    // Total 4 reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 4,
+        66.7f);
+    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 2 * 1024L);
+
+    // Now since j3 has no more speculative reduces, it can schedule
+    // the j4.
+    checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
+    // Total 5 reduce slots should be accounted for.
+    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 5,
+        83.3f);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 3 * 1024L);
+  }
+
+  /**
+   * Test to verify that queue ordering is based on the number of slots occupied
+   * and hence to verify that presence of high memory jobs is reflected properly
+   * while determining used capacities of queues and hence the queue ordering.
+   * 
+   * @throws IOException
+   */
+  public void testQueueOrdering()
+      throws IOException {
+    taskTrackerManager = new FakeTaskTrackerManager(2, 6, 6);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    String[] qs = { "default", "q1" };
+    String[] reversedQs = { qs[1], qs[0] };
+    taskTrackerManager.addQueues(qs);
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, true, 100));
+    queues.add(new FakeQueueInfo("q1", 50.0f, true, 100));
+    resConf.setFakeQueues(queues);
+    // enabled memory-based scheduling
+    // Normal job in the cluster would be 1GB maps/reduces
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    LOG.debug("Submit one high memory(2GB maps, 2GB reduces) job of "
+        + "6 map and 6 reduce tasks");
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(2 * 1024);
+    jConf.setNumMapTasks(6);
+    jConf.setNumReduceTasks(6);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    // Submit a normal job to the other queue.
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(6);
+    jConf.setNumReduceTasks(6);
+    jConf.setUser("u1");
+    jConf.setQueueName("q1");
+    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    // Map 1 of high memory job
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkQueuesOrder(qs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 1 of high memory job
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    checkQueuesOrder(qs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+    // Map 1 of normal job
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 1 of normal job
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+    // Map 2 of normal job
+    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 2 of normal job
+    checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+    // Now both the queues are equally served. But the comparator doesn't change
+    // the order if queues are equally served.
+
+    // Map 3 of normal job
+    checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 3 of normal job
+    checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+    // Map 2 of high memory job
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkQueuesOrder(qs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 2 of high memory job
+    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkQueuesOrder(qs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+
+    // Map 4 of normal job
+    checkAssignment("tt2", "attempt_test_0002_m_000004_0 on tt2");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+
+    // Reduce 4 of normal job
+    checkAssignment("tt2", "attempt_test_0002_r_000004_0 on tt2");
+    checkQueuesOrder(reversedQs, scheduler
+        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+  }
 
   private void checkRunningJobMovementAndCompletion() throws IOException {
     
@@ -2332,6 +2730,89 @@
       userJobs.put(user, jips);
     }
     return userJobs;
+  }
+
+  
+  protected TaskTrackerStatus tracker(String taskTrackerName) {
+    return taskTrackerManager.getTaskTracker(taskTrackerName);
+  }
+  
+  protected Task checkAssignment(String taskTrackerName,
+      String expectedTaskString) throws IOException {
+    List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+    assertNotNull(expectedTaskString, tasks);
+    assertEquals(expectedTaskString, 1, tasks.size());
+    assertEquals(expectedTaskString, tasks.get(0).toString());
+    return tasks.get(0);
+  }
 
+  /**
+   * Get the amount of memory that is reserved for tasks on the taskTracker and
+   * verify that it matches what is expected.
+   * 
+   * @param taskTracker
+   * @param expectedMemForMapsOnTT
+   * @param expectedMemForReducesOnTT
+   */
+  private void checkMemReservedForTasksOnTT(String taskTracker,
+      Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
+    Long observedMemForMapsOnTT =
+        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
+            CapacityTaskScheduler.TYPE.MAP);
+    Long observedMemForReducesOnTT =
+        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
+            CapacityTaskScheduler.TYPE.REDUCE);
+    if (expectedMemForMapsOnTT == null) {
+      assertTrue(observedMemForMapsOnTT == null);
+    } else {
+      assertTrue(observedMemForMapsOnTT.equals(expectedMemForMapsOnTT));
+    }
+    if (expectedMemForReducesOnTT == null) {
+      assertTrue(observedMemForReducesOnTT == null);
+    } else {
+      assertTrue(observedMemForReducesOnTT.equals(expectedMemForReducesOnTT));
+    }
+  }
+
+  /**
+   * Verify the number of slots of type 'type' from the queue 'queue'.
+   * 
+   * @param queue
+   * @param type
+   * @param numActiveUsers in the queue at present.
+   * @param expectedOccupiedSlots
+   * @param expectedOccupiedSlotsPercent
+   * @return
+   */
+  private void checkOccupiedSlots(String queue,
+      CapacityTaskScheduler.TYPE type, int numActiveUsers,
+      int expectedOccupiedSlots, float expectedOccupiedSlotsPercent) {
+    scheduler.updateQSIInfoForTests();
+    QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
+    String schedulingInfo =
+        queueManager.getJobQueueInfo(queue).getSchedulingInfo();
+    String[] infoStrings = schedulingInfo.split("\n");
+    int index = -1;
+    if (type.equals(CapacityTaskScheduler.TYPE.MAP)) {
+      index = 7;
+    } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+      index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers);
+    }
+    LOG.info(infoStrings[index]);
+    assertEquals(String.format("Used capacity: %d (%.1f%% of Capacity)",
+        expectedOccupiedSlots, expectedOccupiedSlotsPercent),
+        infoStrings[index]);
+  }
+
+  private void checkQueuesOrder(String[] expectedOrder, String[] observedOrder) {
+    assertTrue("Observed and expected queues are not of same length.",
+        expectedOrder.length == observedOrder.length);
+    int i = 0;
+    for (String expectedQ : expectedOrder) {
+      assertTrue("Observed and expected queues are not in the same order. "
+          + "Differ at index " + i + ". Got " + observedOrder[i]
+          + " instead of " + expectedQ, expectedQ.equals(observedOrder[i]));
+      i++;
+    }
   }
 }

Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/data_join/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/contrib/data_join:713112
+/hadoop/core/trunk/src/contrib/data_join:776175-786373

Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/dynamic-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19/src/contrib/dynamic-scheduler:713112
-/hadoop/core/trunk/src/contrib/dynamic-scheduler:784975-785643
+/hadoop/core/trunk/src/contrib/dynamic-scheduler:784975-786373

Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/fairscheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/contrib/fairscheduler:713112
+/hadoop/core/trunk/src/contrib/fairscheduler:776175-786373

Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/index/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/contrib/index:713112
+/hadoop/core/trunk/src/contrib/index:776175-786373

Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/mrunit/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -0,0 +1,2 @@
+/hadoop/core/branches/branch-0.19/mapred/src/contrib/mrunit:713112
+/hadoop/core/trunk/src/contrib/mrunit:776175-786373

Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/sqoop/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19/src/contrib/sqoop:713112
-/hadoop/core/trunk/src/contrib/sqoop:784975-785643
+/hadoop/core/trunk/src/contrib/sqoop:784975-786373

Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/streaming/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/streaming:713112
-/hadoop/core/trunk/src/contrib/streaming:776175-785643
+/hadoop/core/trunk/src/contrib/streaming:776175-786373

Propchange: hadoop/core/branches/HADOOP-4687/mapred/src/contrib/vaidya/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Jun 19 05:42:53 2009
@@ -1,2 +1,2 @@
 /hadoop/core/branches/branch-0.19/mapred/src/contrib/vaidya:713112
-/hadoop/core/trunk/src/contrib/vaidya:776175-785643
+/hadoop/core/trunk/src/contrib/vaidya:776175-786373



Mime
View raw message