hadoop-mapreduce-commits mailing list archives

From: ma...@apache.org
Subject: svn commit: r804284 [3/4] - in /hadoop/mapreduce/trunk: ./ src/contrib/fairscheduler/designdoc/ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/docs/src/documentation/content...
Date: Fri, 14 Aug 2009 16:32:05 GMT
Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=804284&r1=804283&r2=804284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Fri Aug 14 16:32:04 2009
@@ -58,11 +58,14 @@
     private FakeTaskTrackerManager taskTrackerManager;
     private int mapCounter = 0;
     private int reduceCounter = 0;
+    private final String[][] mapInputLocations; // Array of hosts for each map
     
     public FakeJobInProgress(JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager) throws IOException {
+        FakeTaskTrackerManager taskTrackerManager, 
+        String[][] mapInputLocations) throws IOException {
       super(new JobID("test", ++jobCounter), jobConf, null);
       this.taskTrackerManager = taskTrackerManager;
+      this.mapInputLocations = mapInputLocations;
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus();
       this.status.setRunState(JobStatus.PREP);
@@ -103,58 +106,71 @@
       setup[1].setJobSetupTask();
       // create maps
       numMapTasks = conf.getNumMapTasks();
-      System.out.println("numMapTasks = " + numMapTasks);
       maps = new TaskInProgress[numMapTasks];
       for (int i = 0; i < numMapTasks; i++) {
-        maps[i] = new FakeTaskInProgress(getJobID(), 
-            getJobConf(), true, this);
+        String[] inputLocations = null;
+        if (mapInputLocations != null)
+          inputLocations = mapInputLocations[i];
+        maps[i] = new FakeTaskInProgress(getJobID(), i,
+            getJobConf(), this, inputLocations);
+        if (mapInputLocations == null) // Job has no locality info
+          nonLocalMaps.add(maps[i]);
       }
       // create reduces
       numReduceTasks = conf.getNumReduceTasks();
-      System.out.println("numReduceTasks = " + numReduceTasks);
       reduces = new TaskInProgress[numReduceTasks];
       for (int i = 0; i < numReduceTasks; i++) {
-        reduces[i] = new FakeTaskInProgress(getJobID(), 
-            getJobConf(), false, this);
+        reduces[i] = new FakeTaskInProgress(getJobID(), i,
+            getJobConf(), this);
       }
     }
 
     @Override
     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
-        int numUniqueHosts) throws IOException {
-      TaskAttemptID attemptId = getTaskAttemptID(true);
-      Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 1) {
-        @Override
-        public String toString() {
-          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+        int numUniqueHosts, int localityLevel) throws IOException {
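+      // Scan for a map that is neither running nor complete and whose
+      // locality level on this tracker is below the requested cap
+      // (0 = node-local, 1 = rack-local, 2 = off-rack, per getLocalityLevel).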
+      for (int map = 0; map < maps.length; map++) {
+        FakeTaskInProgress tip = (FakeTaskInProgress) maps[map];
+        if (!tip.isRunning() && !tip.isComplete() &&
+            getLocalityLevel(tip, tts) < localityLevel) {
+          TaskAttemptID attemptId = getTaskAttemptID(tip);
+          Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 1) {
+            @Override
+            public String toString() {
+              return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+            }
+          };
+          runningMapTasks++;
+          tip.createTaskAttempt(task, tts.getTrackerName());
+          nonLocalRunningMaps.add(tip);
+          taskTrackerManager.startTask(tts.getTrackerName(), task, tip);
+          return task;
         }
-      };
-      runningMapTasks++;
-      FakeTaskInProgress tip = 
-        (FakeTaskInProgress) maps[attemptId.getTaskID().getId()];
-      tip.createTaskAttempt(task, tts.getTrackerName());
-      nonLocalRunningMaps.add(tip);
-      taskTrackerManager.startTask(tts.getTrackerName(), task, tip);
-      return task;
+      }
+      return null;
     }
     
     @Override
     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
         int clusterSize, int ignored) throws IOException {
-      TaskAttemptID attemptId = getTaskAttemptID(false);
-      Task task = new ReduceTask("", attemptId, 0, 10, 1) {
-        @Override
-        public String toString() {
-          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
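+      // Reduces carry no locality information, so simply hand out the first
+      // reduce that is neither running nor complete.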
+      for (int reduce = 0; reduce < reduces.length; reduce++) {
+        FakeTaskInProgress tip = 
+          (FakeTaskInProgress) reduces[reduce];
+        if (!tip.isRunning() && !tip.isComplete()) {
+          TaskAttemptID attemptId = getTaskAttemptID(tip);
+          Task task = new ReduceTask("", attemptId, 0, maps.length, 1) {
+            @Override
+            public String toString() {
+              return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+            }
+          };
+          runningReduceTasks++;
+          tip.createTaskAttempt(task, tts.getTrackerName());
+          runningReduces.add(tip);
+          taskTrackerManager.startTask(tts.getTrackerName(), task, tip);
+          return task;
         }
-      };
-      runningReduceTasks++;
-      FakeTaskInProgress tip = 
-        (FakeTaskInProgress) reduces[attemptId.getTaskID().getId()];
-      tip.createTaskAttempt(task, tts.getTrackerName());
-      runningReduces.add(tip);
-      taskTrackerManager.startTask(tts.getTrackerName(), task, tip);
-      return task;
+      }
+      return null;
     }
     
     public void mapTaskFinished(TaskInProgress tip) {
@@ -169,16 +185,34 @@
       runningReduces.remove(tip);
     }
     
-    private TaskAttemptID getTaskAttemptID(boolean isMap) {
+    private TaskAttemptID getTaskAttemptID(TaskInProgress tip) {
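+      // Derive the task index from the TIP itself and bump its per-TIP attempt
+      // counter so that repeated attempts of the same task get distinct IDs.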
       JobID jobId = getJobID();
-      TaskType t = TaskType.REDUCE;
-      if (isMap) {
-        t = TaskType.MAP;
-        return new TaskAttemptID(jobId.getJtIdentifier(),
-            jobId.getId(), t, mapCounter++, 0);
+      TaskType type = tip.isMapTask() ? TaskType.MAP : TaskType.REDUCE;
+      return new TaskAttemptID(jobId.getJtIdentifier(),
+          jobId.getId(), type, tip.getIdWithinJob(), tip.nextTaskId++);
+    }
+    
+    @Override
+    int getLocalityLevel(TaskInProgress tip, TaskTrackerStatus tts) {
+      FakeTaskInProgress ftip = (FakeTaskInProgress) tip;
+      if (ftip.inputLocations != null) {
+        // Check whether we're on the same host as an input split
+        for (String location: ftip.inputLocations) {
+          if (location.equals(tts.host)) {
+            return 0;
+          }
+        }
+        // Check whether we're on the same rack as an input split
+        for (String location: ftip.inputLocations) {
+          if (getRack(location).equals(getRack(tts.host))) {
+            return 1;
+          }
+        }
+        // Not on same rack or host
+        return 2;
       } else {
-        return new TaskAttemptID(jobId.getJtIdentifier(),
-            jobId.getId(), t, reduceCounter++, 0);
+        // Job has no locality info  
+        return -1;
       }
     }
   }
@@ -189,17 +223,31 @@
     private TreeMap<TaskAttemptID, String> activeTasks;
     private TaskStatus taskStatus;
     private boolean isComplete = false;
+    private String[] inputLocations;
     
-    FakeTaskInProgress(JobID jId, JobConf jobConf, boolean isMap,
-        FakeJobInProgress job) {
-      super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0, 1);
-      this.isMap = isMap;
+    // Constructor for map
+    FakeTaskInProgress(JobID jId, int id, JobConf jobConf,
+        FakeJobInProgress job, String[] inputLocations) {
+      super(jId, "", new JobClient.RawSplit(), null, jobConf, job, id, 1);
+      this.isMap = true;
       this.fakeJob = job;
+      this.inputLocations = inputLocations;
       activeTasks = new TreeMap<TaskAttemptID, String>();
       taskStatus = TaskStatus.createTaskStatus(isMap);
       taskStatus.setRunState(TaskStatus.State.UNASSIGNED);
     }
 
+    // Constructor for reduce
+    FakeTaskInProgress(JobID jId, int id, JobConf jobConf,
+        FakeJobInProgress job) {
+      super(jId, "", jobConf.getNumMapTasks(), id, null, jobConf, job, 1);
+      this.isMap = false;
+      this.fakeJob = job;
+      activeTasks = new TreeMap<TaskAttemptID, String>();
+      taskStatus = TaskStatus.createTaskStatus(isMap);
+      taskStatus.setRunState(TaskStatus.State.UNASSIGNED);
+    }
+    
     private void createTaskAttempt(Task task, String taskTracker) {
       activeTasks.put(task.getTaskID(), taskTracker);
       taskStatus = TaskStatus.createTaskStatus(isMap, task.getTaskID(),
@@ -271,6 +319,7 @@
     long ttExpiryInterval = 10 * 60 * 1000L; // default interval
     List<JobInProgressListener> listeners =
       new ArrayList<JobInProgressListener>();
+    Map<JobID, JobInProgress> jobs = new HashMap<JobID, JobInProgress>();
     
     private Map<String, TaskTracker> trackers =
       new HashMap<String, TaskTracker>();
@@ -280,14 +329,20 @@
       new HashMap<String, FakeTaskInProgress>();
     private Map<String, TaskTrackerStatus> trackerForTip =
       new HashMap<String, TaskTrackerStatus>();
-
-    public FakeTaskTrackerManager(int numTrackers) {
-      for (int i = 1; i <= numTrackers; i++) {
-        TaskTracker tt = new TaskTracker("tt" + i);
-        tt.setStatus(new TaskTrackerStatus("tt" + i,  "host" + i, i,
-            new ArrayList<TaskStatus>(), 0,
-            maxMapTasksPerTracker, maxReduceTasksPerTracker));
-        trackers.put("tt" + i, tt);
+    
+    public FakeTaskTrackerManager(int numRacks, int numTrackersPerRack) {
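+      // Lay trackers out on a grid of rackR.nodeN hostnames so the tests can
+      // exercise node-local, rack-local and off-rack placement decisions.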
+      int nextTrackerId = 1;
+      for (int rack = 1; rack <= numRacks; rack++) {
+        for (int node = 1; node <= numTrackersPerRack; node++) {
+          int id = nextTrackerId++;
+          String host = "rack" + rack + ".node" + node;
+          System.out.println("Creating TaskTracker tt" + id + " on " + host);
+          TaskTracker tt = new TaskTracker("tt" + id);
+          tt.setStatus(new TaskTrackerStatus("tt" + id, host, 0,
+              new ArrayList<TaskStatus>(), 0,
+              maxMapTasksPerTracker, maxReduceTasksPerTracker));
+          trackers.put("tt" + id, tt);
+        }
       }
     }
     
@@ -309,7 +364,7 @@
     
     @Override
     public int getNumberOfUniqueHosts() {
-      return 0;
+      return trackers.size();
     }
 
     @Override
@@ -344,7 +399,7 @@
 
     @Override
     public JobInProgress getJob(JobID jobid) {
-      return null;
+      return jobs.get(jobid);
     }
 
     public void initJob (JobInProgress job) {
@@ -358,6 +413,7 @@
     // Test methods
     
     public void submitJob(JobInProgress job) throws IOException {
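+      // Remember the job so that getJob() can look it up later.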
+      jobs.put(job.getJobID(), job);
       for (JobInProgressListener listener : listeners) {
         listener.jobAdded(job);
       }
@@ -426,15 +482,24 @@
     fileWriter.write("<?xml version=\"1.0\"?>\n");
     fileWriter.write("<allocations />\n");
     fileWriter.close();
-    setUpCluster(2);
+    setUpCluster(1, 2, false);
+  }
+
+  public String getRack(String hostname) {
+    // Host names are of the form rackN.nodeM, so split at the dot.
+    return hostname.split("\\.")[0];
   }
 
-  private void setUpCluster(int numTaskTrackers) {
+  private void setUpCluster(int numRacks, int numNodesPerRack,
+      boolean assignMultiple) {
     conf = new JobConf();
     conf.set("mapred.fairscheduler.allocation.file", ALLOC_FILE);
     conf.set("mapred.fairscheduler.poolnameproperty", POOL_PROPERTY);
-    conf.set("mapred.fairscheduler.assignmultiple", "false");
-    taskTrackerManager = new FakeTaskTrackerManager(numTaskTrackers);
+    conf.setBoolean("mapred.fairscheduler.assignmultiple", assignMultiple);
+    // Manually set locality delay because we aren't using a JobTracker so
+    // we can't auto-compute it from the heartbeat interval.
+    conf.setLong("mapred.fairscheduler.locality.delay", 10000);
+    taskTrackerManager = new FakeTaskTrackerManager(numRacks, numNodesPerRack);
     clock = new FakeClock();
     scheduler = new FairScheduler(clock, true);
     scheduler.waitForMapsBeforeLaunchingReduces = false;
@@ -452,17 +517,23 @@
   
   private JobInProgress submitJob(int state, int maps, int reduces)
       throws IOException {
-    return submitJob(state, maps, reduces, null);
+    return submitJob(state, maps, reduces, null, null);
   }
   
   private JobInProgress submitJob(int state, int maps, int reduces, String pool)
       throws IOException {
+    return submitJob(state, maps, reduces, pool, null);
+  }
+  
+  private JobInProgress submitJob(int state, int maps, int reduces, String pool,
+      String[][] mapInputLocations) throws IOException {
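+    // mapInputLocations[i] lists the hosts holding the input split for map i;
+    // a null array means the job has no locality information.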
     JobConf jobConf = new JobConf(conf);
     jobConf.setNumMapTasks(maps);
     jobConf.setNumReduceTasks(reduces);
     if (pool != null)
       jobConf.set(POOL_PROPERTY, pool);
-    JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager);
+    JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager,
+        mapInputLocations);
     job.getStatus().setRunState(state);
     taskTrackerManager.submitJob(job);
     job.startTime = clock.time;
@@ -579,14 +650,12 @@
     JobInfo info1 = scheduler.infos.get(job1);
     
     // Check scheduler variables
-    assertEquals(0,    info1.runningMaps);
-    assertEquals(0,    info1.runningReduces);
-    assertEquals(2,    info1.neededMaps);
-    assertEquals(1,    info1.neededReduces);
-    assertEquals(0,    info1.mapDeficit);
-    assertEquals(0,    info1.reduceDeficit);
-    assertEquals(4.0,  info1.mapFairShare);
-    assertEquals(4.0,  info1.reduceFairShare);
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(2,    info1.mapSchedulable.getDemand());
+    assertEquals(1,    info1.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
     
     // Advance time before submitting another job j2, to make j1 run before j2
     // deterministically.
@@ -594,70 +663,119 @@
     JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 2);
     JobInfo info2 = scheduler.infos.get(job2);
     
-    // Check scheduler variables; the fair shares should now have been allocated
-    // equally between j1 and j2, but j1 should have (4 slots)*(100 ms) deficit
-    assertEquals(0,    info1.runningMaps);
-    assertEquals(0,    info1.runningReduces);
-    assertEquals(2,    info1.neededMaps);
-    assertEquals(1,    info1.neededReduces);
-    assertEquals(400,  info1.mapDeficit);
-    assertEquals(400,  info1.reduceDeficit);
-    assertEquals(2.0,  info1.mapFairShare);
-    assertEquals(2.0,  info1.reduceFairShare);
-    assertEquals(0,    info2.runningMaps);
-    assertEquals(0,    info2.runningReduces);
-    assertEquals(1,    info2.neededMaps);
-    assertEquals(2,    info2.neededReduces);
-    assertEquals(0,    info2.mapDeficit);
-    assertEquals(0,    info2.reduceDeficit);
-    assertEquals(2.0,  info2.mapFairShare);
-    assertEquals(2.0,  info2.reduceFairShare);
+    // Check scheduler variables
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(2,    info1.mapSchedulable.getDemand());
+    assertEquals(1,    info1.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(0,    info2.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info2.reduceSchedulable.getRunningTasks());
+    assertEquals(1,    info2.mapSchedulable.getDemand());
+    assertEquals(2,    info2.reduceSchedulable.getDemand());
+    assertEquals(1.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
     
-    // Assign tasks and check that all slots are filled with j1, then j2
+    // Assign tasks and check that jobs alternate in filling slots
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
     assertNull(scheduler.assignTasks(tracker("tt2")));
     
     // Check that the scheduler has started counting the tasks as running
     // as soon as it launched them.
-    assertEquals(2,  info1.runningMaps);
-    assertEquals(1,  info1.runningReduces);
-    assertEquals(0,  info1.neededMaps);
-    assertEquals(0,  info1.neededReduces);
-    assertEquals(1,  info2.runningMaps);
-    assertEquals(2,  info2.runningReduces);
-    assertEquals(0, info2.neededMaps);
-    assertEquals(0, info2.neededReduces);
+    assertEquals(2,  info1.mapSchedulable.getRunningTasks());
+    assertEquals(1,  info1.reduceSchedulable.getRunningTasks());
+    assertEquals(2,  info1.mapSchedulable.getDemand());
+    assertEquals(1,  info1.reduceSchedulable.getDemand());
+    assertEquals(1,  info2.mapSchedulable.getRunningTasks());
+    assertEquals(2,  info2.reduceSchedulable.getRunningTasks());
+    assertEquals(1, info2.mapSchedulable.getDemand());
+    assertEquals(2, info2.reduceSchedulable.getDemand());
+  }
+  
+  /**
+   * This test is identical to testSmallJobs but sets assignMultiple to
+   * true so that multiple tasks can be assigned per heartbeat.
+   */
+  public void testSmallJobsWithAssignMultiple() throws IOException {
+    setUpCluster(1, 2, true);
+    
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1);
+    JobInfo info1 = scheduler.infos.get(job1);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(2,    info1.mapSchedulable.getDemand());
+    assertEquals(1,    info1.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
+    
+    // Advance time before submitting another job j2, to make j1 run before j2
+    // deterministically.
+    advanceTime(100);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 2);
+    JobInfo info2 = scheduler.infos.get(job2);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(2,    info1.mapSchedulable.getDemand());
+    assertEquals(1,    info1.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(0,    info2.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info2.reduceSchedulable.getRunningTasks());
+    assertEquals(1,    info2.mapSchedulable.getDemand());
+    assertEquals(2,    info2.reduceSchedulable.getDemand());
+    assertEquals(1.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
+    
+    // Assign tasks and check that jobs alternate in filling slots
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1",
+                           "attempt_test_0002_m_000000_0 on tt1",
+                           "attempt_test_0001_r_000000_0 on tt1",
+                           "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2",
+                           "attempt_test_0002_r_000001_0 on tt2");
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+    
+    // Check that the scheduler has started counting the tasks as running
+    // as soon as it launched them.
+    assertEquals(2,  info1.mapSchedulable.getRunningTasks());
+    assertEquals(1,  info1.reduceSchedulable.getRunningTasks());
+    assertEquals(2,  info1.mapSchedulable.getDemand());
+    assertEquals(1,  info1.reduceSchedulable.getDemand());
+    assertEquals(1,  info2.mapSchedulable.getRunningTasks());
+    assertEquals(2,  info2.reduceSchedulable.getRunningTasks());
+    assertEquals(1, info2.mapSchedulable.getDemand());
+    assertEquals(2, info2.reduceSchedulable.getDemand());
   }
   
   /**
    * This test begins by submitting two jobs with 10 maps and reduces each.
-   * The first job is submitted 100ms after the second, during which time no
-   * tasks run. After this, we assign tasks to all slots, which should all be
-   * from job 1. These run for 200ms, at which point job 2 now has a deficit
-   * of 400 while job 1 is down to a deficit of 0. We then finish all tasks and
-   * assign new ones, which should all be from job 2. These run for 50 ms,
-   * which is not enough time for job 2 to make up its deficit (it only makes up
-   * 100 ms of deficit). Finally we assign a new round of tasks, which should
-   * all be from job 2 again.
+   * The first job is submitted 100ms before the second, to make it get slots
+   * first deterministically. We then assign a wave of tasks and check that
+   * they are given alternately to job1, job2, job1, job2, etc. We finish
+   * these tasks and assign a second wave, which should continue to be
+   * allocated in this manner.
    */
   public void testLargeJobs() throws IOException {
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     JobInfo info1 = scheduler.infos.get(job1);
     
     // Check scheduler variables
-    assertEquals(0,    info1.runningMaps);
-    assertEquals(0,    info1.runningReduces);
-    assertEquals(10,   info1.neededMaps);
-    assertEquals(10,   info1.neededReduces);
-    assertEquals(0,    info1.mapDeficit);
-    assertEquals(0,    info1.reduceDeficit);
-    assertEquals(4.0,  info1.mapFairShare);
-    assertEquals(4.0,  info1.reduceFairShare);
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info1.mapSchedulable.getDemand());
+    assertEquals(10,   info1.reduceSchedulable.getDemand());
+    assertEquals(4.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(4.0,  info1.reduceSchedulable.getFairShare());
     
     // Advance time before submitting another job j2, to make j1 run before j2
     // deterministically.
@@ -665,34 +783,29 @@
     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
     JobInfo info2 = scheduler.infos.get(job2);
     
-    // Check scheduler variables; the fair shares should now have been allocated
-    // equally between j1 and j2, but j1 should have (4 slots)*(100 ms) deficit
-    assertEquals(0,    info1.runningMaps);
-    assertEquals(0,    info1.runningReduces);
-    assertEquals(10,   info1.neededMaps);
-    assertEquals(10,   info1.neededReduces);
-    assertEquals(400,  info1.mapDeficit);
-    assertEquals(400,  info1.reduceDeficit);
-    assertEquals(2.0,  info1.mapFairShare);
-    assertEquals(2.0,  info1.reduceFairShare);
-    assertEquals(0,    info2.runningMaps);
-    assertEquals(0,    info2.runningReduces);
-    assertEquals(10,   info2.neededMaps);
-    assertEquals(10,   info2.neededReduces);
-    assertEquals(0,    info2.mapDeficit);
-    assertEquals(0,    info2.reduceDeficit);
-    assertEquals(2.0,  info2.mapFairShare);
-    assertEquals(2.0,  info2.reduceFairShare);
+    // Check scheduler variables
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info1.mapSchedulable.getDemand());
+    assertEquals(10,   info1.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(0,    info2.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info2.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info2.mapSchedulable.getDemand());
+    assertEquals(10,   info2.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
     
-    // Assign tasks and check that all slots are initially filled with job 1
+    // Check that tasks are filled alternately by the jobs
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
     
     // Check that no new tasks can be launched once the tasktrackers are full
     assertNull(scheduler.assignTasks(tracker("tt1")));
@@ -700,84 +813,175 @@
     
     // Check that the scheduler has started counting the tasks as running
     // as soon as it launched them.
-    assertEquals(4,  info1.runningMaps);
-    assertEquals(4,  info1.runningReduces);
-    assertEquals(6,  info1.neededMaps);
-    assertEquals(6,  info1.neededReduces);
-    assertEquals(0,  info2.runningMaps);
-    assertEquals(0,  info2.runningReduces);
-    assertEquals(10, info2.neededMaps);
-    assertEquals(10, info2.neededReduces);
+    assertEquals(2,  info1.mapSchedulable.getRunningTasks());
+    assertEquals(2,  info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,  info1.mapSchedulable.getDemand());
+    assertEquals(10,  info1.reduceSchedulable.getDemand());
+    assertEquals(2,  info2.mapSchedulable.getRunningTasks());
+    assertEquals(2,  info2.reduceSchedulable.getRunningTasks());
+    assertEquals(10, info2.mapSchedulable.getDemand());
+    assertEquals(10, info2.reduceSchedulable.getDemand());
     
     // Finish up the tasks and advance time again. Note that we must finish
     // the task since FakeJobInProgress does not properly maintain running
     // tasks, so the scheduler will always get an empty task list from
     // the JobInProgress's getMapTasks/getReduceTasks and think they finished.
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000000_0");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000000_0");
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000000_0");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000002_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000003_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000002_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000003_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000000_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000001_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0");
     advanceTime(200);
-    assertEquals(0,   info1.runningMaps);
-    assertEquals(0,   info1.runningReduces);
-    assertEquals(0,   info1.mapDeficit);
-    assertEquals(0,   info1.reduceDeficit);
-    assertEquals(0,   info2.runningMaps);
-    assertEquals(0,   info2.runningReduces);
-    assertEquals(400, info2.mapDeficit);
-    assertEquals(400, info2.reduceDeficit);
-
-    // Assign tasks and check that all slots are now filled with job 2
-    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+    assertEquals(0,   info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,   info1.reduceSchedulable.getRunningTasks());
+    assertEquals(0,   info2.mapSchedulable.getRunningTasks());
+    assertEquals(0,   info2.reduceSchedulable.getRunningTasks());
+
+    // Check that tasks are filled alternately by the jobs
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
-
-    // Finish up the tasks and advance time again, but give job 2 only 50ms.
+    
+    // Check scheduler variables; the demands should now be 8 because 2 tasks
+    // of each type have finished in each job
+    assertEquals(2,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(2,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(8,   info1.mapSchedulable.getDemand());
+    assertEquals(8,   info1.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(2,    info2.mapSchedulable.getRunningTasks());
+    assertEquals(2,    info2.reduceSchedulable.getRunningTasks());
+    assertEquals(8,   info2.mapSchedulable.getDemand());
+    assertEquals(8,   info2.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
+  }
+  
+  /**
+   * A copy of testLargeJobs that enables the assignMultiple feature to launch
+   * multiple tasks per heartbeat. Results should be the same as testLargeJobs.
+   */
+  public void testLargeJobsWithAssignMultiple() throws IOException {
+    setUpCluster(1, 2, true);
+    
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info1.mapSchedulable.getDemand());
+    assertEquals(10,   info1.reduceSchedulable.getDemand());
+    assertEquals(4.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(4.0,  info1.reduceSchedulable.getFairShare());
+    
+    // Advance time before submitting another job j2, to make j1 run before j2
+    // deterministically.
+    advanceTime(100);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info2 = scheduler.infos.get(job2);
+    
+    // Check scheduler variables; the fair shares should now have been
+    // allocated equally between j1 and j2
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info1.mapSchedulable.getDemand());
+    assertEquals(10,   info1.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(0,    info2.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info2.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info2.mapSchedulable.getDemand());
+    assertEquals(10,   info2.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
+    
+    // Check that tasks are filled alternately by the jobs
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1",
+                           "attempt_test_0002_m_000000_0 on tt1",
+                           "attempt_test_0001_r_000000_0 on tt1",
+                           "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2",
+                           "attempt_test_0002_m_000001_0 on tt2",
+                           "attempt_test_0001_r_000001_0 on tt2",
+                           "attempt_test_0002_r_000001_0 on tt2");
+    
+    // Check that no new tasks can be launched once the tasktrackers are full
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+    
+    // Check that the scheduler has started counting the tasks as running
+    // as soon as it launched them.
+    assertEquals(2,  info1.mapSchedulable.getRunningTasks());
+    assertEquals(2,  info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,  info1.mapSchedulable.getDemand());
+    assertEquals(10,  info1.reduceSchedulable.getDemand());
+    assertEquals(2,  info2.mapSchedulable.getRunningTasks());
+    assertEquals(2,  info2.reduceSchedulable.getRunningTasks());
+    assertEquals(10, info2.mapSchedulable.getDemand());
+    assertEquals(10, info2.reduceSchedulable.getDemand());
+    
+    // Finish up the tasks and advance time again. Note that we must finish
+    // the task since FakeJobInProgress does not properly maintain running
+    // tasks, so the scheduler will always get an empty task list from
+    // the JobInProgress's getMapTasks/getReduceTasks and think they finished.
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000000_0");
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000000_0");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000000_0");
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000000_0");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000001_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000002_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000003_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000002_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000003_0");
-    advanceTime(50);
-    assertEquals(0,   info1.runningMaps);
-    assertEquals(0,   info1.runningReduces);
-    assertEquals(100, info1.mapDeficit);
-    assertEquals(100, info1.reduceDeficit);
-    assertEquals(0,   info2.runningMaps);
-    assertEquals(0,   info2.runningReduces);
-    assertEquals(300, info2.mapDeficit);
-    assertEquals(300, info2.reduceDeficit);
-
-    // Assign tasks and check that all slots are now still with job 2
-    checkAssignment("tt1", "attempt_test_0002_m_000004_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000005_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000005_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0002_m_000006_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000007_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000006_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000007_0 on tt2");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000001_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0");
+    advanceTime(200);
+    assertEquals(0,   info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,   info1.reduceSchedulable.getRunningTasks());
+    assertEquals(0,   info2.mapSchedulable.getRunningTasks());
+    assertEquals(0,   info2.reduceSchedulable.getRunningTasks());
+
+    // Check that tasks are filled alternately by the jobs
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1",
+                           "attempt_test_0002_m_000002_0 on tt1",
+                           "attempt_test_0001_r_000002_0 on tt1",
+                           "attempt_test_0002_r_000002_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2",
+                           "attempt_test_0002_m_000003_0 on tt2",
+                           "attempt_test_0001_r_000003_0 on tt2",
+                           "attempt_test_0002_r_000003_0 on tt2");
+    
+    // Check scheduler variables; the demands should now be 8 because 2 tasks
+    // of each type have finished in each job
+    assertEquals(2,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(2,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(8,   info1.mapSchedulable.getDemand());
+    assertEquals(8,   info1.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(2,    info2.mapSchedulable.getRunningTasks());
+    assertEquals(2,    info2.reduceSchedulable.getRunningTasks());
+    assertEquals(8,   info2.mapSchedulable.getDemand());
+    assertEquals(8,   info2.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
   }
-  
 
   /**
-   * We submit two jobs such that one has 2x the priority of the other, wait
-   * for 100 ms, and check that the weights/deficits are okay and that the
-   * tasks all go to the high-priority job.
+   * We submit two jobs, one with 2x the priority of the other, to a cluster
+   * of 3 nodes, wait for 100 ms, and check that the weights/shares are set
+   * accordingly and that the high-priority job gets 4 tasks while the other gets 2.
    */
   public void testJobsWithPriorities() throws IOException {
+    setUpCluster(1, 3, false);
+    
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     JobInfo info1 = scheduler.infos.get(job1);
     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
@@ -786,56 +990,52 @@
     scheduler.update();
     
     // Check scheduler variables
-    assertEquals(0,    info1.runningMaps);
-    assertEquals(0,    info1.runningReduces);
-    assertEquals(10,   info1.neededMaps);
-    assertEquals(10,   info1.neededReduces);
-    assertEquals(0,    info1.mapDeficit);
-    assertEquals(0,    info1.reduceDeficit);
-    assertEquals(1.33, info1.mapFairShare, 0.1);
-    assertEquals(1.33, info1.reduceFairShare, 0.1);
-    assertEquals(0,    info2.runningMaps);
-    assertEquals(0,    info2.runningReduces);
-    assertEquals(10,   info2.neededMaps);
-    assertEquals(10,   info2.neededReduces);
-    assertEquals(0,    info2.mapDeficit);
-    assertEquals(0,    info2.reduceDeficit);
-    assertEquals(2.66, info2.mapFairShare, 0.1);
-    assertEquals(2.66, info2.reduceFairShare, 0.1);
-    
-    // Advance time and check deficits
-    advanceTime(100);
-    assertEquals(133,  info1.mapDeficit, 1.0);
-    assertEquals(133,  info1.reduceDeficit, 1.0);
-    assertEquals(266,  info2.mapDeficit, 1.0);
-    assertEquals(266,  info2.reduceDeficit, 1.0);
+    assertEquals(0,   info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,   info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,  info1.mapSchedulable.getDemand());
+    assertEquals(10,  info1.reduceSchedulable.getDemand());
+    assertEquals(2.0, info1.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(2.0, info1.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0,   info2.mapSchedulable.getRunningTasks());
+    assertEquals(0,   info2.reduceSchedulable.getRunningTasks());
+    assertEquals(10,  info2.mapSchedulable.getDemand());
+    assertEquals(10,  info2.reduceSchedulable.getDemand());
+    assertEquals(4.0, info2.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(4.0, info2.reduceSchedulable.getFairShare(), 0.1);
+    
+    // Advance time
+    advanceTime(100);
     
-    // Assign tasks and check that all slots are filled with j1, then j2
+    // Assign tasks and check that j2 gets 2x more tasks than j1. In addition,
+    // whenever the jobs' runningTasks/weight ratios are tied, j1 should get
+    // the new task first because it started first; thus the tasks of each
+    // type should be handed out alternately to 1, 2, 2, 1, 2, 2, etc.
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0002_r_000002_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0002_r_000003_0 on tt3");
   }
   
   /**
    * This test starts by submitting three large jobs:
    * - job1 in the default pool, at time 0
    * - job2 in poolA, with an allocation of 1 map / 2 reduces, at time 200
-   * - job3 in poolB, with an allocation of 2 maps / 1 reduce, at time 200
+   * - job3 in poolB, with an allocation of 2 maps / 1 reduce, at time 300
    * 
-   * After this, we sleep 100ms, until time 300. At this point, job1 has the
-   * highest map deficit, job3 the second, and job2 the third. This is because
-   * job3 has more maps in its min share than job2, but job1 has been around
-   * a long time at the beginning. The reduce deficits are similar, except job2
-   * comes before job3 because it had a higher reduce minimum share.
-   * 
-   * Finally, assign tasks to all slots. The maps should be assigned in the
-   * order job3, job2, job1 because 3 and 2 both have guaranteed slots and 3
-   * has a higher deficit. The reduces should be assigned as job2, job3, job1.
+   * We then assign tasks to all slots. The maps should be assigned in the
+   * order job2, job3, job3, job1 because jobs 2 and 3 have guaranteed map
+   * slots (1 and 2 respectively). Job2 comes before job3 when both are at 0
+   * running tasks because it has an earlier start time. In a similar manner,
+   * reduces should be assigned as job2, job3, job2, job1.
    */
   public void testLargeJobsWithPools() throws Exception {
     // Set up pools file
@@ -855,60 +1055,54 @@
     out.println("</allocations>");
     out.close();
     scheduler.getPoolManager().reloadAllocs();
+    Pool defaultPool = scheduler.getPoolManager().getPool("default");
+    Pool poolA = scheduler.getPoolManager().getPool("poolA");
+    Pool poolB = scheduler.getPoolManager().getPool("poolB");
     
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     JobInfo info1 = scheduler.infos.get(job1);
     
     // Check scheduler variables
-    assertEquals(0,    info1.runningMaps);
-    assertEquals(0,    info1.runningReduces);
-    assertEquals(10,   info1.neededMaps);
-    assertEquals(10,   info1.neededReduces);
-    assertEquals(0,    info1.mapDeficit);
-    assertEquals(0,    info1.reduceDeficit);
-    assertEquals(4.0,  info1.mapFairShare);
-    assertEquals(4.0,  info1.reduceFairShare);
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info1.mapSchedulable.getDemand());
+    assertEquals(10,   info1.reduceSchedulable.getDemand());
+    assertEquals(4.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(4.0,  info1.reduceSchedulable.getFairShare());
     
     // Advance time 200ms and submit jobs 2 and 3
     advanceTime(200);
-    assertEquals(800,  info1.mapDeficit);
-    assertEquals(800,  info1.reduceDeficit);
     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
     JobInfo info2 = scheduler.infos.get(job2);
+    advanceTime(100);
     JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
     JobInfo info3 = scheduler.infos.get(job3);
     
     // Check that minimum and fair shares have been allocated
-    assertEquals(0,    info1.minMaps);
-    assertEquals(0,    info1.minReduces);
-    assertEquals(1.0,  info1.mapFairShare);
-    assertEquals(1.0,  info1.reduceFairShare);
-    assertEquals(1,    info2.minMaps);
-    assertEquals(2,    info2.minReduces);
-    assertEquals(1.0,  info2.mapFairShare);
-    assertEquals(2.0,  info2.reduceFairShare);
-    assertEquals(2,    info3.minMaps);
-    assertEquals(1,    info3.minReduces);
-    assertEquals(2.0,  info3.mapFairShare);
-    assertEquals(1.0,  info3.reduceFairShare);
-    
-    // Advance time 100ms and check deficits
-    advanceTime(100);
-    assertEquals(900,  info1.mapDeficit);
-    assertEquals(900,  info1.reduceDeficit);
-    assertEquals(100,  info2.mapDeficit);
-    assertEquals(200,  info2.reduceDeficit);
-    assertEquals(200,  info3.mapDeficit);
-    assertEquals(100,  info3.reduceDeficit);
+    assertEquals(0,    defaultPool.getMapSchedulable().getMinShare());
+    assertEquals(0,    defaultPool.getReduceSchedulable().getMinShare());
+    assertEquals(1.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(1,    poolA.getMapSchedulable().getMinShare());
+    assertEquals(2,    poolA.getReduceSchedulable().getMinShare());
+    assertEquals(1.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
+    assertEquals(2,    poolB.getMapSchedulable().getMinShare());
+    assertEquals(1,    poolB.getReduceSchedulable().getMinShare());
+    assertEquals(2.0,  info3.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info3.reduceSchedulable.getFairShare());
+    
+    // Advance time 100ms
+    advanceTime(100);
     
     // Assign tasks and check that slots are first given to needy jobs
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0003_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2");
   }
 
@@ -918,8 +1112,7 @@
    * - job2 in poolA, with an allocation of 2 maps / 2 reduces, at time 200
    * - job3 in poolA, with an allocation of 2 maps / 2 reduces, at time 300
    * 
-   * After this, we sleep 100ms, until time 400. At this point, job1 has the
-   * highest deficit, job2 the second, and job3 the third. The first two tasks
+   * After this, we start assigning tasks. The first two tasks of each type
    * should be assigned to job2 and job3 since they are in a pool with an
    * allocation guarantee, but the next two slots should be assigned to job 1
    * because the pool will no longer be needy.
@@ -937,71 +1130,52 @@
     out.println("</allocations>");
     out.close();
     scheduler.getPoolManager().reloadAllocs();
+    Pool poolA = scheduler.getPoolManager().getPool("poolA");
     
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     JobInfo info1 = scheduler.infos.get(job1);
     
     // Check scheduler variables
-    assertEquals(0,    info1.runningMaps);
-    assertEquals(0,    info1.runningReduces);
-    assertEquals(10,   info1.neededMaps);
-    assertEquals(10,   info1.neededReduces);
-    assertEquals(0,    info1.mapDeficit);
-    assertEquals(0,    info1.reduceDeficit);
-    assertEquals(4.0,  info1.mapFairShare);
-    assertEquals(4.0,  info1.reduceFairShare);
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info1.mapSchedulable.getDemand());
+    assertEquals(10,   info1.reduceSchedulable.getDemand());
+    assertEquals(4.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(4.0,  info1.reduceSchedulable.getFairShare());
     
     // Advance time 200ms and submit job 2
     advanceTime(200);
-    assertEquals(800,  info1.mapDeficit);
-    assertEquals(800,  info1.reduceDeficit);
     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
     JobInfo info2 = scheduler.infos.get(job2);
     
     // Check that minimum and fair shares have been allocated
-    assertEquals(0,    info1.minMaps);
-    assertEquals(0,    info1.minReduces);
-    assertEquals(2.0,  info1.mapFairShare);
-    assertEquals(2.0,  info1.reduceFairShare);
-    assertEquals(2,    info2.minMaps);
-    assertEquals(2,    info2.minReduces);
-    assertEquals(2.0,  info2.mapFairShare);
-    assertEquals(2.0,  info2.reduceFairShare);
+    assertEquals(2,    poolA.getMapSchedulable().getMinShare());
+    assertEquals(2,    poolA.getReduceSchedulable().getMinShare());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(2.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
     
     // Advance time 100ms and submit job 3
     advanceTime(100);
-    assertEquals(1000, info1.mapDeficit);
-    assertEquals(1000, info1.reduceDeficit);
-    assertEquals(200,  info2.mapDeficit);
-    assertEquals(200,  info2.reduceDeficit);
     JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
     JobInfo info3 = scheduler.infos.get(job3);
     
     // Check that minimum and fair shares have been allocated
-    assertEquals(0,    info1.minMaps);
-    assertEquals(0,    info1.minReduces);
-    assertEquals(2,    info1.mapFairShare, 0.1);
-    assertEquals(2,    info1.reduceFairShare, 0.1);
-    assertEquals(1,    info2.minMaps);
-    assertEquals(1,    info2.minReduces);
-    assertEquals(1,    info2.mapFairShare, 0.1);
-    assertEquals(1,    info2.reduceFairShare, 0.1);
-    assertEquals(1,    info3.minMaps);
-    assertEquals(1,    info3.minReduces);
-    assertEquals(1,    info3.mapFairShare, 0.1);
-    assertEquals(1,    info3.reduceFairShare, 0.1);
-    
-    // Advance time 100ms and check deficits
-    advanceTime(100);
-    assertEquals(1200, info1.mapDeficit, 1.0);
-    assertEquals(1200, info1.reduceDeficit, 1.0);
-    assertEquals(300,  info2.mapDeficit, 1.0);
-    assertEquals(300,  info2.reduceDeficit, 1.0);
-    assertEquals(100,  info3.mapDeficit, 1.0);
-    assertEquals(100,  info3.reduceDeficit, 1.0);
+    assertEquals(2,    poolA.getMapSchedulable().getMinShare());
+    assertEquals(2,    poolA.getReduceSchedulable().getMinShare());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(1.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info2.reduceSchedulable.getFairShare());
+    assertEquals(1.0,  info3.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info3.reduceSchedulable.getFairShare());
+    
+    // Advance time
+    advanceTime(100);
     
     // Assign tasks and check that slots are first given to needy jobs, but
-    // that job 1 gets two tasks after due to having a larger deficit.
+    // that job 1 gets two tasks after due to having a larger share.
     checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
@@ -1013,16 +1187,88 @@
   }
   
   /**
+   * A copy of testLargeJobsWithExcessCapacity that enables assigning multiple
+   * tasks per heartbeat. Results should match testLargeJobsWithExcessCapacity.
+   */
+  public void testLargeJobsWithExcessCapacityAndAssignMultiple() 
+      throws Exception {
+    setUpCluster(1, 2, true);
+    
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give pool A a minimum of 2 maps, 2 reduces
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>2</minMaps>");
+    out.println("<minReduces>2</minReduces>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    Pool poolA = scheduler.getPoolManager().getPool("poolA");
+    
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info1.mapSchedulable.getDemand());
+    assertEquals(10,   info1.reduceSchedulable.getDemand());
+    assertEquals(4.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(4.0,  info1.reduceSchedulable.getFairShare());
+    
+    // Advance time 200ms and submit job 2
+    advanceTime(200);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    JobInfo info2 = scheduler.infos.get(job2);
+    
+    // Check that minimum and fair shares have been allocated
+    assertEquals(2,    poolA.getMapSchedulable().getMinShare());
+    assertEquals(2,    poolA.getReduceSchedulable().getMinShare());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(2.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
+    
+    // Advance time 100ms and submit job 3
+    advanceTime(100);
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    JobInfo info3 = scheduler.infos.get(job3);
+    
+    // Check that minimum and fair shares have been allocated
+    assertEquals(2,    poolA.getMapSchedulable().getMinShare());
+    assertEquals(2,    poolA.getReduceSchedulable().getMinShare());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(1.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info2.reduceSchedulable.getFairShare());
+    assertEquals(1.0,  info3.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info3.reduceSchedulable.getFairShare());
+    
+    // Advance time
+    advanceTime(100);
+    
+    // Assign tasks and check that slots are first given to needy jobs, but
+    // that job 1 then gets two tasks due to having a larger share.
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
+                           "attempt_test_0003_m_000000_0 on tt1",
+                           "attempt_test_0002_r_000000_0 on tt1",
+                           "attempt_test_0003_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2",
+                           "attempt_test_0001_m_000001_0 on tt2",
+                           "attempt_test_0001_r_000000_0 on tt2",
+                           "attempt_test_0001_r_000001_0 on tt2");
+  }
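
For reference, a variadic checkAssignment helper along the lines below is what a multi-task heartbeat check like the one above would need. This is only a sketch of what such a helper in this test class might look like, assuming assignTasks returns the launched tasks as a List<Task> and that the fake tasks' toString() produces the "attempt_... on tt..." strings asserted here; the helper actually used is defined in a part of the file not shown in this excerpt.

     void checkAssignment(String trackerName, String... expectedTasks)
         throws IOException {
       // One heartbeat from the given tracker.
       List<Task> tasks = scheduler.assignTasks(tracker(trackerName));
       assertNotNull("Expected tasks to be assigned on " + trackerName, tasks);
       assertEquals(expectedTasks.length, tasks.size());
       // With assignMultiple enabled, every expected task must come back in
       // this single heartbeat, in order.
       for (int i = 0; i < expectedTasks.length; i++) {
         assertEquals(expectedTasks[i], tasks.get(i).toString());
       }
     }
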
+  
+  /**
    * This test starts by submitting two jobs at time 0:
    * - job1 in the default pool
    * - job2, with 1 map and 1 reduce, in poolA, which has an alloc of 4
    *   maps and 4 reduces
    * 
    * When we assign the slots, job2 should only get 1 of each type of task.
-   * 
-   * The fair share for job 2 should be 2.0 however, because even though it is
-   * running only one task, it accumulates deficit in case it will have failures
-   * or need speculative tasks later. (TODO: This may not be a good policy.)
    */
   public void testSmallJobInLargePool() throws Exception {
     // Set up pools file
@@ -1044,22 +1290,18 @@
     JobInfo info2 = scheduler.infos.get(job2);
     
     // Check scheduler variables
-    assertEquals(0,    info1.runningMaps);
-    assertEquals(0,    info1.runningReduces);
-    assertEquals(10,   info1.neededMaps);
-    assertEquals(10,   info1.neededReduces);
-    assertEquals(0,    info1.mapDeficit);
-    assertEquals(0,    info1.reduceDeficit);
-    assertEquals(2.0,  info1.mapFairShare);
-    assertEquals(2.0,  info1.reduceFairShare);
-    assertEquals(0,    info2.runningMaps);
-    assertEquals(0,    info2.runningReduces);
-    assertEquals(1,    info2.neededMaps);
-    assertEquals(1,    info2.neededReduces);
-    assertEquals(0,    info2.mapDeficit);
-    assertEquals(0,    info2.reduceDeficit);
-    assertEquals(2.0,  info2.mapFairShare);
-    assertEquals(2.0,  info2.reduceFairShare);
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info1.mapSchedulable.getDemand());
+    assertEquals(10,   info1.reduceSchedulable.getDemand());
+    assertEquals(3.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(3.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(0,    info2.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info2.reduceSchedulable.getRunningTasks());
+    assertEquals(1,    info2.mapSchedulable.getDemand());
+    assertEquals(1,    info2.reduceSchedulable.getDemand());
+    assertEquals(1.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info2.reduceSchedulable.getFairShare());
     
     // Assign tasks and check that slots are first given to needy jobs
     checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
@@ -1104,24 +1346,24 @@
     JobInfo info4 = scheduler.infos.get(job4);
     
     // Check scheduler variables
-    assertEquals(2.0,  info1.mapFairShare);
-    assertEquals(2.0,  info1.reduceFairShare);
-    assertEquals(2.0,  info2.mapFairShare);
-    assertEquals(2.0,  info2.reduceFairShare);
-    assertEquals(0.0,  info3.mapFairShare);
-    assertEquals(0.0,  info3.reduceFairShare);
-    assertEquals(0.0,  info4.mapFairShare);
-    assertEquals(0.0,  info4.reduceFairShare);
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(2.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
+    assertEquals(0.0,  info3.mapSchedulable.getFairShare());
+    assertEquals(0.0,  info3.reduceSchedulable.getFairShare());
+    assertEquals(0.0,  info4.mapSchedulable.getFairShare());
+    assertEquals(0.0,  info4.reduceSchedulable.getFairShare());
     
-    // Assign tasks and check that slots are first to jobs 1 and 2
+    // Assign tasks and check that only jobs 1 and 2 get them
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
     advanceTime(100);
-    checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
   }
 
@@ -1161,25 +1403,25 @@
     JobInfo info4 = scheduler.infos.get(job4);
     
     // Check scheduler variables
-    assertEquals(1.33,  info1.mapFairShare, 0.1);
-    assertEquals(1.33,  info1.reduceFairShare, 0.1);
-    assertEquals(0.0,   info2.mapFairShare);
-    assertEquals(0.0,   info2.reduceFairShare);
-    assertEquals(1.33,  info3.mapFairShare, 0.1);
-    assertEquals(1.33,  info3.reduceFairShare, 0.1);
-    assertEquals(1.33,  info4.mapFairShare, 0.1);
-    assertEquals(1.33,  info4.reduceFairShare, 0.1);
+    assertEquals(1.33,  info1.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(1.33,  info1.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0.0,   info2.mapSchedulable.getFairShare());
+    assertEquals(0.0,   info2.reduceSchedulable.getFairShare());
+    assertEquals(1.33,  info3.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(1.33,  info3.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(1.33,  info4.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(1.33,  info4.reduceSchedulable.getFairShare(), 0.1);
     
-    // Assign tasks and check that slots are first to jobs 1 and 3
+    // Assign tasks and check that slots are given only to jobs 1, 3 and 4
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
     advanceTime(100);
-    checkAssignment("tt2", "attempt_test_0003_m_000000_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0003_r_000000_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0004_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0004_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
   }
   
   /**
@@ -1262,60 +1504,36 @@
     // the other half. This works out to 2 slots each for the jobs
     // in poolA and 1/3 each for the jobs in the default pool because
     // there are 2 runnable jobs in poolA and 6 jobs in the default pool.
-    assertEquals(0.33,   info1.mapFairShare, 0.1);
-    assertEquals(0.33,   info1.reduceFairShare, 0.1);
-    assertEquals(0.0,    info2.mapFairShare);
-    assertEquals(0.0,    info2.reduceFairShare);
-    assertEquals(0.33,   info3.mapFairShare, 0.1);
-    assertEquals(0.33,   info3.reduceFairShare, 0.1);
-    assertEquals(0.33,   info4.mapFairShare, 0.1);
-    assertEquals(0.33,   info4.reduceFairShare, 0.1);
-    assertEquals(0.33,   info5.mapFairShare, 0.1);
-    assertEquals(0.33,   info5.reduceFairShare, 0.1);
-    assertEquals(0.33,   info6.mapFairShare, 0.1);
-    assertEquals(0.33,   info6.reduceFairShare, 0.1);
-    assertEquals(0.33,   info7.mapFairShare, 0.1);
-    assertEquals(0.33,   info7.reduceFairShare, 0.1);
-    assertEquals(0.0,    info8.mapFairShare);
-    assertEquals(0.0,    info8.reduceFairShare);
-    assertEquals(2.0,    info9.mapFairShare, 0.1);
-    assertEquals(2.0,    info9.reduceFairShare, 0.1);
-    assertEquals(0.0,    info10.mapFairShare);
-    assertEquals(0.0,    info10.reduceFairShare);
+    assertEquals(0.33,   info1.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info1.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0.0,    info2.mapSchedulable.getFairShare());
+    assertEquals(0.0,    info2.reduceSchedulable.getFairShare());
+    assertEquals(0.33,   info3.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info3.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info4.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info4.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info5.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info5.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info6.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info6.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info7.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info7.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0.0,    info8.mapSchedulable.getFairShare());
+    assertEquals(0.0,    info8.reduceSchedulable.getFairShare());
+    assertEquals(2.0,    info9.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(2.0,    info9.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0.0,    info10.mapSchedulable.getFairShare());
+    assertEquals(0.0,    info10.reduceSchedulable.getFairShare());
   }
   
   public void testSizeBasedWeight() throws Exception {
     scheduler.sizeBasedWeight = true;
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 10);
     JobInProgress job2 = submitJob(JobStatus.RUNNING, 20, 1);
-    assertTrue(scheduler.infos.get(job2).mapFairShare >
-               scheduler.infos.get(job1).mapFairShare);
-    assertTrue(scheduler.infos.get(job1).reduceFairShare >
-               scheduler.infos.get(job2).reduceFairShare);
-  }
-  
-  public void testWaitForMapsBeforeLaunchingReduces() {
-    // We have set waitForMapsBeforeLaunchingReduces to false by default in
-    // this class, so this should return true
-    assertTrue(scheduler.enoughMapsFinishedToRunReduces(0, 100));
-    
-    // However, if we set waitForMapsBeforeLaunchingReduces to true, we should
-    // now no longer be able to assign reduces until 5 have finished
-    scheduler.waitForMapsBeforeLaunchingReduces = true;
-    assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, 100));
-    assertFalse(scheduler.enoughMapsFinishedToRunReduces(1, 100));
-    assertFalse(scheduler.enoughMapsFinishedToRunReduces(2, 100));
-    assertFalse(scheduler.enoughMapsFinishedToRunReduces(3, 100));
-    assertFalse(scheduler.enoughMapsFinishedToRunReduces(4, 100));
-    assertTrue(scheduler.enoughMapsFinishedToRunReduces(5, 100));
-    assertTrue(scheduler.enoughMapsFinishedToRunReduces(6, 100));
-    
-    // Also test some jobs that have very few maps, in which case we will
-    // wait for at least 1 map to finish
-    assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, 5));
-    assertTrue(scheduler.enoughMapsFinishedToRunReduces(1, 5));
-    assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, 1));
-    assertTrue(scheduler.enoughMapsFinishedToRunReduces(1, 1));
+    assertTrue(scheduler.infos.get(job2).mapSchedulable.getFairShare() >
+               scheduler.infos.get(job1).mapSchedulable.getFairShare());
+    assertTrue(scheduler.infos.get(job1).reduceSchedulable.getFairShare() >
+               scheduler.infos.get(job2).reduceSchedulable.getFairShare());
   }
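
The size-based weight formula itself is not visible in this hunk; the assertions above only require that the 20-map job receive a larger map fair share and that the 10-reduce job receive a larger reduce fair share. As a rough sketch (an assumption, not necessarily the scheduler's exact formula), a logarithmic weight is enough to produce that ordering: it grows with demand, but sub-linearly.

     // Hypothetical size-based weight: a job with more tasks gets a larger,
     // but only logarithmically larger, weight.
     double sizeBasedWeight(int demand) {
       return Math.log1p(demand) / Math.log(2);
     }
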
 
   /**
@@ -1350,25 +1568,25 @@
     JobInfo info3 = scheduler.infos.get(job3);
     advanceTime(10);
     
-    assertEquals(1.14,  info1.mapFairShare, 0.01);
-    assertEquals(1.14,  info1.reduceFairShare, 0.01);
-    assertEquals(2.28,  info2.mapFairShare, 0.01);
-    assertEquals(2.28,  info2.reduceFairShare, 0.01);
-    assertEquals(0.57,  info3.mapFairShare, 0.01);
-    assertEquals(0.57,  info3.reduceFairShare, 0.01);
+    assertEquals(1.14,  info1.mapSchedulable.getFairShare(), 0.01);
+    assertEquals(1.14,  info1.reduceSchedulable.getFairShare(), 0.01);
+    assertEquals(2.28,  info2.mapSchedulable.getFairShare(), 0.01);
+    assertEquals(2.28,  info2.reduceSchedulable.getFairShare(), 0.01);
+    assertEquals(0.57,  info3.mapSchedulable.getFairShare(), 0.01);
+    assertEquals(0.57,  info3.reduceSchedulable.getFairShare(), 0.01);
     
     JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
     JobInfo info4 = scheduler.infos.get(job4);
     advanceTime(10);
     
-    assertEquals(1.14,  info1.mapFairShare, 0.01);
-    assertEquals(1.14,  info1.reduceFairShare, 0.01);
-    assertEquals(2.28,  info2.mapFairShare, 0.01);
-    assertEquals(2.28,  info2.reduceFairShare, 0.01);
-    assertEquals(0.28,  info3.mapFairShare, 0.01);
-    assertEquals(0.28,  info3.reduceFairShare, 0.01);
-    assertEquals(0.28,  info4.mapFairShare, 0.01);
-    assertEquals(0.28,  info4.reduceFairShare, 0.01);
+    assertEquals(1.14,  info1.mapSchedulable.getFairShare(), 0.01);
+    assertEquals(1.14,  info1.reduceSchedulable.getFairShare(), 0.01);
+    assertEquals(2.28,  info2.mapSchedulable.getFairShare(), 0.01);
+    assertEquals(2.28,  info2.reduceSchedulable.getFairShare(), 0.01);
+    assertEquals(0.28,  info3.mapSchedulable.getFairShare(), 0.01);
+    assertEquals(0.28,  info3.reduceSchedulable.getFairShare(), 0.01);
+    assertEquals(0.28,  info4.mapSchedulable.getFairShare(), 0.01);
+    assertEquals(0.28,  info4.reduceSchedulable.getFairShare(), 0.01);
   }
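
The expected shares in this test are consistent with a weight-proportional division of the cluster's 4 map and 4 reduce slots. Assuming weights of 1.0 for the default pool, 2.0 for poolA and 0.5 for poolB (an inference from the numbers; the pool weights are set in code stripped from this excerpt), the arithmetic works out as follows:

     double slots = 4.0;
     double wDefault = 1.0, wPoolA = 2.0, wPoolB = 0.5;  // assumed weights
     double total = wDefault + wPoolA + wPoolB;          // 3.5
     double shareDefault = slots * wDefault / total;     // ~1.14 (job 1)
     double sharePoolA   = slots * wPoolA   / total;     // ~2.28 (job 2)
     double sharePoolB   = slots * wPoolB   / total;     // ~0.57 (job 3, later
                                                         //  split with job 4)
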
 
   /**
@@ -1401,19 +1619,21 @@
     JobInfo info3 = scheduler.infos.get(job3);
     advanceTime(10);
     
+    /*
     assertEquals(0,     info1.mapWeight, 0.01);
     assertEquals(1.0,   info1.reduceWeight, 0.01);
     assertEquals(0,     info2.mapWeight, 0.01);
     assertEquals(1.0,   info2.reduceWeight, 0.01);
     assertEquals(1.0,   info3.mapWeight, 0.01);
     assertEquals(1.0,   info3.reduceWeight, 0.01);
+    */
     
-    assertEquals(0,     info1.mapFairShare, 0.01);
-    assertEquals(1.33,  info1.reduceFairShare, 0.01);
-    assertEquals(0,     info2.mapFairShare, 0.01);
-    assertEquals(1.33,  info2.reduceFairShare, 0.01);
-    assertEquals(4,     info3.mapFairShare, 0.01);
-    assertEquals(1.33,  info3.reduceFairShare, 0.01);
+    assertEquals(0,     info1.mapSchedulable.getFairShare(), 0.01);
+    assertEquals(1.33,  info1.reduceSchedulable.getFairShare(), 0.01);
+    assertEquals(0,     info2.mapSchedulable.getFairShare(), 0.01);
+    assertEquals(1.33,  info2.reduceSchedulable.getFairShare(), 0.01);
+    assertEquals(4,     info3.mapSchedulable.getFairShare(), 0.01);
+    assertEquals(1.33,  info3.reduceSchedulable.getFairShare(), 0.01);
   }
 
   /**
@@ -1438,7 +1658,7 @@
    * This test starts by launching a job in the default pool that takes
    * all the slots in the cluster. We then submit a job in a pool with
    * min share of 2 maps and 1 reduce task. After the min share preemption
-   * timeout, this job should be allowed to preempt tasks. 
+   * timeout, this pool should be allowed to preempt tasks. 
    */
   public void testMinSharePreemption() throws Exception {
     // Enable preemption in scheduler
@@ -1457,6 +1677,7 @@
     out.println("</allocations>");
     out.close();
     scheduler.getPoolManager().reloadAllocs();
+    Pool poolA = scheduler.getPoolManager().getPool("poolA");
 
     // Submit job 1 and assign all slots to it. Sleep a bit before assigning
     // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
@@ -1477,32 +1698,32 @@
     
     // Ten seconds later, check that job 2 is not able to preempt tasks.
     advanceTime(10000);
-    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.MAP,
+    assertEquals(0, scheduler.tasksToPreempt(poolA.getMapSchedulable(),
         clock.getTime()));
-    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+    assertEquals(0, scheduler.tasksToPreempt(poolA.getReduceSchedulable(),
         clock.getTime()));
     
     // Advance time by 49 more seconds, putting us at 59s after the
     // submission of job 2. It should still not be able to preempt.
     advanceTime(49000);
-    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.MAP,
+    assertEquals(0, scheduler.tasksToPreempt(poolA.getMapSchedulable(),
         clock.getTime()));
-    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+    assertEquals(0, scheduler.tasksToPreempt(poolA.getReduceSchedulable(),
         clock.getTime()));
     
     // Advance time by 2 seconds, putting us at 61s after the submission
     // of job 2. It should now be able to preempt 2 maps and 1 reduce.
     advanceTime(2000);
-    assertEquals(2, scheduler.tasksToPreempt(job2, TaskType.MAP,
+    assertEquals(2, scheduler.tasksToPreempt(poolA.getMapSchedulable(),
         clock.getTime()));
-    assertEquals(1, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+    assertEquals(1, scheduler.tasksToPreempt(poolA.getReduceSchedulable(),
         clock.getTime()));
-
+    
     // Test that the tasks actually get preempted and we can assign new ones
     scheduler.preemptTasksIfNecessary();
     scheduler.update();
-    assertEquals(2, scheduler.runningTasks(job1, TaskType.MAP));
-    assertEquals(3, scheduler.runningTasks(job1, TaskType.REDUCE));
+    assertEquals(2, job1.runningMaps());
+    assertEquals(3, job1.runningReduces());
     checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
@@ -1514,7 +1735,7 @@
    * This test starts by launching a job in the default pool that takes
    * all the slots in the cluster. We then submit a job in a pool with
    * min share of 3 maps and 3 reduce tasks, but which only actually
-   * needs 1 map and 2 reduces. We check that this job does not prempt
+   * needs 1 map and 2 reduces. We check that this pool does not preempt
    * more than this many tasks despite its min share being higher. 
    */
   public void testMinSharePreemptionWithSmallJob() throws Exception {
@@ -1534,6 +1755,7 @@
     out.println("</allocations>");
     out.close();
     scheduler.getPoolManager().reloadAllocs();
+    Pool poolA = scheduler.getPoolManager().getPool("poolA");
 
     // Submit job 1 and assign all slots to it. Sleep a bit before assigning
     // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
@@ -1554,24 +1776,24 @@
     
     // Advance time by 59 seconds and check that no preemption occurs.
     advanceTime(59000);
-    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.MAP,
+    assertEquals(0, scheduler.tasksToPreempt(poolA.getMapSchedulable(),
         clock.getTime()));
-    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+    assertEquals(0, scheduler.tasksToPreempt(poolA.getReduceSchedulable(),
         clock.getTime()));
     
     // Advance time by 2 seconds, putting us at 61s after the submission
     // of job 2. Job 2 should now preempt 1 map and 2 reduces.
     advanceTime(2000);
-    assertEquals(1, scheduler.tasksToPreempt(job2, TaskType.MAP,
+    assertEquals(1, scheduler.tasksToPreempt(poolA.getMapSchedulable(),
         clock.getTime()));
-    assertEquals(2, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+    assertEquals(2, scheduler.tasksToPreempt(poolA.getReduceSchedulable(),
         clock.getTime()));
 
     // Test that the tasks actually get preempted and we can assign new ones
     scheduler.preemptTasksIfNecessary();
     scheduler.update();
-    assertEquals(3, scheduler.runningTasks(job1, TaskType.MAP));
-    assertEquals(2, scheduler.runningTasks(job1, TaskType.REDUCE));
+    assertEquals(3, job1.runningMaps());
+    assertEquals(2, job1.runningReduces());
     checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
@@ -1580,21 +1802,21 @@
   }
 
   /**
-   * This test runs on a 4-node (8-slot) cluster to allow 3 jobs with fair
+   * This test runs on a 4-node (8-slot) cluster to allow 3 pools with fair
    * shares greater than 2 slots to coexist (which makes the half-fair-share 
-   * of each job more than 1 so that fair share preemption can kick in). 
+   * of each pool more than 1 so that fair share preemption can kick in). 
    * 
-   * The test first launches job 1, which takes 6 map slots and 6 reduce slots. 
-   * We then submit job 2, which takes 2 slots of each type. Finally, we submit 
-   * a third job, job 3, which gets no slots. At this point the fair share
-   * of each job will be 8/3 ~= 2.7 slots. Job 1 will be above its fair share,
-   * job 2 will be below it but at half fair share, and job 3 will
-   * be below half fair share. Therefore job 3 should be allowed to
-   * preempt a task (after a timeout) but jobs 1 and 2 shouldn't. 
+   * The test first starts job 1, which takes 6 map slots and 6 reduce slots,
+   * in pool 1.  We then submit job 2 in pool 2, which takes 2 slots of each
+   * type. Finally, we submit a third job, job 3 in pool3, which gets no slots. 
+   * At this point the fair share of each pool will be 8/3 ~= 2.7 slots. 
+   * Pool 1 will be above its fair share, pool 2 will be below it but at half
+   * fair share, and pool 3 will be below half fair share. Therefore pool 3 
+   * should preempt a task (after a timeout) but pools 1 and 2 shouldn't. 
    */
   public void testFairSharePreemption() throws Exception {
     // Create a bigger cluster than normal (4 tasktrackers instead of 2)
-    setUpCluster(4);
+    setUpCluster(1, 4, false);
     // Enable preemption in scheduler
     scheduler.preemptionEnabled = true;
     // Set up pools file with a fair share preemption timeout of 1 minute
@@ -1605,14 +1827,18 @@
     out.println("</allocations>");
     out.close();
     scheduler.getPoolManager().reloadAllocs();
+    
+    // Grab pools (they'll be created even though they're not in the alloc file)
+    Pool pool1 = scheduler.getPoolManager().getPool("pool1");
+    Pool pool2 = scheduler.getPoolManager().getPool("pool2");
+    Pool pool3 = scheduler.getPoolManager().getPool("pool3");
 
-    // Submit jobs 1 and 2. We advance time by 100 between each task tracker
+    // Submit job 1. We advance time by 100 between each task tracker
     // assignment stage to ensure that the tasks from job1 on tt3 are the ones
     // that are deterministically preempted first (being the latest launched
     // tasks in an over-allocated job).
-    JobInProgress job1 = submitJob(JobStatus.RUNNING, 6, 6);
-    advanceTime(100); // Makes job 1 deterministically launch before job 2
-    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 6, 6, "pool1");
+    advanceTime(100);
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
@@ -1628,42 +1854,47 @@
     checkAssignment("tt3", "attempt_test_0001_r_000004_0 on tt3");
     checkAssignment("tt3", "attempt_test_0001_r_000005_0 on tt3");
     advanceTime(100);
+    
+    // Submit job 2. It should get the last 2 slots.
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "pool2");
+    advanceTime(100);
     checkAssignment("tt4", "attempt_test_0002_m_000000_0 on tt4");
     checkAssignment("tt4", "attempt_test_0002_m_000001_0 on tt4");
     checkAssignment("tt4", "attempt_test_0002_r_000000_0 on tt4");
     checkAssignment("tt4", "attempt_test_0002_r_000001_0 on tt4");
     
     // Submit job 3.
-    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "pool3");
     
-    // Check that after 59 seconds, neither job can preempt
+    // Check that after 59 seconds, neither pool can preempt
     advanceTime(59000);
-    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.MAP,
+    assertEquals(0, scheduler.tasksToPreempt(pool2.getMapSchedulable(),
         clock.getTime()));
-    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+    assertEquals(0, scheduler.tasksToPreempt(pool2.getReduceSchedulable(),
         clock.getTime()));
-    assertEquals(0, scheduler.tasksToPreempt(job3, TaskType.MAP,
+    assertEquals(0, scheduler.tasksToPreempt(pool3.getMapSchedulable(),
         clock.getTime()));
-    assertEquals(0, scheduler.tasksToPreempt(job3, TaskType.REDUCE,
+    assertEquals(0, scheduler.tasksToPreempt(pool3.getReduceSchedulable(),
         clock.getTime()));
     
     // Wait 2 more seconds, so that job 3 has now been in the system for 61s.
-    // Now job 3 should be able to preempt 1 task but job 2 shouldn't.
+    // Now pool 3 should be able to preempt 2 tasks (its share of 2.7 rounded
+    // down to its floor), but pool 2 shouldn't.
     advanceTime(2000);
-    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.MAP,
+    assertEquals(0, scheduler.tasksToPreempt(pool2.getMapSchedulable(),
         clock.getTime()));
-    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+    assertEquals(0, scheduler.tasksToPreempt(pool2.getReduceSchedulable(),
         clock.getTime()));
-    assertEquals(2, scheduler.tasksToPreempt(job3, TaskType.MAP,
+    assertEquals(2, scheduler.tasksToPreempt(pool3.getMapSchedulable(),
         clock.getTime()));
-    assertEquals(2, scheduler.tasksToPreempt(job3, TaskType.REDUCE,
+    assertEquals(2, scheduler.tasksToPreempt(pool3.getReduceSchedulable(),
         clock.getTime()));
     
     // Test that the tasks actually get preempted and we can assign new ones
     scheduler.preemptTasksIfNecessary();
     scheduler.update();
-    assertEquals(4, scheduler.runningTasks(job1, TaskType.MAP));
-    assertEquals(4, scheduler.runningTasks(job1, TaskType.REDUCE));
+    assertEquals(4, job1.runningMaps());
+    assertEquals(4, job1.runningReduces());
     checkAssignment("tt3", "attempt_test_0003_m_000000_0 on tt3");
     checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3");
     checkAssignment("tt3", "attempt_test_0003_r_000000_0 on tt3");
@@ -1675,9 +1906,9 @@
   }
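
Fair share preemption, exercised in testFairSharePreemption, only becomes available to a pool that has been below half of its fair share for longer than the fair share preemption timeout; it may then preempt up to its floored fair share minus its running tasks, which is where the expected floor(2.7) - 0 = 2 tasks of each type for pool 3 comes from. A sketch under those assumptions (hypothetical helper name; eligibility is checked separately):

     // Tasks a starved pool may preempt to reach its fair share, as a sketch.
     // Only called for pools below half fair share past the timeout.
     int fairShareTasksToPreempt(double fairShare, int demand, int running) {
       int target = (int) Math.min(Math.floor(fairShare), demand);
       return Math.max(target - running, 0);      // pool 3: min(2, 10) - 0 = 2
     }
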
   
   /**
-   * This test submits a job that takes all 4 slots, and then a second
-   * job that has both a min share of 2 slots with a 60s timeout and a
-   * fair share timeout of 60s. After 60 seconds, this job will be starved
+   * This test submits a job that takes all 4 slots, and then a second job in
+   * a pool that has both a min share of 2 slots with a 60s timeout and a
+   * fair share timeout of 60s. After 60 seconds, this pool will be starved
    * of both min share (2 slots of each type) and fair share (2 slots of each
    * type), and we test that it does not kill more than 2 tasks of each type
    * in total.
@@ -1700,6 +1931,7 @@
     out.println("</allocations>");
     out.close();
     scheduler.getPoolManager().reloadAllocs();
+    Pool poolA = scheduler.getPoolManager().getPool("poolA");
 
     // Submit job 1 and assign all slots to it. Sleep a bit before assigning
     // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
@@ -1720,32 +1952,32 @@
     
     // Ten seconds later, check that job 2 is not able to preempt tasks.
     advanceTime(10000);
-    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.MAP,
+    assertEquals(0, scheduler.tasksToPreempt(poolA.getMapSchedulable(),
         clock.getTime()));
-    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+    assertEquals(0, scheduler.tasksToPreempt(poolA.getReduceSchedulable(),
         clock.getTime()));
     
     // Advance time by 49 more seconds, putting us at 59s after the
     // submission of job 2. It should still not be able to preempt.
     advanceTime(49000);
-    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.MAP,
+    assertEquals(0, scheduler.tasksToPreempt(poolA.getMapSchedulable(),
         clock.getTime()));
-    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+    assertEquals(0, scheduler.tasksToPreempt(poolA.getReduceSchedulable(),
         clock.getTime()));
     
     // Advance time by 2 seconds, putting us at 61s after the submission
     // of job 2. It should now be able to preempt 2 maps and 1 reduce.
     advanceTime(2000);
-    assertEquals(2, scheduler.tasksToPreempt(job2, TaskType.MAP,
+    assertEquals(2, scheduler.tasksToPreempt(poolA.getMapSchedulable(),
         clock.getTime()));
-    assertEquals(2, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+    assertEquals(2, scheduler.tasksToPreempt(poolA.getReduceSchedulable(),
         clock.getTime()));
 
     // Test that the tasks actually get preempted and we can assign new ones
     scheduler.preemptTasksIfNecessary();
     scheduler.update();
-    assertEquals(2, scheduler.runningTasks(job1, TaskType.MAP));
-    assertEquals(2, scheduler.runningTasks(job1, TaskType.REDUCE));
+    assertEquals(2, job1.runningMaps());
+    assertEquals(2, job1.runningReduces());
     checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
@@ -1797,8 +2029,8 @@
     advanceTime(61000);
     scheduler.preemptTasksIfNecessary();
     scheduler.update();
-    assertEquals(4, scheduler.runningTasks(job1, TaskType.MAP));
-    assertEquals(4, scheduler.runningTasks(job1, TaskType.REDUCE));
+    assertEquals(4, job1.runningMaps());
+    assertEquals(4, job1.runningReduces());
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt2")));
   }
@@ -1851,11 +2083,391 @@
     advanceTime(61000);
     scheduler.preemptTasksIfNecessary();
     scheduler.update();
-    assertEquals(4, scheduler.runningTasks(job1, TaskType.MAP));
-    assertEquals(4, scheduler.runningTasks(job1, TaskType.REDUCE));
+    assertEquals(4, job1.runningMaps());
+    assertEquals(4, job1.runningReduces());
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt2")));
   }
+
+  /**
+   * This test exercises delay scheduling at the node level. We submit a job
+   * with data on rack1.node2 and check that it doesn't get assigned on earlier
+   * nodes. A second job with no locality info should get assigned instead.
+   * 
+   * TaskTracker names in this test map to nodes as follows:
+   * - tt1 = rack1.node1
+   * - tt2 = rack1.node2
+   * - tt3 = rack2.node1
+   * - tt4 = rack2.node2
+   */
+  public void testDelaySchedulingAtNodeLevel() throws IOException {
+    setUpCluster(2, 2, true);
+    scheduler.assignMultiple = true;
+    
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 0, "pool1",
+        new String[][] {
+          {"rack2.node2"}
+        });
+    JobInfo info1 = scheduler.infos.get(job1);
+    
+    // Advance time before submitting another job j2, to make j1 be ahead
+    // of j2 in the queue deterministically.
+    advanceTime(100);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 0);
+    
+    // Assign tasks on nodes 1-3 and check that j2 gets them
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1", 
+                           "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2",
+                           "attempt_test_0002_m_000003_0 on tt2");
+    checkAssignment("tt3", "attempt_test_0002_m_000004_0 on tt3",
+                           "attempt_test_0002_m_000005_0 on tt3");
+    
+    // Assign a task on node 4 now and check that j1 gets it. The other slot
+    // on the node should be given to j2 because j1 will be out of tasks.
+    checkAssignment("tt4", "attempt_test_0001_m_000000_0 on tt4",
+                           "attempt_test_0002_m_000006_0 on tt4");
+    
+    // Check that delay scheduling info is properly set
+    assertEquals(info1.lastMapLocalityLevel, LocalityLevel.NODE);
+    assertEquals(info1.timeWaitedForLocalMap, 0);
+    assertEquals(info1.skippedAtLastHeartbeat, false);
+  }
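
Delay scheduling, which this test exercises, is the policy of letting a job skip heartbeats from trackers that do not hold its input rather than immediately launching a non-local map. A condensed sketch of the node-level decision (hypothetical method and parameter names; the real logic also escalates through rack and off-rack levels with separate timeouts):

     // Sketch: may this job launch a map on the offering node right now?
     boolean canLaunchMapOn(JobInfo info, boolean nodeLocal, long localityDelay) {
       if (nodeLocal) {
         info.timeWaitedForLocalMap = 0;              // got locality; reset wait
         info.lastMapLocalityLevel = LocalityLevel.NODE;
         return true;
       }
       // Not node-local: only fall back once the job has waited out its delay;
       // otherwise skip this heartbeat (skippedAtLastHeartbeat = true).
       return info.timeWaitedForLocalMap >= localityDelay;
     }
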
+  
+  /**
+   * This test submits a job and causes it to exceed its node-level delay,
+   * and thus to go on to launch a rack-local task. We submit one job with data
+   * on rack2.node4 and check that it does not get assigned on any of the other
+   * nodes until 10 seconds (the delay configured in setUpCluster) pass.
+   * Finally, after some delay, we let the job assign local tasks and check
+   * that it has returned to waiting for node locality.

[... 351 lines stripped ...]

