hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject svn commit: r808308 [4/5] - in /hadoop/mapreduce/trunk: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
Date Thu, 27 Aug 2009 07:49:01 GMT
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=808308&r1=808307&r2=808308&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Thu Aug 27 07:49:00 2009
@@ -18,755 +18,27 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
 import junit.framework.TestCase;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobInProgress;
-import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SecurityUtil.AccessControlList;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import static org.apache.hadoop.mapred.CapacityTestUtils.*;
+
+import java.io.IOException;
+import java.util.*;
 
 public class TestCapacityScheduler extends TestCase {
 
   static final Log LOG =
-      LogFactory.getLog(org.apache.hadoop.mapred.TestCapacityScheduler.class);
+    LogFactory.getLog(org.apache.hadoop.mapred.TestCapacityScheduler.class);
 
   private static int jobCounter;
 
-  /**
-   * Test class that removes the asynchronous nature of job initialization.
-   * 
-   * The run method is a dummy which just waits for completion. It is
-   * expected that test code calls the main method, initializeJobs, directly
-   * to trigger initialization.
-   */
-  class ControlledJobInitializer extends 
-                              JobInitializationPoller.JobInitializationThread {
-    
-    boolean stopRunning;
-    
-    public ControlledJobInitializer(JobInitializationPoller p) {
-      p.super();
-    }
-    
-    @Override
-    public void run() {
-      while (!stopRunning) {
-        try {
-          synchronized(this) {
-            this.wait();  
-          }
-        } catch (InterruptedException ie) {
-          break;
-        }
-      }
-    }
-    
-    void stopRunning() {
-      stopRunning = true;
-    }
-  }
-  
-  /**
-   * Test class that removes the asynchronous nature of job initialization.
-   * 
-   * The run method is a dummy which just waits for completion. It is
-   * expected that test code calls the main method, selectJobsToInitialize,
-   * directly to trigger initialization.
-   * 
-   * The class also creates the test worker thread objects of type 
-   * ControlledJobInitializer instead of the objects of the actual class
-   */
-  class ControlledInitializationPoller extends JobInitializationPoller {
-    
-    private boolean stopRunning;
-    private ArrayList<ControlledJobInitializer> workers;
-    
-    public ControlledInitializationPoller(JobQueuesManager mgr,
-                                          CapacitySchedulerConf rmConf,
-                                          Set<String> queues,
-                                          TaskTrackerManager ttm) {
-      super(mgr, rmConf, queues, ttm);
-    }
-    
-    @Override
-    public void run() {
-      // don't do anything here.
-      while (!stopRunning) {
-        try {
-          synchronized (this) {
-            this.wait();
-          }
-        } catch (InterruptedException ie) {
-          break;
-        }
-      }
-    }
-    
-    @Override
-    JobInitializationThread createJobInitializationThread() {
-      ControlledJobInitializer t = new ControlledJobInitializer(this);
-      if (workers == null) {
-        workers = new ArrayList<ControlledJobInitializer>();
-      }
-      workers.add(t);
-      return t;
-    }
-
-    @Override
-    void selectJobsToInitialize() {
-      super.cleanUpInitializedJobsList();
-      super.selectJobsToInitialize();
-      for (ControlledJobInitializer t : workers) {
-        t.initializeJobs();
-      }
-    }
-    
-    void stopRunning() {
-      stopRunning = true;
-      for (ControlledJobInitializer t : workers) {
-        t.stopRunning();
-        t.interrupt();
-      }
-    }
-  }
-
   private ControlledInitializationPoller controlledInitializationPoller;
-  /*
-   * Fake job in progress object used for testing the schedulers scheduling
-   * decisions. The JobInProgress objects returns out FakeTaskInProgress
-   * objects when assignTasks is called. If speculative maps and reduces
-   * are configured then JobInProgress returns exactly one Speculative
-   * map and reduce task.
-   */
-  static class FakeJobInProgress extends JobInProgress {
-    
-    protected FakeTaskTrackerManager taskTrackerManager;
-    private int mapTaskCtr;
-    private int redTaskCtr;
-    private Set<TaskInProgress> mapTips = 
-      new HashSet<TaskInProgress>();
-    private Set<TaskInProgress> reduceTips = 
-      new HashSet<TaskInProgress>();
-    private int speculativeMapTaskCounter = 0;
-    private int speculativeReduceTaskCounter = 0;
-    public FakeJobInProgress(JobID jId, JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager, String user) {
-      super(jId, jobConf, null);
-      this.taskTrackerManager = taskTrackerManager;
-      this.startTime = System.currentTimeMillis();
-      this.status = new JobStatus(jId, 0f, 0f, JobStatus.PREP, 
-          jobConf.getUser(), 
-          jobConf.getJobName(), "", "");
-      this.status.setJobPriority(JobPriority.NORMAL);
-      this.status.setStartTime(startTime);
-      if (null == jobConf.getQueueName()) {
-        this.profile = new JobProfile(user, jId, 
-            null, null, null);
-      }
-      else {
-        this.profile = new JobProfile(user, jId, 
-            null, null, null, jobConf.getQueueName());
-      }
-      mapTaskCtr = 0;
-      redTaskCtr = 0;
-    }
-    
-    @Override
-    public synchronized void initTasks() throws IOException {
-      getStatus().setRunState(JobStatus.RUNNING);
-    }
-
-    @Override
-    public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
-        int ignored) throws IOException {
-      boolean areAllMapsRunning = (mapTaskCtr == numMapTasks);
-      if (areAllMapsRunning){
-        if(!getJobConf().getMapSpeculativeExecution() || 
-            speculativeMapTasks > 0) {
-          return null;
-        }
-      }
-      TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
-      Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 
-                                super.numSlotsPerMap) {
-        @Override
-        public String toString() {
-          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
-        }
-      };
-      taskTrackerManager.startTask(tts.getTrackerName(), task);
-      runningMapTasks++;
-      // create a fake TIP and keep track of it
-      FakeTaskInProgress mapTip = new FakeTaskInProgress(getJobID(), 
-          getJobConf(), task, true, this);
-      mapTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
-      if(areAllMapsRunning) {
-        speculativeMapTasks++;
-        //you have scheduled a speculative map. Now set all tips in the
-        //map tips not to have speculative task.
-        for(TaskInProgress t : mapTips) {
-          if (t instanceof FakeTaskInProgress) {
-            FakeTaskInProgress mt = (FakeTaskInProgress) t;
-            mt.hasSpeculativeMap = false;
-          }
-        }
-      } else {
-        //add only non-speculative tips.
-        mapTips.add(mapTip);
-        //add the tips to the JobInProgress TIPS
-        maps = mapTips.toArray(new TaskInProgress[mapTips.size()]);
-      }
-      return task;
-    }
-
-    @Override
-    public Task obtainNewReduceTask(final TaskTrackerStatus tts,
-        int clusterSize, int ignored) throws IOException {
-      boolean areAllReducesRunning = (redTaskCtr == numReduceTasks);
-      if (areAllReducesRunning){
-        if(!getJobConf().getReduceSpeculativeExecution() || 
-            speculativeReduceTasks > 0) {
-          return null;
-        }
-      }
-      TaskAttemptID attemptId = getTaskAttemptID(false, areAllReducesRunning);
-      Task task = new ReduceTask("", attemptId, 0, 10, 
-                        super.numSlotsPerReduce) {
-        @Override
-        public String toString() {
-          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
-        }
-      };
-      taskTrackerManager.startTask(tts.getTrackerName(), task);
-      runningReduceTasks++;
-      // create a fake TIP and keep track of it
-      FakeTaskInProgress reduceTip = new FakeTaskInProgress(getJobID(), 
-          getJobConf(), task, false, this);
-      reduceTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
-      if(areAllReducesRunning) {
-        speculativeReduceTasks++;
-        //you have scheduled a speculative map. Now set all tips in the
-        //map tips not to have speculative task.
-        for(TaskInProgress t : reduceTips) {
-          if (t instanceof FakeTaskInProgress) {
-            FakeTaskInProgress rt = (FakeTaskInProgress) t;
-            rt.hasSpeculativeReduce = false;
-          }
-        }
-      } else {
-        //add only non-speculative tips.
-        reduceTips.add(reduceTip);
-        //add the tips to the JobInProgress TIPS
-        reduces = reduceTips.toArray(new TaskInProgress[reduceTips.size()]);
-      }
-      return task;
-    }
-    
-    public void mapTaskFinished() {
-      runningMapTasks--;
-      finishedMapTasks++;
-    }
-    
-    public void reduceTaskFinished() {
-      runningReduceTasks--;
-      finishedReduceTasks++;
-    }
-    
-    private TaskAttemptID getTaskAttemptID(boolean isMap, boolean isSpeculative) {
-      JobID jobId = getJobID();
-      TaskType t = TaskType.REDUCE;
-      if (isMap) {
-        t = TaskType.MAP;
-      }
-      if (!isSpeculative) {
-        return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), t,
-            (isMap) ? ++mapTaskCtr : ++redTaskCtr, 0);
-      } else  {
-        return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), t,
-            (isMap) ? mapTaskCtr : redTaskCtr, 1);
-      }
-    }
-    
-    @Override
-    Set<TaskInProgress> getNonLocalRunningMaps() {
-      return (Set<TaskInProgress>)mapTips;
-    }
-    @Override
-    Set<TaskInProgress> getRunningReduces() {
-      return (Set<TaskInProgress>)reduceTips;
-    }
-    
-  }
-  
-  static class FakeFailingJobInProgress extends FakeJobInProgress {
-
-    public FakeFailingJobInProgress(JobID id, JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager, String user) {
-      super(id, jobConf, taskTrackerManager, user);
-    }
-    
-    @Override
-    public synchronized void initTasks() throws IOException {
-      throw new IOException("Failed Initalization");
-    }
-    
-    @Override
-    synchronized void fail() {
-      this.status.setRunState(JobStatus.FAILED);
-    }
-  }
- 
-  static class FakeTaskInProgress extends TaskInProgress {
-    private boolean isMap;
-    private FakeJobInProgress fakeJob;
-    private TreeMap<TaskAttemptID, String> activeTasks;
-    private TaskStatus taskStatus;
-    boolean hasSpeculativeMap;
-    boolean hasSpeculativeReduce;
-    
-    FakeTaskInProgress(JobID jId, JobConf jobConf, Task t, 
-        boolean isMap, FakeJobInProgress job) {
-      super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0, 1);
-      this.isMap = isMap;
-      this.fakeJob = job;
-      activeTasks = new TreeMap<TaskAttemptID, String>();
-      activeTasks.put(t.getTaskID(), "tt");
-      // create a fake status for a task that is running for a bit
-      this.taskStatus = TaskStatus.createTaskStatus(isMap);
-      taskStatus.setProgress(0.5f);
-      taskStatus.setRunState(TaskStatus.State.RUNNING);
-      if (jobConf.getMapSpeculativeExecution()) {
-        //resetting of the hasSpeculativeMap is done
-        //when speculative map is scheduled by the job.
-        hasSpeculativeMap = true;
-      } 
-      if (jobConf.getReduceSpeculativeExecution()) {
-        //resetting of the hasSpeculativeReduce is done
-        //when speculative reduce is scheduled by the job.
-        hasSpeculativeReduce = true;
-      }
-    }
-    
-    @Override
-    TreeMap<TaskAttemptID, String> getActiveTasks() {
-      return activeTasks;
-    }
-    @Override
-    public TaskStatus getTaskStatus(TaskAttemptID taskid) {
-      // return a status for a task that has run a bit
-      return taskStatus;
-    }
-    @Override
-    boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
-      if (isMap) {
-        fakeJob.mapTaskFinished();
-      }
-      else {
-        fakeJob.reduceTaskFinished();
-      }
-      return true;
-    }
-    
-    @Override
-    /*
-     *hasSpeculativeMap and hasSpeculativeReduce is reset by FakeJobInProgress
-     *after the speculative tip has been scheduled.
-     */
-    boolean canBeSpeculated(long currentTime) {
-      if(isMap && hasSpeculativeMap) {
-        return fakeJob.getJobConf().getMapSpeculativeExecution();
-      } 
-      if (!isMap && hasSpeculativeReduce) {
-        return fakeJob.getJobConf().getReduceSpeculativeExecution();
-      }
-      return false;
-    }
-    
-    @Override
-    public boolean isRunning() {
-      return !activeTasks.isEmpty();
-    }
-    
-  }
-  
-  static class FakeQueueManager extends QueueManager {
-    private Set<String> queueNames = null;
-    private static final AccessControlList allEnabledAcl = new AccessControlList("*");
-    
-    FakeQueueManager() {
-      super(new Configuration());
-    }
-    
-    void setQueues(Set<String> queueNames) {
-      this.queueNames = queueNames;
-
-      // sync up queues with the parent class.
-      Queue[] queues = new Queue[queueNames.size()];
-      int i = 0;
-      for (String queueName : queueNames) {
-        HashMap<String, AccessControlList> aclsMap
-          = new HashMap<String, AccessControlList>();
-        for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
-          String key = QueueManager.toFullPropertyName(queueName,
-                                                        oper.getAclName());
-          aclsMap.put(key, allEnabledAcl);
-        }
-        queues[i++] = new Queue(queueName, aclsMap, Queue.QueueState.RUNNING);
-      }
-      super.setQueues(queues);
-    }
-    
-    public synchronized Set<String> getQueues() {
-      return queueNames;
-    }
-  }
-  
-  static class FakeTaskTrackerManager implements TaskTrackerManager {
-    int maps = 0;
-    int reduces = 0;
-    int maxMapTasksPerTracker = 2;
-    int maxReduceTasksPerTracker = 1;
-    long ttExpiryInterval = 10 * 60 * 1000L; // default interval
-    List<JobInProgressListener> listeners =
-      new ArrayList<JobInProgressListener>();
-    FakeQueueManager qm = new FakeQueueManager();
-    
-    private Map<String, TaskTracker> trackers =
-      new HashMap<String, TaskTracker>();
-    private Map<String, TaskStatus> taskStatuses = 
-      new HashMap<String, TaskStatus>();
-    private Map<JobID, JobInProgress> jobs =
-        new HashMap<JobID, JobInProgress>();
-
-    public FakeTaskTrackerManager() {
-      this(2, 2, 1);
-    }
-
-    public FakeTaskTrackerManager(int numTaskTrackers,
-        int maxMapTasksPerTracker, int maxReduceTasksPerTracker) {
-      this.maxMapTasksPerTracker = maxMapTasksPerTracker;
-      this.maxReduceTasksPerTracker = maxReduceTasksPerTracker;
-      for (int i = 1; i < numTaskTrackers + 1; i++) {
-        String ttName = "tt" + i;
-        TaskTracker tt = new TaskTracker(ttName);
-        tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", i,
-                                           new ArrayList<TaskStatus>(), 0, 
-                                           maxMapTasksPerTracker,
-                                           maxReduceTasksPerTracker));
-        trackers.put(ttName, tt);
-      }
-    }
-    
-    public void addTaskTracker(String ttName) {
-      TaskTracker tt = new TaskTracker(ttName);
-      tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", 1,
-                                         new ArrayList<TaskStatus>(), 0,
-                                         maxMapTasksPerTracker, 
-                                         maxReduceTasksPerTracker));
-      trackers.put(ttName, tt);
-    }
-    
-    public ClusterStatus getClusterStatus() {
-      int numTrackers = trackers.size();
-      return new ClusterStatus(numTrackers, 0,
-          ttExpiryInterval, maps, reduces,
-          numTrackers * maxMapTasksPerTracker,
-          numTrackers * maxReduceTasksPerTracker,
-          JobTracker.State.RUNNING);
-    }
-
-    public int getNumberOfUniqueHosts() {
-      return 0;
-    }
-
-    public int getNextHeartbeatInterval() {
-      return MRConstants.HEARTBEAT_INTERVAL_MIN;
-    }
-
-    @Override
-    public void killJob(JobID jobid) throws IOException {
-      JobInProgress job = jobs.get(jobid);
-      finalizeJob(job, JobStatus.KILLED);
-      job.kill();
-    }
-
-    public void initJob(JobInProgress jip) {
-      try {
-        JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
-        jip.initTasks();
-        if (jip.isJobEmpty()) {
-          completeEmptyJob(jip);
-        } else if (!jip.isSetupCleanupRequired()) {
-          jip.completeSetup();
-        }
-        JobStatus newStatus = (JobStatus)jip.getStatus().clone();
-        JobStatusChangeEvent event = new JobStatusChangeEvent(jip, 
-          EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
-         for (JobInProgressListener listener : listeners) {
-           listener.jobUpdated(event);
-         }
-      } catch (Exception ioe) {
-        failJob(jip);
-      }
-    }
-
-    private synchronized void completeEmptyJob(JobInProgress jip) {
-      jip.completeEmptyJob();
-    }
-
-    public synchronized void failJob(JobInProgress jip) {
-      JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
-      jip.fail();
-      JobStatus newStatus = (JobStatus)jip.getStatus().clone();
-      JobStatusChangeEvent event = new JobStatusChangeEvent(jip, 
-          EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
-      for (JobInProgressListener listener : listeners) {
-        listener.jobUpdated(event);
-      }
-    }
-
-    public void removeJob(JobID jobid) {
-      jobs.remove(jobid);
-    }
-    
-    @Override
-    public JobInProgress getJob(JobID jobid) {
-      return jobs.get(jobid);
-    }
-
-    Collection<JobInProgress> getJobs() {
-      return jobs.values();
-    }
-
-    public Collection<TaskTrackerStatus> taskTrackers() {
-      List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
-      for (TaskTracker tt : trackers.values()) {
-        statuses.add(tt.getStatus());
-      }
-      return statuses;
-    }
-
-
-    public void addJobInProgressListener(JobInProgressListener listener) {
-      listeners.add(listener);
-    }
-
-    public void removeJobInProgressListener(JobInProgressListener listener) {
-      listeners.remove(listener);
-    }
-    
-    public void submitJob(JobInProgress job) throws IOException {
-      jobs.put(job.getJobID(), job);
-      for (JobInProgressListener listener : listeners) {
-        listener.jobAdded(job);
-      }
-    }
-    
-    public TaskTracker getTaskTracker(String trackerID) {
-      return trackers.get(trackerID);
-    }
-    
-    public void startTask(String taskTrackerName, final Task t) {
-      if (t.isMapTask()) {
-        maps++;
-      } else {
-        reduces++;
-      }
-      TaskStatus status = new TaskStatus() {
-        @Override
-        public TaskAttemptID getTaskID() {
-          return t.getTaskID();
-        }
-
-        @Override
-        public boolean getIsMap() {
-          return t.isMapTask();
-        }
-        
-        @Override
-        public int getNumSlots() {
-          return t.getNumSlotsRequired();
-        }
-      };
-      taskStatuses.put(t.getTaskID().toString(), status);
-      status.setRunState(TaskStatus.State.RUNNING);
-      trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
-    }
-    
-    public void finishTask(String taskTrackerName, String tipId, 
-        FakeJobInProgress j) {
-      TaskStatus status = taskStatuses.get(tipId);
-      if (status.getIsMap()) {
-        maps--;
-        j.mapTaskFinished();
-      } else {
-        reduces--;
-        j.reduceTaskFinished();
-      }
-      status.setRunState(TaskStatus.State.SUCCEEDED);
-    }
-    
-    void finalizeJob(FakeJobInProgress fjob) {
-      finalizeJob(fjob, JobStatus.SUCCEEDED);
-    }
-
-    void finalizeJob(JobInProgress fjob, int state) {
-      // take a snapshot of the status before changing it
-      JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
-      fjob.getStatus().setRunState(state);
-      JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
-      JobStatusChangeEvent event = 
-        new JobStatusChangeEvent (fjob, EventType.RUN_STATE_CHANGED, oldStatus, 
-                                  newStatus);
-      for (JobInProgressListener listener : listeners) {
-        listener.jobUpdated(event);
-      }
-    }
-    
-    public void setPriority(FakeJobInProgress fjob, JobPriority priority) {
-      // take a snapshot of the status before changing it
-      JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
-      fjob.setPriority(priority);
-      JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
-      JobStatusChangeEvent event = 
-        new JobStatusChangeEvent (fjob, EventType.PRIORITY_CHANGED, oldStatus, 
-                                  newStatus);
-      for (JobInProgressListener listener : listeners) {
-        listener.jobUpdated(event);
-      }
-    }
-    
-    public void setStartTime(FakeJobInProgress fjob, long start) {
-      // take a snapshot of the status before changing it
-      JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
-      
-      fjob.startTime = start; // change the start time of the job
-      fjob.status.setStartTime(start); // change the start time of the jobstatus
-      
-      JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
-      
-      JobStatusChangeEvent event = 
-        new JobStatusChangeEvent (fjob, EventType.START_TIME_CHANGED, oldStatus,
-                                  newStatus);
-      for (JobInProgressListener listener : listeners) {
-        listener.jobUpdated(event);
-      }
-    }
-    
-    void addQueues(String[] arr) {
-      Set<String> queues = new HashSet<String>();
-      for (String s: arr) {
-        queues.add(s);
-      }
-      qm.setQueues(queues);
-    }
-    
-    public QueueManager getQueueManager() {
-      return qm;
-    }
-
-    @Override
-    public boolean killTask(TaskAttemptID taskid, boolean shouldFail) {
-      return true;
-    }
-  }
-  
-  // represents a fake queue configuration info
-  static class FakeQueueInfo {
-    String queueName;
-    float capacity;
-    boolean supportsPrio;
-    int ulMin;
-
-    public FakeQueueInfo(String queueName, float capacity, boolean supportsPrio, int ulMin) {
-      this.queueName = queueName;
-      this.capacity = capacity;
-      this.supportsPrio = supportsPrio;
-      this.ulMin = ulMin;
-    }
-  }
-  
-  static class FakeResourceManagerConf extends CapacitySchedulerConf {
-  
-    // map of queue names to queue info
-    private Map<String, FakeQueueInfo> queueMap = 
-      new LinkedHashMap<String, FakeQueueInfo>();
-    String firstQueue;
-    
-    
-    void setFakeQueues(List<FakeQueueInfo> queues) {
-      for (FakeQueueInfo q: queues) {
-        queueMap.put(q.queueName, q);
-      }
-      firstQueue = new String(queues.get(0).queueName);
-    }
-    
-    public synchronized Set<String> getQueues() {
-      return queueMap.keySet();
-    }
-    
-    /*public synchronized String getFirstQueue() {
-      return firstQueue;
-    }*/
-    
-    public float getCapacity(String queue) {
-      if(queueMap.get(queue).capacity == -1) {
-        return super.getCapacity(queue);
-      }
-      return queueMap.get(queue).capacity;
-    }
-    
-    public int getMinimumUserLimitPercent(String queue) {
-      return queueMap.get(queue).ulMin;
-    }
-    
-    public boolean isPrioritySupported(String queue) {
-      return queueMap.get(queue).supportsPrio;
-    }
-    
-    @Override
-    public long getSleepInterval() {
-      return 1;
-    }
-    
-    @Override
-    public int getMaxWorkerThreads() {
-      return 1;
-    }
-  }
-
-  protected class FakeClock extends CapacityTaskScheduler.Clock {
-    private long time = 0;
-    
-    public void advance(long millis) {
-      time += millis;
-    }
 
-    @Override
-    long getTime() {
-      return time;
-    }
-  }
 
-  
   protected JobConf conf;
   protected CapacityTaskScheduler scheduler;
   private FakeTaskTrackerManager taskTrackerManager;
@@ -778,12 +50,14 @@
     setUp(2, 2, 1);
   }
 
-  private void setUp(int numTaskTrackers, int numMapTasksPerTracker,
-      int numReduceTasksPerTracker) {
+  private void setUp(
+    int numTaskTrackers, int numMapTasksPerTracker,
+    int numReduceTasksPerTracker) {
     jobCounter = 0;
     taskTrackerManager =
-        new FakeTaskTrackerManager(numTaskTrackers, numMapTasksPerTracker,
-            numReduceTasksPerTracker);
+      new FakeTaskTrackerManager(
+        numTaskTrackers, numMapTasksPerTracker,
+        numReduceTasksPerTracker);
     clock = new FakeClock();
     scheduler = new CapacityTaskScheduler(clock);
     scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -792,16 +66,16 @@
     // Don't let the JobInitializationPoller come in our way.
     resConf = new FakeResourceManagerConf();
     controlledInitializationPoller = new ControlledInitializationPoller(
-        scheduler.jobQueuesManager,
-        resConf,
-        resConf.getQueues(), taskTrackerManager);
+      scheduler.jobQueuesManager,
+      resConf,
+      resConf.getQueues(), taskTrackerManager);
     scheduler.setInitializationPoller(controlledInitializationPoller);
     scheduler.setConf(conf);
     //by default disable speculative execution.
     conf.setMapSpeculativeExecution(false);
     conf.setReduceSpeculativeExecution(false);
   }
-  
+
   @Override
   protected void tearDown() throws Exception {
     if (scheduler != null) {
@@ -809,38 +83,43 @@
     }
   }
 
-  private FakeJobInProgress submitJob(int state, JobConf jobConf) throws IOException {
+  private FakeJobInProgress submitJob(int state, JobConf jobConf)
+    throws IOException {
     FakeJobInProgress job =
-        new FakeJobInProgress(new JobID("test", ++jobCounter),
-            (jobConf == null ? new JobConf(conf) : jobConf), taskTrackerManager,
-            jobConf.getUser());
+      new FakeJobInProgress(
+        new JobID("test", ++jobCounter),
+        (jobConf == null ? new JobConf(conf) : jobConf), taskTrackerManager,
+        jobConf.getUser());
     job.getStatus().setRunState(state);
     taskTrackerManager.submitJob(job);
     return job;
   }
 
   private FakeJobInProgress submitJobAndInit(int state, JobConf jobConf)
-      throws IOException {
+    throws IOException {
     FakeJobInProgress j = submitJob(state, jobConf);
     taskTrackerManager.initJob(j);
     return j;
   }
 
-  private FakeJobInProgress submitJob(int state, int maps, int reduces, 
-      String queue, String user) throws IOException {
+  private FakeJobInProgress submitJob(
+    int state, int maps, int reduces,
+    String queue, String user) throws IOException {
     JobConf jobConf = new JobConf(conf);
     jobConf.setNumMapTasks(maps);
     jobConf.setNumReduceTasks(reduces);
-    if (queue != null)
+    if (queue != null) {
       jobConf.setQueueName(queue);
+    }
     jobConf.setUser(user);
     return submitJob(state, jobConf);
   }
-  
+
   // Submit a job and update the listeners
-  private FakeJobInProgress submitJobAndInit(int state, int maps, int reduces,
-                                             String queue, String user) 
-  throws IOException {
+  private FakeJobInProgress submitJobAndInit(
+    int state, int maps, int reduces,
+    String queue, String user)
+    throws IOException {
     FakeJobInProgress j = submitJob(state, maps, reduces, queue, user);
     taskTrackerManager.initJob(j);
     return j;
@@ -848,22 +127,23 @@
 
   /**
    * Test the max map limit.
+   *
    * @throws IOException
    */
   public void testMaxMapCap() throws IOException {
-    this.setUp(4,1,1);
-    taskTrackerManager.addQueues(new String[] {"default"});
+    this.setUp(4, 1, 1);
+    taskTrackerManager.addQueues(new String[]{"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
     resConf.setFakeQueues(queues);
-    resConf.setMaxMapCap("default",2);
-    resConf.setMaxReduceCap("default",-1);
+    resConf.setMaxMapCap("default", 2);
+    resConf.setMaxReduceCap("default", -1);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
     //submit the Job
     FakeJobInProgress fjob1 =
-      submitJob(JobStatus.PREP,3,1,"default","user");
+      submitJob(JobStatus.PREP, 3, 1, "default", "user");
 
     taskTrackerManager.initJob(fjob1);
 
@@ -871,21 +151,27 @@
     List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
 
     //Once the 2 tasks are running the third assigment should be reduce.
-    checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt3",
+      "attempt_test_0001_r_000001_0 on tt3");
     //This should fail.
     List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
     assertNull(task4);
     //Now complete the task 1.
-        // complete the job
-    taskTrackerManager.finishTask("tt1", task1.get(0).getTaskID().toString(),
-                                  fjob1);
+    // complete the job
+    taskTrackerManager.finishTask(
+      "tt1", task1.get(0).getTaskID().toString(),
+      fjob1);
     //We have completed the tt1 task which was a map task so we expect one map
     //task to be picked up
-    checkAssignment("tt4","attempt_test_0001_m_000003_0 on tt4");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt4",
+      "attempt_test_0001_m_000003_0 on tt4");
   }
 
   /**
    * Test max reduce limit
+   *
    * @throws IOException
    */
   public void testMaxReduceCap() throws IOException {
@@ -927,137 +213,152 @@
       "tt2", task2.get(0).getTaskID().toString(), fjob1);
 
     //One reduce is done hence assign the new reduce.
-    checkAssignment("tt4","attempt_test_0001_r_000003_0 on tt4");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt4",
+      "attempt_test_0001_r_000003_0 on tt4");
   }
-  
+
   // test job run-state change
   public void testJobRunStateChange() throws IOException {
     // start the scheduler
-    taskTrackerManager.addQueues(new String[] {"default"});
+    taskTrackerManager.addQueues(new String[]{"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 1));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
-    
+
     // submit the job
-    FakeJobInProgress fjob1 = 
+    FakeJobInProgress fjob1 =
       submitJob(JobStatus.PREP, 1, 0, "default", "user");
-    
-    FakeJobInProgress fjob2 = 
+
+    FakeJobInProgress fjob2 =
       submitJob(JobStatus.PREP, 1, 0, "default", "user");
-    
+
     // test if changing the job priority/start-time works as expected in the 
     // waiting queue
     testJobOrderChange(fjob1, fjob2, true);
-    
+
     // Init the jobs
     // simulate the case where the job with a lower priority becomes running 
     // first (may be because of the setup tasks).
-    
+
     // init the lower ranked job first
     taskTrackerManager.initJob(fjob2);
-    
+
     // init the higher ordered job later
     taskTrackerManager.initJob(fjob1);
-    
+
     // check if the jobs are missing from the waiting queue
     // The jobs are not removed from waiting queue until they are scheduled 
-    assertEquals("Waiting queue is garbled on job init", 2, 
-                 scheduler.jobQueuesManager.getWaitingJobs("default")
-                          .size());
-    
+    assertEquals(
+      "Waiting queue is garbled on job init", 2,
+      scheduler.jobQueuesManager.getJobQueue("default").getWaitingJobs()
+        .size());
+
     // test if changing the job priority/start-time works as expected in the 
     // running queue
     testJobOrderChange(fjob1, fjob2, false);
-    
+
     // schedule a task
     List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
-    
+
     // complete the job
-    taskTrackerManager.finishTask("tt1", tasks.get(0).getTaskID().toString(), 
-                                  fjob1);
-    
+    taskTrackerManager.finishTask(
+      "tt1", tasks.get(0).getTaskID().toString(),
+      fjob1);
+
     // mark the job as complete
     taskTrackerManager.finalizeJob(fjob1);
-    
-    Collection<JobInProgress> rqueue = 
-      scheduler.jobQueuesManager.getRunningJobQueue("default");
-    
+
+    Collection<JobInProgress> rqueue =
+      scheduler.jobQueuesManager.getJobQueue("default").getRunningJobs();
+
     // check if the job is removed from the scheduler
-    assertFalse("Scheduler contains completed job", 
-                rqueue.contains(fjob1));
-    
+    assertFalse(
+      "Scheduler contains completed job",
+      rqueue.contains(fjob1));
+
     // check if the running queue size is correct
-    assertEquals("Job finish garbles the queue", 
-                 1, rqueue.size());
+    assertEquals(
+      "Job finish garbles the queue",
+      1, rqueue.size());
 
   }
-  
+
   // test if the queue reflects the changes
-  private void testJobOrderChange(FakeJobInProgress fjob1, 
-                                  FakeJobInProgress fjob2, 
-                                  boolean waiting) {
+  private void testJobOrderChange(
+    FakeJobInProgress fjob1,
+    FakeJobInProgress fjob2,
+    boolean waiting) {
     String queueName = waiting ? "waiting" : "running";
-    
+
     // check if the jobs in the queue are the right order
     JobInProgress[] jobs = getJobsInQueue(waiting);
-    assertTrue(queueName + " queue doesnt contain job #1 in right order", 
-                jobs[0].getJobID().equals(fjob1.getJobID()));
-    assertTrue(queueName + " queue doesnt contain job #2 in right order", 
-                jobs[1].getJobID().equals(fjob2.getJobID()));
-    
+    assertTrue(
+      queueName + " queue doesnt contain job #1 in right order",
+      jobs[0].getJobID().equals(fjob1.getJobID()));
+    assertTrue(
+      queueName + " queue doesnt contain job #2 in right order",
+      jobs[1].getJobID().equals(fjob2.getJobID()));
+
     // I. Check the start-time change
     // Change job2 start-time and check if job2 bumps up in the queue 
     taskTrackerManager.setStartTime(fjob2, fjob1.startTime - 1);
-    
+
     jobs = getJobsInQueue(waiting);
-    assertTrue("Start time change didnt not work as expected for job #2 in "
-               + queueName + " queue", 
-                jobs[0].getJobID().equals(fjob2.getJobID()));
-    assertTrue("Start time change didnt not work as expected for job #1 in"
-               + queueName + " queue", 
-                jobs[1].getJobID().equals(fjob1.getJobID()));
-    
+    assertTrue(
+      "Start time change didnt not work as expected for job #2 in "
+        + queueName + " queue",
+      jobs[0].getJobID().equals(fjob2.getJobID()));
+    assertTrue(
+      "Start time change didnt not work as expected for job #1 in"
+        + queueName + " queue",
+      jobs[1].getJobID().equals(fjob1.getJobID()));
+
     // check if the queue is fine
-    assertEquals("Start-time change garbled the " + queueName + " queue", 
-                 2, jobs.length);
-    
+    assertEquals(
+      "Start-time change garbled the " + queueName + " queue",
+      2, jobs.length);
+
     // II. Change job priority change
     // Bump up job1's priority and make sure job1 bumps up in the queue
     taskTrackerManager.setPriority(fjob1, JobPriority.HIGH);
-    
+
     // Check if the priority changes are reflected
     jobs = getJobsInQueue(waiting);
-    assertTrue("Priority change didnt not work as expected for job #1 in "
-               + queueName + " queue",  
-                jobs[0].getJobID().equals(fjob1.getJobID()));
-    assertTrue("Priority change didnt not work as expected for job #2 in "
-               + queueName + " queue",  
-                jobs[1].getJobID().equals(fjob2.getJobID()));
-    
+    assertTrue(
+      "Priority change didnt not work as expected for job #1 in "
+        + queueName + " queue",
+      jobs[0].getJobID().equals(fjob1.getJobID()));
+    assertTrue(
+      "Priority change didnt not work as expected for job #2 in "
+        + queueName + " queue",
+      jobs[1].getJobID().equals(fjob2.getJobID()));
+
     // check if the queue is fine
-    assertEquals("Priority change has garbled the " + queueName + " queue", 
-                 2, jobs.length);
-    
+    assertEquals(
+      "Priority change has garbled the " + queueName + " queue",
+      2, jobs.length);
+
     // reset the queue state back to normal
     taskTrackerManager.setStartTime(fjob1, fjob2.startTime - 1);
     taskTrackerManager.setPriority(fjob1, JobPriority.NORMAL);
   }
-  
+
   private JobInProgress[] getJobsInQueue(boolean waiting) {
-    Collection<JobInProgress> queue = 
-      waiting 
-      ? scheduler.jobQueuesManager.getWaitingJobs("default")
-      : scheduler.jobQueuesManager.getRunningJobQueue("default");
+    Collection<JobInProgress> queue =
+      waiting
+        ? scheduler.jobQueuesManager.getJobQueue("default").getWaitingJobs()
+        : scheduler.jobQueuesManager.getJobQueue("default").getRunningJobs();
     return queue.toArray(new JobInProgress[0]);
   }
-  
+
   // tests if tasks can be assinged when there are multiple jobs from a same
   // user
   public void testJobFinished() throws Exception {
-    taskTrackerManager.addQueues(new String[] {"default"});
-    
+    taskTrackerManager.addQueues(new String[]{"default"});
+
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
     resConf.setFakeQueues(queues);
@@ -1065,49 +366,63 @@
     scheduler.start();
 
     // submit 2 jobs
-    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
-    FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
-    
+    FakeJobInProgress j1 = submitJobAndInit(
+      JobStatus.PREP, 3, 0, "default", "u1");
+    FakeJobInProgress j2 = submitJobAndInit(
+      JobStatus.PREP, 3, 0, "default", "u1");
+
     // I. Check multiple assignments with running tasks within job
     // ask for a task from first job
-    Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    Task t = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000001_0 on tt1");
     //  ask for another task from the first job
-    t = checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    
+    t = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000002_0 on tt1");
+
     // complete tasks
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
-    
+
     // II. Check multiple assignments with running tasks across jobs
     // ask for a task from first job
-    t = checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
-    
+    t = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000003_0 on tt1");
+
     //  ask for a task from the second job
-    t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    
+    t = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000001_0 on tt1");
+
     // complete tasks
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000003_0", j1);
-    
+
     // III. Check multiple assignments with completed tasks across jobs
     // ask for a task from the second job
-    t = checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
-    
+    t = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000002_0 on tt1");
+
     // complete task
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000002_0", j2);
-    
+
     // IV. Check assignment with completed job
     // finish first job
-    scheduler.jobCompleted(j1);
-    
+    scheduler.jobQueuesManager.getJobQueue(j1).jobCompleted(j1);
+
     // ask for another task from the second job
     // if tasks can be assigned then the structures are properly updated 
-    t = checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
-    
+    t = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000003_0 on tt1");
+
     // complete task
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000003_0", j2);
   }
-  
+
   // basic tests, should be able to submit to queues
   public void testSubmitToQueues() throws Exception {
     // set up some queues
@@ -1123,63 +438,70 @@
     // submit a job with no queue specified. It should be accepted
     // and given to the default queue. 
     JobInProgress j = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
-    
+
     // when we ask for a task, we should get one, from the job submitted
     Task t;
-    t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    t = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000001_0 on tt1");
     // submit another job, to a different queue
     j = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // now when we get a task, it should be from the second job
-    t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    t = checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0002_m_000001_0 on tt2");
   }
-  
+
   public void testGetJobs() throws Exception {
     // need only one queue
-    String[] qs = { "default" };
+    String[] qs = {"default"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
-    HashMap<String, ArrayList<FakeJobInProgress>> subJobsList = 
+    HashMap<String, ArrayList<FakeJobInProgress>> subJobsList =
       submitJobs(1, 4, "default");
-   
+
     JobQueuesManager mgr = scheduler.jobQueuesManager;
-    
-    while(mgr.getWaitingJobs("default").size() < 4){
+
+    while (mgr.getJobQueue("default").getWaitingJobs().size() < 4) {
       Thread.sleep(1);
     }
     //Raise status change events for jobs submitted.
     raiseStatusChangeEvents(mgr);
     Collection<JobInProgress> jobs = scheduler.getJobs("default");
-    
-    assertTrue("Number of jobs returned by scheduler is wrong" 
-        ,jobs.size() == 4);
-    
-    assertTrue("Submitted jobs and Returned jobs are not same",
-        subJobsList.get("u1").containsAll(jobs));
+
+    assertTrue(
+      "Number of jobs returned by scheduler is wrong"
+      , jobs.size() == 4);
+
+    assertTrue(
+      "Submitted jobs and Returned jobs are not same",
+      subJobsList.get("u1").containsAll(jobs));
   }
-  
+
   //Basic test to test capacity allocation across the queues which have no
   //capacity configured.
-  
+
   public void testCapacityAllocationToQueues() throws Exception {
-    String[] qs = {"default","q1","q2","q3","q4"};
+    String[] qs = {"default", "qAZ1", "qAZ2", "qAZ3", "qAZ4"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default",25.0f,true,25));
-    queues.add(new FakeQueueInfo("q1",-1.0f,true,25));
-    queues.add(new FakeQueueInfo("q2",-1.0f,true,25));
-    queues.add(new FakeQueueInfo("q3",-1.0f,true,25));
-    queues.add(new FakeQueueInfo("q4",-1.0f,true,25));
+    queues.add(new FakeQueueInfo("default", 25.0f, true, 25));
+    queues.add(new FakeQueueInfo("qAZ1", -1.0f, true, 25));
+    queues.add(new FakeQueueInfo("qAZ2", -1.0f, true, 25));
+    queues.add(new FakeQueueInfo("qAZ3", -1.0f, true, 25));
+    queues.add(new FakeQueueInfo("qAZ4", -1.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
-    scheduler.start(); 
-    assertEquals(18.75f, resConf.getCapacity("q1"));
-    assertEquals(18.75f, resConf.getCapacity("q2"));
-    assertEquals(18.75f, resConf.getCapacity("q3"));
-    assertEquals(18.75f, resConf.getCapacity("q4"));
+    scheduler.start();
+    JobQueuesManager jqm = scheduler.jobQueuesManager;
+    assertEquals(18.75f, jqm.getJobQueue("qAZ1").qsc.getCapacityPercent());
+    assertEquals(18.75f, jqm.getJobQueue("qAZ2").qsc.getCapacityPercent());
+    assertEquals(18.75f, jqm.getJobQueue("qAZ3").qsc.getCapacityPercent());
+    assertEquals(18.75f, jqm.getJobQueue("qAZ4").qsc.getCapacityPercent());
   }
 
   // Tests how capacity is computed and assignment of tasks done
@@ -1196,47 +518,47 @@
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
-   
+
     // submit a job to the default queue
     submitJobAndInit(JobStatus.PREP, 10, 0, "default", "u1");
-    
+
     // submit a job to the second queue
     submitJobAndInit(JobStatus.PREP, 10, 0, "q2", "u1");
-    
+
     // job from q2 runs first because it has some non-zero capacity.
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    verifyCapacity("0", "default");
-    verifyCapacity("3", "q2");
-    
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000001_0 on tt1");
+    verifyCapacity(taskTrackerManager, "0", "default");
+    verifyCapacity(taskTrackerManager, "3", "q2");
+
     // add another tt to increase tt slots
     taskTrackerManager.addTaskTracker("tt3");
-    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
-    verifyCapacity("0", "default");
-    verifyCapacity("5", "q2");
-    
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0002_m_000002_0 on tt2");
+    verifyCapacity(taskTrackerManager, "0", "default");
+    verifyCapacity(taskTrackerManager, "5", "q2");
+
     // add another tt to increase tt slots
     taskTrackerManager.addTaskTracker("tt4");
-    checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
-    verifyCapacity("0", "default");
-    verifyCapacity("7", "q2");
-    
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt3",
+      "attempt_test_0002_m_000003_0 on tt3");
+    verifyCapacity(taskTrackerManager, "0", "default");
+    verifyCapacity(taskTrackerManager, "7", "q2");
+
     // add another tt to increase tt slots
     taskTrackerManager.addTaskTracker("tt5");
     // now job from default should run, as it is furthest away
     // in terms of runningMaps / capacity.
-    checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
-    verifyCapacity("1", "default");
-    verifyCapacity("9", "q2");
-  }
-  
-  private void verifyCapacity(String expectedCapacity,
-                                          String queue) throws IOException {
-    String schedInfo = taskTrackerManager.getQueueManager().
-                          getSchedulerInfo(queue).toString();    
-    assertTrue(schedInfo.contains("Map tasks\nCapacity: " 
-        + expectedCapacity + " slots"));
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt4",
+      "attempt_test_0001_m_000001_0 on tt4");
+    verifyCapacity(taskTrackerManager, "1", "default");
+    verifyCapacity(taskTrackerManager, "9", "q2");
   }
-  
+
   // test capacity transfer
   public void testCapacityTransfer() throws Exception {
     // set up some queues
@@ -1253,14 +575,22 @@
     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // for queue 'q2', the capacity for maps is 2. Since we're the only user,
     // we should get a task 
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000001_0 on tt1");
     // I should get another map task. 
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000002_0 on tt1");
     // Now we're at full capacity for maps. If I ask for another map task,
     // I should get a map task from the default queue's capacity. 
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0001_m_000003_0 on tt2");
     // and another
-    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0001_m_000004_0 on tt2");
   }
 
   /**
@@ -1271,29 +601,29 @@
    * @throws IOException
    */
   public void testHighMemoryBlockingWithMaxLimit()
-      throws IOException {
+    throws IOException {
 
     // 2 map and 1 reduce slots
     taskTrackerManager = new FakeTaskTrackerManager(2, 2, 1);
 
-    taskTrackerManager.addQueues(new String[] { "defaultXYZ" });
+    taskTrackerManager.addQueues(new String[]{"defaultXYZM"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("defaultXYZ", 100.0f, true, 25));
+    queues.add(new FakeQueueInfo("defaultXYZM", 100.0f, true, 25));
     resConf.setFakeQueues(queues);
-    resConf.setMaxMapCap("defaultXYZ",2);
+    resConf.setMaxMapCap("defaultXYZM", 2);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
     // Normal job in the cluster would be 1GB maps/reduces
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
-        2 * 1024);
+      JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+      2 * 1024);
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+      JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
-        1 * 1024);
+      JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+      1 * 1024);
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+      JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -1301,40 +631,46 @@
     //Set the max limit for queue to 2 ,
     // try submitting more map tasks to the queue , it should not happen
 
-    LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
+    LOG.debug(
+      "Submit one high memory(2GB maps, 0MB reduces) job of "
         + "2 map tasks");
     JobConf jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(2 * 1024);
     jConf.setMemoryForReduceTask(0);
     jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(0);
-    jConf.setQueueName("defaultXYZ");
+    jConf.setQueueName("defaultXYZM");
     jConf.setUser("u1");
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
 
-    LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
+    LOG.debug(
+      "Submit another regular memory(1GB vmem maps/reduces) job of "
         + "2 map/red tasks");
     jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(1 * 1024);
     jConf.setMemoryForReduceTask(1 * 1024);
     jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(2);
-    jConf.setQueueName("defaultXYZ");
+    jConf.setQueueName("defaultXYZM");
     jConf.setUser("u1");
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
 
     // first, a map from j1 will run this is a high memory job so it would
     // occupy the 2 slots
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000001_0 on tt1");
 
-    checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
+    checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 100.0f, 1, 1);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
 
     // at this point, the scheduler tries to schedule another map from j1.
     // there isn't enough space. The second job's reduce should be scheduled.
-    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
-    
-    checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_r_000001_0 on tt1");
+
+    checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 100.0f, 1, 1);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
 
     //at this point , the scheduler tries to schedule another map from j2 for
@@ -1342,11 +678,13 @@
     // This should not happen as all the map slots are taken
     //by the first task itself.hence reduce task from the second job is given
 
-    checkAssignment("tt2","attempt_test_0002_r_000002_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0002_r_000002_0 on tt2");
   }
 
   /**
-   *   test if user limits automatically adjust to max map or reduce limit
+   * test if user limits automatically adjust to max map or reduce limit
    */
   public void testUserLimitsWithMaxLimits() throws Exception {
     setUp(4, 4, 4);
@@ -1370,18 +708,27 @@
     // for queue 'default', the capacity for maps is 2.
     // But the max map limit is 2
     // hence user should be getting not more than 1 as it is the 50%.
-    Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    Task t1 = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000001_0 on tt1");
 
     //Now we should get the task from the other job. As the
     //first user has reached his max map limit.
-    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0002_m_000001_0 on tt2");
 
     //Now we are done with map limit , now if we ask for task we should
     // get reduce from 1st job
-    checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt3",
+      "attempt_test_0001_r_000001_0 on tt3");
     // Now we're at full capacity for maps. 1 done with reduces for job 1 so
     // now we should get 1 reduces for job 2
-    Task t4 = checkAssignment("tt4", "attempt_test_0002_r_000001_0 on tt4");
+    Task t4 = checkAssignment(
+      taskTrackerManager, scheduler, "tt4",
+      "attempt_test_0002_r_000001_0 on tt4");
 
     taskTrackerManager.finishTask(
       "tt1", t1.getTaskID().toString(),
@@ -1389,14 +736,18 @@
 
     //tt1 completed the task so we have 1 map slot for u1
     // we are assigning the 2nd map task from fjob1
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000002_0 on tt1");
 
     taskTrackerManager.finishTask(
       "tt4", t4.getTaskID().toString(),
       fjob2);
     //tt4 completed the task , so we have 1 reduce slot for u2
     //we are assigning the 2nd reduce from fjob2
-    checkAssignment("tt4", "attempt_test_0002_r_000002_0 on tt4");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt4",
+      "attempt_test_0002_r_000002_0 on tt4");
 
   }
 
@@ -1417,16 +768,24 @@
     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // for queue 'q2', the capacity for maps is 2. Since we're the only user,
     // we should get a task 
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000001_0 on tt1");
     // Submit another job, from a different user
     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
     // Now if I ask for a map task, it should come from the second job 
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000001_0 on tt1");
     // Now we're at full capacity for maps. If I ask for another map task,
     // I should get a map task from the default queue's capacity. 
-    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0001_m_000002_0 on tt2");
     // and another
-    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0002_m_000002_0 on tt2");
   }
 
   // test user limits when a 2nd job is submitted much after first job 
@@ -1445,15 +804,23 @@
     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // for queue 'q2', the capacity for maps is 2. Since we're the only user,
     // we should get a task 
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000001_0 on tt1");
     // since we're the only job, we get another map
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000002_0 on tt1");
     // Submit another job, from a different user
     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
     // Now if I ask for a map task, it should come from the second job 
-    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0002_m_000001_0 on tt2");
     // and another
-    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0002_m_000002_0 on tt2");
   }
 
   // test user limits when a 2nd job is submitted much after first job 
@@ -1473,27 +840,43 @@
     FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // for queue 'q2', the capacity for maps is 2. Since we're the only user,
     // we should get a task 
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000001_0 on tt1");
     // since we're the only job, we get another map
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000002_0 on tt1");
     // we get two more maps from 'default queue'
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0001_m_000004_0 on tt2");
     // Submit another job, from a different user
     FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
     // one of the task finishes
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
     // Now if I ask for a map task, it should come from the second job 
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000001_0 on tt1");
     // another task from job1 finishes, another new task to job2
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
-    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000002_0 on tt1");
     // now we have equal number of tasks from each job. Whichever job's
     // task finishes, that job gets a new task
     taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000003_0", j1);
-    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0001_m_000005_0 on tt2");
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
-    checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000003_0 on tt1");
   }
 
   // test user limits with many users, more slots
@@ -1514,20 +897,40 @@
     // u1 submits job
     FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
     // it gets the first 5 slots
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
-    checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0001_m_000004_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt3",
+      "attempt_test_0001_m_000005_0 on tt3");
     // u2 submits job with 4 slots
     FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
     // u2 should get next 4 slots
-    checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
-    checkAssignment("tt4", "attempt_test_0002_m_000002_0 on tt4");
-    checkAssignment("tt4", "attempt_test_0002_m_000003_0 on tt4");
-    checkAssignment("tt5", "attempt_test_0002_m_000004_0 on tt5");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt3",
+      "attempt_test_0002_m_000001_0 on tt3");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt4",
+      "attempt_test_0002_m_000002_0 on tt4");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt4",
+      "attempt_test_0002_m_000003_0 on tt4");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt5",
+      "attempt_test_0002_m_000004_0 on tt5");
     // last slot should go to u1, since u2 has no more tasks
-    checkAssignment("tt5", "attempt_test_0001_m_000006_0 on tt5");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt5",
+      "attempt_test_0001_m_000006_0 on tt5");
     // u1 finishes a task
     taskTrackerManager.finishTask("tt5", "attempt_test_0001_m_000006_0", j1);
     // u1 submits a few more jobs 
@@ -1543,28 +946,34 @@
     submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3");
     // next slot should go to u3, even though u2 has an earlier job, since
     // user limits have changed and u1/u2 are over limits
-    checkAssignment("tt5", "attempt_test_0007_m_000001_0 on tt5");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt5",
+      "attempt_test_0007_m_000001_0 on tt5");
     // some other task finishes and u3 gets it
     taskTrackerManager.finishTask("tt5", "attempt_test_0002_m_000004_0", j1);
-    checkAssignment("tt5", "attempt_test_0007_m_000002_0 on tt5");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt5",
+      "attempt_test_0007_m_000002_0 on tt5");
     // now, u2 finishes a task
     taskTrackerManager.finishTask("tt4", "attempt_test_0002_m_000002_0", j1);
     // next slot will go to u1, since u3 has nothing to run and u1's job is 
     // first in the queue
-    checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt4",
+      "attempt_test_0001_m_000007_0 on tt4");
   }
 
   /**
    * Test to verify that high memory jobs hit user limits faster than any normal
    * job.
-   * 
+   *
    * @throws IOException
    */
   public void testUserLimitsForHighMemoryJobs()
-      throws IOException {
+    throws IOException {
     taskTrackerManager = new FakeTaskTrackerManager(1, 10, 10);
     scheduler.setTaskTrackerManager(taskTrackerManager);
-    String[] qs = { "default" };
+    String[] qs = {"default"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
@@ -1572,13 +981,13 @@
     // enabled memory-based scheduling
     // Normal job in the cluster would be 1GB maps/reduces
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
+      JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+      JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 2 * 1024);
+      JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 2 * 1024);
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+      JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -1592,7 +1001,8 @@
     jConf.setQueueName("default");
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
 
-    LOG.debug("Submit one high memory(2GB maps, 2GB reduces) job of "
+    LOG.debug(
+      "Submit one high memory(2GB maps, 2GB reduces) job of "
         + "6 map and 6 reduce tasks");
     jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(2 * 1024);
@@ -1604,23 +1014,51 @@
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
 
     // Verify that normal job takes 3 task assignments to hit user limits
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000004_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000005_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000005_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_r_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_r_000002_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000003_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_r_000003_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000004_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_r_000004_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000005_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_r_000005_0 on tt1");
     // u1 has 5 map slots and 5 reduce slots. u2 has none. So u1's user limits
     // are hit. So u2 should get slots
 
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_r_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000002_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_r_000002_0 on tt1");
 
     // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
     // slots. Because of high memory tasks, giving u2 another task would
@@ -1679,12 +1117,12 @@
     scheduler.assignTasks(tracker("tt2")); // heartbeat
     int totalMaps = taskTrackerManager.getClusterStatus().getMaxMapTasks();
     int totalReduces =
-        taskTrackerManager.getClusterStatus().getMaxReduceTasks();
+      taskTrackerManager.getClusterStatus().getMaxReduceTasks();
     QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
     String schedulingInfo =
-        queueManager.getJobQueueInfo("default").getSchedulingInfo();
+      queueManager.getJobQueueInfo("default").getSchedulingInfo();
     String schedulingInfo2 =
-        queueManager.getJobQueueInfo("q2").getSchedulingInfo();
+      queueManager.getJobQueueInfo("q2").getSchedulingInfo();
     String[] infoStrings = schedulingInfo.split("\n");
     assertEquals(infoStrings.length, 18);
     assertEquals(infoStrings[0], "Queue configuration");
@@ -1693,13 +1131,15 @@
     assertEquals(infoStrings[3], "Priority Supported: YES");
     assertEquals(infoStrings[4], "-------------");
     assertEquals(infoStrings[5], "Map tasks");
-    assertEquals(infoStrings[6], "Capacity: " + totalMaps * 50 / 100
+    assertEquals(
+      infoStrings[6], "Capacity: " + totalMaps * 50 / 100
         + " slots");
     assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
     assertEquals(infoStrings[8], "Running tasks: 0");
     assertEquals(infoStrings[9], "-------------");
     assertEquals(infoStrings[10], "Reduce tasks");
-    assertEquals(infoStrings[11], "Capacity: " + totalReduces * 50 / 100
+    assertEquals(
+      infoStrings[11], "Capacity: " + totalReduces * 50 / 100
         + " slots");
     assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
     assertEquals(infoStrings[13], "Running tasks: 0");
@@ -1744,14 +1184,16 @@
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
     raiseStatusChangeEvents(scheduler.jobQueuesManager, "q2");
     //assign one job
-    Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    Task t1 = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000001_0 on tt1");
     //Initalize extra job.
     controlledInitializationPoller.selectJobsToInitialize();
 
     //Get scheduling information, now the number of waiting job should have
     //changed to 4 as one is scheduled and has become running.
     // make sure we update our stats
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateContextInfoForTests();
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
@@ -1765,9 +1207,11 @@
     assertEquals(infoStrings[18], "Number of Waiting Jobs: 4");
 
     //assign a reduce task
-    Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    Task t2 = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_r_000001_0 on tt1");
     // make sure we update our stats
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateContextInfoForTests();
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
@@ -1789,7 +1233,7 @@
     taskTrackerManager.finalizeJob(u1j1);
 
     // make sure we update our stats
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateContextInfoForTests();
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
@@ -1802,13 +1246,14 @@
 
     //Fail a job which is initialized but not scheduled and check the count.
     FakeJobInProgress u1j2 = userJobs.get(1);
-    assertTrue("User1 job 2 not initalized ",
-        u1j2.getStatus().getRunState() == JobStatus.RUNNING);
+    assertTrue(
+      "User1 job 2 not initalized ",
+      u1j2.getStatus().getRunState() == JobStatus.RUNNING);
     taskTrackerManager.finalizeJob(u1j2, JobStatus.FAILED);
     //Run initializer to clean up failed jobs
     controlledInitializationPoller.selectJobsToInitialize();
     // make sure we update our stats
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateContextInfoForTests();
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
@@ -1823,14 +1268,15 @@
 
     //Fail a job which is not initialized but is in the waiting queue.
     FakeJobInProgress u1j5 = userJobs.get(4);
-    assertFalse("User1 job 5 initalized ",
-        u1j5.getStatus().getRunState() == JobStatus.RUNNING);
+    assertFalse(
+      "User1 job 5 initalized ",
+      u1j5.getStatus().getRunState() == JobStatus.RUNNING);
 
     taskTrackerManager.finalizeJob(u1j5, JobStatus.FAILED);
     //run initializer to clean up failed job
     controlledInitializationPoller.selectJobsToInitialize();
     // make sure we update our stats
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateContextInfoForTests();
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
@@ -1851,16 +1297,19 @@
 
     //Now schedule a map should be job3 of the user as job1 succeeded job2
     //failed and now job3 is running
-    t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+    t1 = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0003_m_000001_0 on tt1");
     FakeJobInProgress u1j3 = userJobs.get(2);
-    assertTrue("User Job 3 not running ",
-        u1j3.getStatus().getRunState() == JobStatus.RUNNING);
+    assertTrue(
+      "User Job 3 not running ",
+      u1j3.getStatus().getRunState() == JobStatus.RUNNING);
 
     //now the running count of map should be one and waiting jobs should be
     //one. run the poller as it is responsible for waiting count
     controlledInitializationPoller.selectJobsToInitialize();
     // make sure we update our stats
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateContextInfoForTests();
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
@@ -1874,7 +1323,7 @@
     //Fail the executing job
     taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
     // make sure we update our stats
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateContextInfoForTests();
     //Now running counts should become zero
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
@@ -1888,15 +1337,16 @@
   /**
    * Test to verify that highMemoryJobs are scheduled like all other jobs when
    * memory-based scheduling is not enabled.
+   *
    * @throws IOException
    */
   public void testDisabledMemoryBasedScheduling()
-      throws IOException {
+    throws IOException {
 
     LOG.debug("Starting the scheduler.");
     taskTrackerManager = new FakeTaskTrackerManager(1, 1, 1);
 
-    taskTrackerManager.addQueues(new String[] { "default" });
+    taskTrackerManager.addQueues(new String[]{"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
     resConf.setFakeQueues(queues);
@@ -1905,7 +1355,8 @@
     // memory-based scheduling disabled by default.
     scheduler.start();
 
-    LOG.debug("Submit one high memory job of 1 3GB map task "
+    LOG.debug(
+      "Submit one high memory job of 1 3GB map task "
         + "and 1 1GB reduce task.");
     JobConf jConf = new JobConf();
     jConf.setMemoryForMapTask(3 * 1024L); // 3GB
@@ -1919,23 +1370,27 @@
     // assert that all tasks are launched even though they transgress the
     // scheduling limits.
 
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_r_000001_0 on tt1");
   }
 
   /**
    * Test reverting HADOOP-4979. If there is a high-mem job, we should now look
    * at reduce jobs (if map tasks are high-mem) or vice-versa.
-   * 
+   *
    * @throws IOException
    */
   public void testHighMemoryBlockingAcrossTaskTypes()
-      throws IOException {
+    throws IOException {
 
     // 2 map and 1 reduce slots
     taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1);
 
-    taskTrackerManager.addQueues(new String[] { "default" });
+    taskTrackerManager.addQueues(new String[]{"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
     resConf.setFakeQueues(queues);
@@ -1943,15 +1398,15 @@
     // enabled memory-based scheduling
     // Normal job in the cluster would be 1GB maps/reduces
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
-        2 * 1024);
+      JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+      2 * 1024);
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+      JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
-        1 * 1024);
+      JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+      1 * 1024);
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+      JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -1960,8 +1415,9 @@
     // maps and reduces.
     // First job cannot run for want of memory for maps. In this case, second
     // job's reduces should run.
-    
-    LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
+
+    LOG.debug(
+      "Submit one high memory(2GB maps, 0MB reduces) job of "
         + "2 map tasks");
     JobConf jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(2 * 1024);
@@ -1972,7 +1428,8 @@
     jConf.setUser("u1");
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
 
-    LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
+    LOG.debug(
+      "Submit another regular memory(1GB vmem maps/reduces) job of "
         + "2 map/red tasks");
     jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(1 * 1024);
@@ -1982,53 +1439,60 @@
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
-    
+
     // first, a map from j1 will run
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000001_0 on tt1");
     // Total 2 map slots should be accounted for.
     checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
 
     // at this point, the scheduler tries to schedule another map from j1. 
     // there isn't enough space. The second job's reduce should be scheduled.
-    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_r_000001_0 on tt1");
     // Total 1 reduce slot should be accounted for.
-    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
-        100.0f);
+    checkOccupiedSlots(
+      "default", TaskType.REDUCE, 1, 1,
+      100.0f);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
   }
 
   /**
    * Test blocking of cluster for lack of memory.
+   *
    * @throws IOException
    */
   public void testClusterBlockingForLackOfMemory()
-      throws IOException {
+    throws IOException {
 
     LOG.debug("Starting the scheduler.");
     taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
 
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    taskTrackerManager.addQueues(new String[] { "default" });
+    taskTrackerManager.addQueues(new String[]{"default"});
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
     // Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
-        2 * 1024);
+      JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+      2 * 1024);
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+      JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
-        2 * 1024);
+      JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+      2 * 1024);
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+      JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
-    LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
+    LOG.debug(
+      "Submit one normal memory(1GB maps/reduces) job of "
         + "1 map, 1 reduce tasks.");
     JobConf jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(1 * 1024);
@@ -2040,25 +1504,33 @@
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
 
     // Fill the second tt with this job.
-    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0001_m_000001_0 on tt2");
     // Total 1 map slot should be accounted for.
     checkOccupiedSlots("default", TaskType.MAP, 1, 1, 25.0f);
-    assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+    assertEquals(
+      String.format(
+        TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
         1, 1, 0, 0, 0, 0),
-        (String) job1.getSchedulingInfo());
+      (String) job1.getSchedulingInfo());
     checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
-    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0001_r_000001_0 on tt2");
     // Total 1 map slot should be accounted for.
-    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
-        25.0f);
-    assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+    checkOccupiedSlots(
+      "default", TaskType.REDUCE, 1, 1,
+      25.0f);
+    assertEquals(
+      String.format(
+        TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
         1, 1, 0, 1, 1, 0),
-        (String) job1.getSchedulingInfo());
+      (String) job1.getSchedulingInfo());
     checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
 
-    LOG.debug("Submit one high memory(2GB maps/reduces) job of "
+    LOG.debug(
+      "Submit one high memory(2GB maps/reduces) job of "
         + "2 map, 2 reduce tasks.");
     jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(2 * 1024);
@@ -2069,26 +1541,34 @@
     jConf.setUser("u1");
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
 
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000001_0 on tt1");
     // Total 3 map slots should be accounted for.
     checkOccupiedSlots("default", TaskType.MAP, 1, 3, 75.0f);
-    assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+    assertEquals(
+      String.format(
+        TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
         1, 2, 0, 0, 0, 0),
-        (String) job2.getSchedulingInfo());
+      (String) job2.getSchedulingInfo());
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
 
-    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_r_000001_0 on tt1");
     // Total 3 reduce slots should be accounted for.
-    checkOccupiedSlots("default", TaskType.REDUCE, 1, 3,
-        75.0f);
-    assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+    checkOccupiedSlots(
+      "default", TaskType.REDUCE, 1, 3,
+      75.0f);
+    assertEquals(
+      String.format(
+        TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
         1, 2, 0, 1, 2, 0),
-        (String) job2.getSchedulingInfo());
+      (String) job2.getSchedulingInfo());
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
 
-    LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
+    LOG.debug(
+      "Submit one normal memory(1GB maps/reduces) job of "
         + "1 map, 0 reduce tasks.");
     jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(1 * 1024);
@@ -2111,24 +1591,28 @@
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
     checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
     LOG.info(job2.getSchedulingInfo());
-    assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+    assertEquals(
+      String.format(
+        TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
         1, 2, 2, 1, 2, 0),
-        (String) job2.getSchedulingInfo());
-    assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+      (String) job2.getSchedulingInfo());
+    assertEquals(
+      String.format(
+        TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
         0, 0, 0, 0, 0, 0),
-        (String) job3.getSchedulingInfo());
+      (String) job3.getSchedulingInfo());
 
     // One reservation is already done for job2. So job3 should go ahead.
-    checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0003_m_000001_0 on tt2");
   }
 
   /**
    * Testcase to verify fix for a NPE (HADOOP-5641), when memory based
    * scheduling is enabled and jobs are retired from memory when tasks
    * are still active on some Tasktrackers.
-   *  
+   *
    * @throws IOException
    */
   public void testMemoryMatchingWithRetiredJobs() throws IOException {
@@ -2139,24 +1623,24 @@
     // create scheduler
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
-    taskTrackerManager.addQueues(new String[] { "default" });
+    taskTrackerManager.addQueues(new String[]{"default"});
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
     LOG.debug("Assume TT has 2GB for maps and 2GB for reduces");
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
-        2 * 1024L);

[... 1290 lines stripped ...]


Mime
View raw message