hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1169585 [4/5] - in /hadoop/common/branches/branch-0.20-security: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/ src/co...
Date Sun, 11 Sep 2011 23:57:38 GMT
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Sun Sep 11 23:57:37 2011
@@ -25,17 +25,33 @@ import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.mapred.MRConstants;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapreduce.split.JobSplit;
-import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.spi.NoEmitMetricsContext;
+import org.apache.hadoop.metrics.spi.OutputRecord;
+import org.apache.hadoop.net.Node;
+import org.mortbay.log.Log;
 
 public class TestFairScheduler extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
@@ -44,64 +60,292 @@ public class TestFairScheduler extends T
       "test-pools").getAbsolutePath();
   
   private static final String POOL_PROPERTY = "pool";
+  private static final String EXPLICIT_POOL_PROPERTY = "mapred.fairscheduler.pool";
   
   private static int jobCounter;
-  private static int taskCounter;
   
-  static class FakeJobInProgress extends JobInProgress {
+  class FakeJobInProgress extends JobInProgress {
     
     private FakeTaskTrackerManager taskTrackerManager;
+    private int mapCounter = 0;
+    private int reduceCounter = 0;
+    private final String[][] mapInputLocations; // Array of hosts for each map
+    private boolean initialized;
     
     public FakeJobInProgress(JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager, 
-        JobTracker jt) throws IOException {
+        String[][] mapInputLocations, JobTracker jt) throws IOException {
       super(new JobID("test", ++jobCounter), jobConf, jt);
       this.taskTrackerManager = taskTrackerManager;
+      this.mapInputLocations = mapInputLocations;
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus();
       this.status.setRunState(JobStatus.PREP);
+      this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+      this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+      this.nonRunningReduces = new LinkedHashSet<TaskInProgress>();   
+      this.runningReduces = new LinkedHashSet<TaskInProgress>();
+      this.initialized = false;
     }
     
     @Override
     public synchronized void initTasks() throws IOException {
-      // do nothing
+      // initTasks is needed to create non-empty cleanup and setup TIP
+      // arrays, otherwise calls such as job.getTaskInProgress will fail
+      JobID jobId = getJobID();
+      JobConf conf = getJobConf();
+      String jobFile = "";
+      // create two cleanup tips, one map and one reduce.
+      cleanup = new TaskInProgress[2];
+      // cleanup map tip.
+      cleanup[0] = new TaskInProgress(jobId, jobFile, null, 
+              jobtracker, conf, this, numMapTasks, 1);
+      cleanup[0].setJobCleanupTask();
+      // cleanup reduce tip.
+      cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
+                         numReduceTasks, jobtracker, conf, this, 1);
+      cleanup[1].setJobCleanupTask();
+      // create two setup tips, one map and one reduce.
+      setup = new TaskInProgress[2];
+      // setup map tip.
+      setup[0] = new TaskInProgress(jobId, jobFile, null, 
+              jobtracker, conf, this, numMapTasks + 1, 1);
+      setup[0].setJobSetupTask();
+      // setup reduce tip.
+      setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
+                         numReduceTasks + 1, jobtracker, conf, this, 1);
+      setup[1].setJobSetupTask();
+      // create maps
+      numMapTasks = conf.getNumMapTasks();
+      maps = new TaskInProgress[numMapTasks];
+      // empty format
+      JobSplit.TaskSplitMetaInfo split = JobSplit.EMPTY_TASK_SPLIT;
+      for (int i = 0; i < numMapTasks; i++) {
+        String[] inputLocations = null;
+        if (mapInputLocations != null)
+          inputLocations = mapInputLocations[i];
+        maps[i] = new FakeTaskInProgress(getJobID(), i,
+            getJobConf(), this, inputLocations, split, jobtracker);
+        if (mapInputLocations == null) // Job has no locality info
+          nonLocalMaps.add(maps[i]);
+      }
+      // create reduces
+      numReduceTasks = conf.getNumReduceTasks();
+      reduces = new TaskInProgress[numReduceTasks];
+      for (int i = 0; i < numReduceTasks; i++) {
+        reduces[i] = new FakeTaskInProgress(getJobID(), i,
+            getJobConf(), this, jobtracker);
+      }
+      
+      initialized = true;
     }
-
+    
+    @Override
+    public boolean inited() {
+      return initialized;
+    }
+    
+    @Override
+    public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
+        int numUniqueHosts) throws IOException {
+      return obtainNewMapTask(tts, clusterSize, numUniqueHosts, Integer.MAX_VALUE);
+    }
+    
     @Override
+    public Task obtainNewNodeLocalMapTask(final TaskTrackerStatus tts, int clusterSize,
+        int numUniqueHosts) throws IOException {
+      return obtainNewMapTask(tts, clusterSize, numUniqueHosts, 1);
+    }
+    
+    @Override
+    public Task obtainNewNodeOrRackLocalMapTask(final TaskTrackerStatus tts, int clusterSize,
+        int numUniqueHosts) throws IOException {
+      return obtainNewMapTask(tts, clusterSize, numUniqueHosts, 2);
+    }
+
     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
-        int ignored) throws IOException {
-      TaskAttemptID attemptId = getTaskAttemptID(true);
-      Task task = new MapTask("", attemptId, 0, new JobSplit.TaskSplitIndex(),
-          1) {
-        @Override
-        public String toString() {
-          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+        int numUniqueHosts, int localityLevel) throws IOException {
+      for (int map = 0; map < maps.length; map++) {
+        FakeTaskInProgress tip = (FakeTaskInProgress) maps[map];
+        if (!tip.isRunning() && !tip.isComplete() &&
+            getLocalityLevel(tip, tts) < localityLevel) {
+          TaskAttemptID attemptId = getTaskAttemptID(tip);
+          JobSplit.TaskSplitMetaInfo split = JobSplit.EMPTY_TASK_SPLIT;
+          Task task = new MapTask("", attemptId, 0, split.getSplitIndex(), 1) {
+            @Override
+            public String toString() {
+              return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+            }
+          };
+          runningMapTasks++;
+          tip.createTaskAttempt(task, tts.getTrackerName());
+          nonLocalRunningMaps.add(tip);
+          taskTrackerManager.startTask(tts.getTrackerName(), task, tip);
+          return task;
         }
-      };
-      taskTrackerManager.startTask(tts.getTrackerName(), task);
-      runningMapTasks++;
-      return task;
+      }
+      return null;
     }
     
     @Override
     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
         int clusterSize, int ignored) throws IOException {
-      TaskAttemptID attemptId = getTaskAttemptID(false);
-      Task task = new ReduceTask("", attemptId, 0, 10, 1) {
-        @Override
-        public String toString() {
-          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+      for (int reduce = 0; reduce < reduces.length; reduce++) {
+        FakeTaskInProgress tip = 
+          (FakeTaskInProgress) reduces[reduce];
+        if (!tip.isRunning() && !tip.isComplete()) {
+          TaskAttemptID attemptId = getTaskAttemptID(tip);
+          Task task = new ReduceTask("", attemptId, 0, maps.length, 1) {
+            @Override
+            public String toString() {
+              return String.format("%s on %s", getTaskID(), tts.getTrackerName());
+            }
+          };
+          runningReduceTasks++;
+          tip.createTaskAttempt(task, tts.getTrackerName());
+          runningReduces.add(tip);
+          taskTrackerManager.startTask(tts.getTrackerName(), task, tip);
+          return task;
         }
-      };
-      taskTrackerManager.startTask(tts.getTrackerName(), task);
-      runningReduceTasks++;
-      return task;
+      }
+      return null;
+    }
+    
+    public void mapTaskFinished(TaskInProgress tip) {
+      runningMapTasks--;
+      finishedMapTasks++;
+      nonLocalRunningMaps.remove(tip);
+    }
+    
+    public void reduceTaskFinished(TaskInProgress tip) {
+      runningReduceTasks--;
+      finishedReduceTasks++;
+      runningReduces.remove(tip);
     }
     
-    private TaskAttemptID getTaskAttemptID(boolean isMap) {
+    private TaskAttemptID getTaskAttemptID(TaskInProgress tip) {
       JobID jobId = getJobID();
       return new TaskAttemptID(jobId.getJtIdentifier(),
-          jobId.getId(), isMap, ++taskCounter, 0);
+          jobId.getId(), tip.isMapTask(), tip.getIdWithinJob(), tip.nextTaskId++);
+    }
+    
+    @Override
+    int getLocalityLevel(TaskInProgress tip, TaskTrackerStatus tts) {
+      FakeTaskInProgress ftip = (FakeTaskInProgress) tip;
+      if (ftip.inputLocations != null) {
+        // Check whether we're on the same host as an input split
+        for (String location: ftip.inputLocations) {
+          if (location.equals(tts.host)) {
+            return 0;
+          }
+        }
+        // Check whether we're on the same rack as an input split
+        for (String location: ftip.inputLocations) {
+          if (getRack(location).equals(getRack(tts.host))) {
+            return 1;
+          }
+        }
+        // Not on same rack or host
+        return 2;
+      } else {
+        // Job has no locality info  
+        return -1;
+      }
+    }
+  }
+  
+  class FakeTaskInProgress extends TaskInProgress {
+    private boolean isMap;
+    private FakeJobInProgress fakeJob;
+    private TreeMap<TaskAttemptID, String> activeTasks;
+    private TaskStatus taskStatus;
+    private boolean isComplete = false;
+    private String[] inputLocations;
+    
+    // Constructor for map
+    FakeTaskInProgress(JobID jId, int id, JobConf jobConf,
+        FakeJobInProgress job, String[] inputLocations, 
+        JobSplit.TaskSplitMetaInfo split, JobTracker jt) {
+      super(jId, "", split, jt, jobConf, job, id, 1);
+      this.isMap = true;
+      this.fakeJob = job;
+      this.inputLocations = inputLocations;
+      activeTasks = new TreeMap<TaskAttemptID, String>();
+      taskStatus = TaskStatus.createTaskStatus(isMap);
+      taskStatus.setRunState(TaskStatus.State.UNASSIGNED);
+    }
+
+    // Constructor for reduce
+    FakeTaskInProgress(JobID jId, int id, JobConf jobConf,
+        FakeJobInProgress job, JobTracker jt) {
+      super(jId, "", jobConf.getNumMapTasks(), id, jt, jobConf, job, 1);
+      this.isMap = false;
+      this.fakeJob = job;
+      activeTasks = new TreeMap<TaskAttemptID, String>();
+      taskStatus = TaskStatus.createTaskStatus(isMap);
+      taskStatus.setRunState(TaskStatus.State.UNASSIGNED);
+    }
+    
+    private void createTaskAttempt(Task task, String taskTracker) {
+      activeTasks.put(task.getTaskID(), taskTracker);
+      taskStatus = TaskStatus.createTaskStatus(isMap, task.getTaskID(),
+          0.5f, 1, TaskStatus.State.RUNNING, "", "", "", 
+          TaskStatus.Phase.STARTING, new Counters());
+      taskStatus.setStartTime(clock.getTime());
+    }
+    
+    @Override
+    TreeMap<TaskAttemptID, String> getActiveTasks() {
+      return activeTasks;
+    }
+    
+    public synchronized boolean isComplete() {
+      return isComplete;
+    }
+    
+    public boolean isRunning() {
+      return activeTasks.size() > 0;
+    }
+    
+    @Override
+    public TaskStatus getTaskStatus(TaskAttemptID taskid) {
+      return taskStatus;
+    }
+    
+    void killAttempt() {
+      if (isMap) {
+        fakeJob.mapTaskFinished(this);
+      }
+      else {
+        fakeJob.reduceTaskFinished(this);
+      }
+      activeTasks.clear();
+      taskStatus.setRunState(TaskStatus.State.UNASSIGNED);
+    }
+    
+    void finishAttempt() {
+      isComplete = true;
+      if (isMap) {
+        fakeJob.mapTaskFinished(this);
+      }
+      else {
+        fakeJob.reduceTaskFinished(this);
+      }
+      activeTasks.clear();
+      taskStatus.setRunState(TaskStatus.State.UNASSIGNED);
+    }
+  }
+  
+  static class FakeQueueManager extends QueueManager {
+    private Set<String> queues = null;
+    FakeQueueManager() {
+      super(new Configuration());
+    }
+    void setQueues(Set<String> queues) {
+      this.queues = queues;
+    }
+    public synchronized Set<String> getLeafQueueNames() {
+      return queues;
     }
   }
   
@@ -110,35 +354,42 @@ public class TestFairScheduler extends T
     int reduces = 0;
     int maxMapTasksPerTracker = 2;
     int maxReduceTasksPerTracker = 2;
+    long ttExpiryInterval = 10 * 60 * 1000L; // default interval
     List<JobInProgressListener> listeners =
       new ArrayList<JobInProgressListener>();
+    Map<JobID, JobInProgress> jobs = new HashMap<JobID, JobInProgress>();
     
     private Map<String, TaskTracker> trackers =
       new HashMap<String, TaskTracker>();
-    private Map<String, TaskStatus> taskStatuses = 
+    private Map<String, TaskStatus> statuses = 
       new HashMap<String, TaskStatus>();
-
-    public FakeTaskTrackerManager() {
-      TaskTracker tt1 = new TaskTracker("tt1");
-      tt1.setStatus(new TaskTrackerStatus("tt1", "tt1.host", 1,
-                                          new ArrayList<TaskStatus>(), 0,
-                                          maxMapTasksPerTracker, 
-                                          maxReduceTasksPerTracker));
-      trackers.put("tt1", tt1);
-      
-      TaskTracker tt2 = new TaskTracker("tt2");
-      tt2.setStatus(new TaskTrackerStatus("tt2", "tt2.host", 2,
-                                          new ArrayList<TaskStatus>(), 0,
-                                          maxMapTasksPerTracker, 
-                                          maxReduceTasksPerTracker));
-      trackers.put("tt2", tt2);
-
+    private Map<String, FakeTaskInProgress> tips = 
+      new HashMap<String, FakeTaskInProgress>();
+    private Map<String, TaskTrackerStatus> trackerForTip =
+      new HashMap<String, TaskTrackerStatus>();
+    
+    public FakeTaskTrackerManager(int numRacks, int numTrackersPerRack) {
+      int nextTrackerId = 1;
+      for (int rack = 1; rack <= numRacks; rack++) {
+        for (int node = 1; node <= numTrackersPerRack; node++) {
+          int id = nextTrackerId++;
+          String host = "rack" + rack + ".node" + node;
+          System.out.println("Creating TaskTracker tt" + id + " on " + host);
+          TaskTracker tt = new TaskTracker("tt" + id);
+          tt.setStatus(new TaskTrackerStatus("tt" + id, host, 0,
+              new ArrayList<TaskStatus>(), 0,
+              maxMapTasksPerTracker, maxReduceTasksPerTracker));
+          trackers.put("tt" + id, tt);
+        }
+      }
     }
     
     @Override
     public ClusterStatus getClusterStatus() {
       int numTrackers = trackers.size();
-      return new ClusterStatus(numTrackers, maps, reduces,
+
+      return new ClusterStatus(numTrackers, 0, 0,
+          ttExpiryInterval, maps, reduces,
           numTrackers * maxMapTasksPerTracker,
           numTrackers * maxReduceTasksPerTracker,
           JobTracker.State.RUNNING);
@@ -151,7 +402,7 @@ public class TestFairScheduler extends T
     
     @Override
     public int getNumberOfUniqueHosts() {
-      return 0;
+      return trackers.size();
     }
 
     @Override
@@ -186,11 +437,15 @@ public class TestFairScheduler extends T
 
     @Override
     public JobInProgress getJob(JobID jobid) {
-      return null;
+      return jobs.get(jobid);
     }
 
     public void initJob (JobInProgress job) {
-      // do nothing
+      try {
+        job.initTasks();
+      } catch (KillInterruptedException e) {
+      } catch (IOException e) {
+      }
     }
     
     public void failJob (JobInProgress job) {
@@ -200,6 +455,7 @@ public class TestFairScheduler extends T
     // Test methods
     
     public void submitJob(JobInProgress job) throws IOException {
+      jobs.put(job.getJobID(), job);
       for (JobInProgressListener listener : listeners) {
         listener.jobAdded(job);
       }
@@ -209,31 +465,48 @@ public class TestFairScheduler extends T
       return trackers.get(trackerID);
     }
     
-    public void startTask(String taskTrackerName, final Task t) {
-      if (t.isMapTask()) {
+    public void startTask(String trackerName, Task t, FakeTaskInProgress tip) {
+      final boolean isMap = t.isMapTask();
+      if (isMap) {
         maps++;
       } else {
         reduces++;
       }
-      TaskStatus status = new TaskStatus() {
-        @Override
-        public boolean getIsMap() {
-          return t.isMapTask();
-        }
-      };
-      taskStatuses.put(t.getTaskID().toString(), status);
+      String attemptId = t.getTaskID().toString();
+      TaskStatus status = tip.getTaskStatus(t.getTaskID());
+      TaskTrackerStatus trackerStatus = trackers.get(trackerName).getStatus();
+      tips.put(attemptId, tip);
+      statuses.put(attemptId, status);
+      trackerForTip.put(attemptId, trackerStatus);
       status.setRunState(TaskStatus.State.RUNNING);
-      trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
+      trackerStatus.getTaskReports().add(status);
     }
     
-    public void finishTask(String taskTrackerName, String tipId) {
-      TaskStatus status = taskStatuses.get(tipId);
-      if (status.getIsMap()) {
+    public void finishTask(String taskTrackerName, String attemptId) {
+      FakeTaskInProgress tip = tips.get(attemptId);
+      if (tip.isMapTask()) {
         maps--;
       } else {
         reduces--;
       }
-      status.setRunState(TaskStatus.State.SUCCEEDED);
+      tip.finishAttempt();
+      TaskStatus status = statuses.get(attemptId);
+      trackers.get(taskTrackerName).getStatus().getTaskReports().remove(status);
+    }
+
+    @Override
+    public boolean killTask(TaskAttemptID attemptId, boolean shouldFail) {
+      String attemptIdStr = attemptId.toString();
+      FakeTaskInProgress tip = tips.get(attemptIdStr);
+      if (tip.isMapTask()) {
+        maps--;
+      } else {
+        reduces--;
+      }
+      tip.killAttempt();
+      TaskStatus status = statuses.get(attemptIdStr);
+      trackerForTip.get(attemptIdStr).getTaskReports().remove(status);
+      return true;
     }
   }
   
@@ -241,29 +514,70 @@ public class TestFairScheduler extends T
   protected FairScheduler scheduler;
   private FakeTaskTrackerManager taskTrackerManager;
   private FakeClock clock;
+  private JobTracker jobTracker;
 
   @Override
   protected void setUp() throws Exception {
     jobCounter = 0;
-    taskCounter = 0;
     new File(TEST_DIR).mkdirs(); // Make sure data directory exists
     // Create an empty pools file (so we can add/remove pools later)
     FileWriter fileWriter = new FileWriter(ALLOC_FILE);
     fileWriter.write("<?xml version=\"1.0\"?>\n");
     fileWriter.write("<allocations />\n");
     fileWriter.close();
+    setUpCluster(1, 2, false);
+  }
+
+  public String getRack(String hostname) {
+    // Host names are of the form rackN.nodeM, so split at the dot.
+    return hostname.split("\\.")[0];
+  }
+
+  private void setUpCluster(int numRacks, int numNodesPerRack,
+      boolean assignMultiple) throws IOException {
+    
+    resetMetrics();
+    
     conf = new JobConf();
     conf.set("mapred.fairscheduler.allocation.file", ALLOC_FILE);
     conf.set("mapred.fairscheduler.poolnameproperty", POOL_PROPERTY);
-    taskTrackerManager = new FakeTaskTrackerManager();
+    conf.setBoolean("mapred.fairscheduler.assignmultiple", assignMultiple);
+    // Manually set locality delay because we aren't using a JobTracker so
+    // we can't auto-compute it from the heartbeat interval.
+    conf.setLong("mapred.fairscheduler.locality.delay.node", 5000);
+    conf.setLong("mapred.fairscheduler.locality.delay.rack", 10000);
+    conf.set("mapred.job.tracker", "localhost:0");
+    conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+    taskTrackerManager = new FakeTaskTrackerManager(numRacks, numNodesPerRack);
     clock = new FakeClock();
-    scheduler = new FairScheduler(clock, false);
+    try {
+      jobTracker = new JobTracker(conf, clock);
+    } catch (Exception e) {
+      throw new RuntimeException("Could not start JT", e);
+    }
+    scheduler = new FairScheduler(clock, true);
     scheduler.waitForMapsBeforeLaunchingReduces = false;
     scheduler.setConf(conf);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     scheduler.start();
+    // TaskStatus complains if a task's start time is 0, so advance it a bit
+    advanceTime(100);
   }
   
+  /**
+   * Set up a metrics context that doesn't emit anywhere but stores the data
+   * so we can verify it. Also clears it of any data so that different test
+   * cases don't pollute each other.
+   */
+  private void resetMetrics() throws IOException {
+    ContextFactory factory = ContextFactory.getFactory();
+    factory.setAttribute("fairscheduler.class",
+        NoEmitMetricsContext.class.getName());
+    
+    MetricsUtil.getContext("fairscheduler").createRecord("jobs").remove();
+    MetricsUtil.getContext("fairscheduler").createRecord("pools").remove();
+  }
+
   @Override
   protected void tearDown() throws Exception {
     if (scheduler != null) {
@@ -271,20 +585,33 @@ public class TestFairScheduler extends T
     }
   }
   
+  private JobInProgress submitJobNotInitialized(int state, int maps, int reduces)
+	    throws IOException {
+    return submitJob(state, maps, reduces, null, null, false);
+  }
+
   private JobInProgress submitJob(int state, int maps, int reduces)
       throws IOException {
-    return submitJob(state, maps, reduces, null);
+    return submitJob(state, maps, reduces, null, null, true);
   }
   
   private JobInProgress submitJob(int state, int maps, int reduces, String pool)
       throws IOException {
+    return submitJob(state, maps, reduces, pool, null, true);
+  }
+  
+  private JobInProgress submitJob(int state, int maps, int reduces, String pool,
+      String[][] mapInputLocations, boolean initializeJob) throws IOException {
     JobConf jobConf = new JobConf(conf);
     jobConf.setNumMapTasks(maps);
     jobConf.setNumReduceTasks(reduces);
     if (pool != null)
       jobConf.set(POOL_PROPERTY, pool);
     JobInProgress job = new FakeJobInProgress(jobConf, taskTrackerManager,
-        UtilsForTests.getJobTracker());
+        mapInputLocations, jobTracker);
+    if (initializeJob) {
+      taskTrackerManager.initJob(job);
+    }
     job.getStatus().setRunState(state);
     taskTrackerManager.submitJob(job);
     job.startTime = clock.time;
@@ -320,19 +647,30 @@ public class TestFairScheduler extends T
     out.println("<pool name=\"poolD\">");
     out.println("<maxRunningJobs>3</maxRunningJobs>");
     out.println("</pool>");
+    // Give pool E a preemption timeout of one minute
+    out.println("<pool name=\"poolE\">");
+    out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
+    out.println("</pool>");
+    // Set default limit of jobs per pool to 15
+    out.println("<poolMaxJobsDefault>15</poolMaxJobsDefault>");
     // Set default limit of jobs per user to 5
     out.println("<userMaxJobsDefault>5</userMaxJobsDefault>");
     // Give user1 a limit of 10 jobs
     out.println("<user name=\"user1\">");
     out.println("<maxRunningJobs>10</maxRunningJobs>");
     out.println("</user>");
+    // Set default min share preemption timeout to 2 minutes
+    out.println("<defaultMinSharePreemptionTimeout>120" 
+        + "</defaultMinSharePreemptionTimeout>"); 
+    // Set fair share preemption timeout to 5 minutes
+    out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>"); 
     out.println("</allocations>"); 
     out.close();
     
     PoolManager poolManager = scheduler.getPoolManager();
     poolManager.reloadAllocs();
     
-    assertEquals(5, poolManager.getPools().size()); // 4 in file + default pool
+    assertEquals(6, poolManager.getPools().size()); // 5 in file + default pool
     assertEquals(0, poolManager.getAllocation(Pool.DEFAULT_POOL_NAME,
         TaskType.MAP));
     assertEquals(0, poolManager.getAllocation(Pool.DEFAULT_POOL_NAME,
@@ -345,10 +683,25 @@ public class TestFairScheduler extends T
     assertEquals(0, poolManager.getAllocation("poolC", TaskType.REDUCE));
     assertEquals(0, poolManager.getAllocation("poolD", TaskType.MAP));
     assertEquals(0, poolManager.getAllocation("poolD", TaskType.REDUCE));
-    assertEquals(Integer.MAX_VALUE, poolManager.getPoolMaxJobs("poolA"));
+    assertEquals(0, poolManager.getAllocation("poolE", TaskType.MAP));
+    assertEquals(0, poolManager.getAllocation("poolE", TaskType.REDUCE));
+    assertEquals(15, poolManager.getPoolMaxJobs(Pool.DEFAULT_POOL_NAME));
+    assertEquals(15, poolManager.getPoolMaxJobs("poolA"));
+    assertEquals(15, poolManager.getPoolMaxJobs("poolB"));
+    assertEquals(15, poolManager.getPoolMaxJobs("poolC"));
     assertEquals(3, poolManager.getPoolMaxJobs("poolD"));
+    assertEquals(15, poolManager.getPoolMaxJobs("poolE"));
     assertEquals(10, poolManager.getUserMaxJobs("user1"));
     assertEquals(5, poolManager.getUserMaxJobs("user2"));
+    assertEquals(120000, poolManager.getMinSharePreemptionTimeout(
+        Pool.DEFAULT_POOL_NAME));
+    assertEquals(120000, poolManager.getMinSharePreemptionTimeout("poolA"));
+    assertEquals(120000, poolManager.getMinSharePreemptionTimeout("poolB"));
+    assertEquals(120000, poolManager.getMinSharePreemptionTimeout("poolC"));
+    assertEquals(120000, poolManager.getMinSharePreemptionTimeout("poolD"));
+    assertEquals(120000, poolManager.getMinSharePreemptionTimeout("poolA"));
+    assertEquals(60000, poolManager.getMinSharePreemptionTimeout("poolE"));
+    assertEquals(300000, poolManager.getFairSharePreemptionTimeout());
   }
   
   public void testTaskNotAssignedWhenNoJobsArePresent() throws IOException {
@@ -356,7 +709,6 @@ public class TestFairScheduler extends T
   }
 
   public void testNonRunningJobsAreIgnored() throws IOException {
-    submitJobs(1, JobStatus.PREP, 10, 10);
     submitJobs(1, JobStatus.SUCCEEDED, 10, 10);
     submitJobs(1, JobStatus.FAILED, 10, 10);
     submitJobs(1, JobStatus.KILLED, 10, 10);
@@ -375,85 +727,137 @@ public class TestFairScheduler extends T
     JobInfo info1 = scheduler.infos.get(job1);
     
     // Check scheduler variables
-    assertEquals(0,    info1.runningMaps);
-    assertEquals(0,    info1.runningReduces);
-    assertEquals(2,    info1.neededMaps);
-    assertEquals(1,    info1.neededReduces);
-    assertEquals(0,    info1.mapDeficit);
-    assertEquals(0,    info1.reduceDeficit);
-    assertEquals(4.0,  info1.mapFairShare);
-    assertEquals(4.0,  info1.reduceFairShare);
-    
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(2,    info1.mapSchedulable.getDemand());
+    assertEquals(1,    info1.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
+    verifyMetrics();
+
     // Advance time before submitting another job j2, to make j1 run before j2
     // deterministically.
     advanceTime(100);
     JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 2);
     JobInfo info2 = scheduler.infos.get(job2);
     
-    // Check scheduler variables; the fair shares should now have been allocated
-    // equally between j1 and j2, but j1 should have (4 slots)*(100 ms) deficit
-    assertEquals(0,    info1.runningMaps);
-    assertEquals(0,    info1.runningReduces);
-    assertEquals(2,    info1.neededMaps);
-    assertEquals(1,    info1.neededReduces);
-    assertEquals(400,  info1.mapDeficit);
-    assertEquals(400,  info1.reduceDeficit);
-    assertEquals(2.0,  info1.mapFairShare);
-    assertEquals(2.0,  info1.reduceFairShare);
-    assertEquals(0,    info2.runningMaps);
-    assertEquals(0,    info2.runningReduces);
-    assertEquals(1,    info2.neededMaps);
-    assertEquals(2,    info2.neededReduces);
-    assertEquals(0,    info2.mapDeficit);
-    assertEquals(0,    info2.reduceDeficit);
-    assertEquals(2.0,  info2.mapFairShare);
-    assertEquals(2.0,  info2.reduceFairShare);
+    // Check scheduler variables
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(2,    info1.mapSchedulable.getDemand());
+    assertEquals(1,    info1.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(0,    info2.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info2.reduceSchedulable.getRunningTasks());
+    assertEquals(1,    info2.mapSchedulable.getDemand());
+    assertEquals(2,    info2.reduceSchedulable.getDemand());
+    assertEquals(1.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
+    verifyMetrics();
+    
+    // Assign tasks and check that jobs alternate in filling slots
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+    assertNull(scheduler.assignTasks(tracker("tt2")));
     
-    // Assign tasks and check that all slots are filled with j1, then j2
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000006_0 on tt2");
+    // Check that the scheduler has started counting the tasks as running
+    // as soon as it launched them.
+    assertEquals(2,  info1.mapSchedulable.getRunningTasks());
+    assertEquals(1,  info1.reduceSchedulable.getRunningTasks());
+    assertEquals(2,  info1.mapSchedulable.getDemand());
+    assertEquals(1,  info1.reduceSchedulable.getDemand());
+    assertEquals(1,  info2.mapSchedulable.getRunningTasks());
+    assertEquals(2,  info2.reduceSchedulable.getRunningTasks());
+    assertEquals(1, info2.mapSchedulable.getDemand());
+    assertEquals(2, info2.reduceSchedulable.getDemand());
+    verifyMetrics();
+  }
+  /**
+   * This test is identical to testSmallJobs but sets assignMultiple to
+   * true so that multiple tasks can be assigned per heartbeat.
+   */
+  public void testSmallJobsWithAssignMultiple() throws IOException {
+    setUpCluster(1, 2, true);
+    
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 1);
+    JobInfo info1 = scheduler.infos.get(job1);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(2,    info1.mapSchedulable.getDemand());
+    assertEquals(1,    info1.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
+    verifyMetrics();
+    
+    // Advance time before submitting another job j2, to make j1 run before j2
+    // deterministically.
+    advanceTime(100);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 2);
+    JobInfo info2 = scheduler.infos.get(job2);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(2,    info1.mapSchedulable.getDemand());
+    assertEquals(1,    info1.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(0,    info2.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info2.reduceSchedulable.getRunningTasks());
+    assertEquals(1,    info2.mapSchedulable.getDemand());
+    assertEquals(2,    info2.reduceSchedulable.getDemand());
+    assertEquals(1.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
+    verifyMetrics();
+    
+    // Assign tasks and check that jobs alternate in filling slots
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1",
+                           "attempt_test_0001_r_000000_0 on tt1",
+                           "attempt_test_0002_m_000000_0 on tt1",
+                           "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2",
+                           "attempt_test_0002_r_000001_0 on tt2");
     assertNull(scheduler.assignTasks(tracker("tt2")));
     
     // Check that the scheduler has started counting the tasks as running
     // as soon as it launched them.
-    assertEquals(2,  info1.runningMaps);
-    assertEquals(1,  info1.runningReduces);
-    assertEquals(0,  info1.neededMaps);
-    assertEquals(0,  info1.neededReduces);
-    assertEquals(1,  info2.runningMaps);
-    assertEquals(2,  info2.runningReduces);
-    assertEquals(0, info2.neededMaps);
-    assertEquals(0, info2.neededReduces);
+    assertEquals(2,  info1.mapSchedulable.getRunningTasks());
+    assertEquals(1,  info1.reduceSchedulable.getRunningTasks());
+    assertEquals(2,  info1.mapSchedulable.getDemand());
+    assertEquals(1,  info1.reduceSchedulable.getDemand());
+    assertEquals(1,  info2.mapSchedulable.getRunningTasks());
+    assertEquals(2,  info2.reduceSchedulable.getRunningTasks());
+    assertEquals(1, info2.mapSchedulable.getDemand());
+    assertEquals(2, info2.reduceSchedulable.getDemand());
+    verifyMetrics();
   }
   
   /**
    * This test begins by submitting two jobs with 10 maps and reduces each.
-   * The first job is submitted 100ms after the second, during which time no
-   * tasks run. After this, we assign tasks to all slots, which should all be
-   * from job 1. These run for 200ms, at which point job 2 now has a deficit
-   * of 400 while job 1 is down to a deficit of 0. We then finish all tasks and
-   * assign new ones, which should all be from job 2. These run for 50 ms,
-   * which is not enough time for job 2 to make up its deficit (it only makes up
-   * 100 ms of deficit). Finally we assign a new round of tasks, which should
-   * all be from job 2 again.
+   * The first job is submitted 100ms after the second, to make it get slots
+   * first deterministically. We then assign a wave of tasks and check that
+   * they are given alternately to job1, job2, job1, job2, etc. We finish
+   * these tasks and assign a second wave, which should continue to be
+   * allocated in this manner.
    */
   public void testLargeJobs() throws IOException {
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     JobInfo info1 = scheduler.infos.get(job1);
     
     // Check scheduler variables
-    assertEquals(0,    info1.runningMaps);
-    assertEquals(0,    info1.runningReduces);
-    assertEquals(10,   info1.neededMaps);
-    assertEquals(10,   info1.neededReduces);
-    assertEquals(0,    info1.mapDeficit);
-    assertEquals(0,    info1.reduceDeficit);
-    assertEquals(4.0,  info1.mapFairShare);
-    assertEquals(4.0,  info1.reduceFairShare);
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info1.mapSchedulable.getDemand());
+    assertEquals(10,   info1.reduceSchedulable.getDemand());
+    assertEquals(4.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(4.0,  info1.reduceSchedulable.getFairShare());
     
     // Advance time before submitting another job j2, to make j1 run before j2
     // deterministically.
@@ -461,116 +865,207 @@ public class TestFairScheduler extends T
     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
     JobInfo info2 = scheduler.infos.get(job2);
     
-    // Check scheduler variables; the fair shares should now have been allocated
-    // equally between j1 and j2, but j1 should have (4 slots)*(100 ms) deficit
-    assertEquals(0,    info1.runningMaps);
-    assertEquals(0,    info1.runningReduces);
-    assertEquals(10,   info1.neededMaps);
-    assertEquals(10,   info1.neededReduces);
-    assertEquals(400,  info1.mapDeficit);
-    assertEquals(400,  info1.reduceDeficit);
-    assertEquals(2.0,  info1.mapFairShare);
-    assertEquals(2.0,  info1.reduceFairShare);
-    assertEquals(0,    info2.runningMaps);
-    assertEquals(0,    info2.runningReduces);
-    assertEquals(10,   info2.neededMaps);
-    assertEquals(10,   info2.neededReduces);
-    assertEquals(0,    info2.mapDeficit);
-    assertEquals(0,    info2.reduceDeficit);
-    assertEquals(2.0,  info2.mapFairShare);
-    assertEquals(2.0,  info2.reduceFairShare);
+    // Check scheduler variables
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info1.mapSchedulable.getDemand());
+    assertEquals(10,   info1.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(0,    info2.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info2.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info2.mapSchedulable.getDemand());
+    assertEquals(10,   info2.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
+    
+    // Check that tasks are filled alternately by the jobs
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
     
-    // Assign tasks and check that all slots are initially filled with job 1
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+    // Check that no new tasks can be launched once the tasktrackers are full
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
     
     // Check that the scheduler has started counting the tasks as running
     // as soon as it launched them.
-    assertEquals(4,  info1.runningMaps);
-    assertEquals(4,  info1.runningReduces);
-    assertEquals(6,  info1.neededMaps);
-    assertEquals(6,  info1.neededReduces);
-    assertEquals(0,  info2.runningMaps);
-    assertEquals(0,  info2.runningReduces);
-    assertEquals(10, info2.neededMaps);
-    assertEquals(10, info2.neededReduces);
+    assertEquals(2,  info1.mapSchedulable.getRunningTasks());
+    assertEquals(2,  info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,  info1.mapSchedulable.getDemand());
+    assertEquals(10,  info1.reduceSchedulable.getDemand());
+    assertEquals(2,  info2.mapSchedulable.getRunningTasks());
+    assertEquals(2,  info2.reduceSchedulable.getRunningTasks());
+    assertEquals(10, info2.mapSchedulable.getDemand());
+    assertEquals(10, info2.reduceSchedulable.getDemand());
     
     // Finish up the tasks and advance time again. Note that we must finish
     // the task since FakeJobInProgress does not properly maintain running
     // tasks, so the scheduler will always get an empty task list from
     // the JobInProgress's getTasks(TaskType.MAP)/getTasks(TaskType.REDUCE) and 
     // think they finished.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000003_0");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000004_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000005_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000006_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000007_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000008_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000000_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000000_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000000_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000000_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000001_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0");
     advanceTime(200);
-    assertEquals(0,   info1.runningMaps);
-    assertEquals(0,   info1.runningReduces);
-    assertEquals(0,   info1.mapDeficit);
-    assertEquals(0,   info1.reduceDeficit);
-    assertEquals(0,   info2.runningMaps);
-    assertEquals(0,   info2.runningReduces);
-    assertEquals(400, info2.mapDeficit);
-    assertEquals(400, info2.reduceDeficit);
-
-    // Assign tasks and check that all slots are now filled with job 2
-    checkAssignment("tt1", "attempt_test_0002_m_000009_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000010_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000011_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000012_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0002_m_000013_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000014_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000015_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000016_0 on tt2");
-
-    // Finish up the tasks and advance time again, but give job 2 only 50ms.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000009_0");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000010_0");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000011_0");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000012_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000013_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000014_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000015_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000016_0");
-    advanceTime(50);
-    assertEquals(0,   info1.runningMaps);
-    assertEquals(0,   info1.runningReduces);
-    assertEquals(100, info1.mapDeficit);
-    assertEquals(100, info1.reduceDeficit);
-    assertEquals(0,   info2.runningMaps);
-    assertEquals(0,   info2.runningReduces);
-    assertEquals(300, info2.mapDeficit);
-    assertEquals(300, info2.reduceDeficit);
-
-    // Assign tasks and check that all slots are now still with job 2
-    checkAssignment("tt1", "attempt_test_0002_m_000017_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000018_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000019_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000020_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0002_m_000021_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000022_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000023_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000024_0 on tt2");
+    assertEquals(0,   info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,   info1.reduceSchedulable.getRunningTasks());
+    assertEquals(0,   info2.mapSchedulable.getRunningTasks());
+    assertEquals(0,   info2.reduceSchedulable.getRunningTasks());
+
+    // Check that tasks are filled alternately by the jobs
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
+    
+    // Check scheduler variables; the demands should now be 8 because 2 tasks
+    // of each type have finished in each job
+    assertEquals(2,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(2,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(8,   info1.mapSchedulable.getDemand());
+    assertEquals(8,   info1.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(2,    info2.mapSchedulable.getRunningTasks());
+    assertEquals(2,    info2.reduceSchedulable.getRunningTasks());
+    assertEquals(8,   info2.mapSchedulable.getDemand());
+    assertEquals(8,   info2.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
   }
   
+  /**
+   * A copy of testLargeJobs that enables the assignMultiple feature to launch
+   * multiple tasks per heartbeat. Results should be the same as testLargeJobs.
+   */
+  public void testLargeJobsWithAssignMultiple() throws IOException {
+    setUpCluster(1, 2, true);
+    
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info1.mapSchedulable.getDemand());
+    assertEquals(10,   info1.reduceSchedulable.getDemand());
+    assertEquals(4.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(4.0,  info1.reduceSchedulable.getFairShare());
+    
+    // Advance time before submitting another job j2, to make j1 run before j2
+    // deterministically.
+    advanceTime(100);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info2 = scheduler.infos.get(job2);
+    
+    // Check scheduler variables; the fair shares should now have been
+    // allocated equally between j1 and j2
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info1.mapSchedulable.getDemand());
+    assertEquals(10,   info1.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(0,    info2.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info2.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info2.mapSchedulable.getDemand());
+    assertEquals(10,   info2.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
+    
+    // Check that tasks are filled alternately by the jobs
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1",
+                           "attempt_test_0001_r_000000_0 on tt1",
+                           "attempt_test_0002_m_000000_0 on tt1",
+                           "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2",
+                           "attempt_test_0001_r_000001_0 on tt2",
+                           "attempt_test_0002_m_000001_0 on tt2",
+                           "attempt_test_0002_r_000001_0 on tt2");
+    
+    // Check that no new tasks can be launched once the tasktrackers are full
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+    
+    // Check that the scheduler has started counting the tasks as running
+    // as soon as it launched them.
+    assertEquals(2,  info1.mapSchedulable.getRunningTasks());
+    assertEquals(2,  info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,  info1.mapSchedulable.getDemand());
+    assertEquals(10,  info1.reduceSchedulable.getDemand());
+    assertEquals(2,  info2.mapSchedulable.getRunningTasks());
+    assertEquals(2,  info2.reduceSchedulable.getRunningTasks());
+    assertEquals(10, info2.mapSchedulable.getDemand());
+    assertEquals(10, info2.reduceSchedulable.getDemand());
+    
+    // Finish up the tasks and advance time again. Note that we must finish
+    // the task since FakeJobInProgress does not properly maintain running
+    // tasks, so the scheduler will always get an empty task list from
+    // the JobInProgress's getTasks(TaskType.MAP)/getTasks(TaskType.REDUCE) and
+    // think they finished.
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000000_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000000_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000000_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000000_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000001_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0");
+    advanceTime(200);
+    assertEquals(0,   info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,   info1.reduceSchedulable.getRunningTasks());
+    assertEquals(0,   info2.mapSchedulable.getRunningTasks());
+    assertEquals(0,   info2.reduceSchedulable.getRunningTasks());
+
+    // Check that tasks are filled alternately by the jobs
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1",
+                           "attempt_test_0001_r_000002_0 on tt1",
+                           "attempt_test_0002_m_000002_0 on tt1",
+                           "attempt_test_0002_r_000002_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2",
+                           "attempt_test_0001_r_000003_0 on tt2",
+                           "attempt_test_0002_m_000003_0 on tt2",
+                           "attempt_test_0002_r_000003_0 on tt2");
+    
+    // Check scheduler variables; the demands should now be 8 because 2 tasks
+    // of each type have finished in each job
+    assertEquals(2,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(2,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(8,   info1.mapSchedulable.getDemand());
+    assertEquals(8,   info1.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(2,    info2.mapSchedulable.getRunningTasks());
+    assertEquals(2,    info2.reduceSchedulable.getRunningTasks());
+    assertEquals(8,   info2.mapSchedulable.getDemand());
+    assertEquals(8,   info2.reduceSchedulable.getDemand());
+    assertEquals(2.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
+  }
 
   /**
-   * We submit two jobs such that one has 2x the priority of the other, wait
-   * for 100 ms, and check that the weights/deficits are okay and that the
-   * tasks all go to the high-priority job.
+   * We submit two jobs such that one has 2x the priority of the other to
+   * a cluster of 3 nodes, wait for 100 ms, and check the weights/shares:
+   * the high-priority job gets 4 tasks while the normal-priority job gets 2.
    */
   public void testJobsWithPriorities() throws IOException {
+    setUpCluster(1, 3, false);
+    
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     JobInfo info1 = scheduler.infos.get(job1);
     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
@@ -579,56 +1074,52 @@ public class TestFairScheduler extends T
     scheduler.update();
     
     // Check scheduler variables
-    assertEquals(0,    info1.runningMaps);
-    assertEquals(0,    info1.runningReduces);
-    assertEquals(10,   info1.neededMaps);
-    assertEquals(10,   info1.neededReduces);
-    assertEquals(0,    info1.mapDeficit);
-    assertEquals(0,    info1.reduceDeficit);
-    assertEquals(1.33, info1.mapFairShare, 0.1);
-    assertEquals(1.33, info1.reduceFairShare, 0.1);
-    assertEquals(0,    info2.runningMaps);
-    assertEquals(0,    info2.runningReduces);
-    assertEquals(10,   info2.neededMaps);
-    assertEquals(10,   info2.neededReduces);
-    assertEquals(0,    info2.mapDeficit);
-    assertEquals(0,    info2.reduceDeficit);
-    assertEquals(2.66, info2.mapFairShare, 0.1);
-    assertEquals(2.66, info2.reduceFairShare, 0.1);
-    
-    // Advance time and check deficits
-    advanceTime(100);
-    assertEquals(133,  info1.mapDeficit, 1.0);
-    assertEquals(133,  info1.reduceDeficit, 1.0);
-    assertEquals(266,  info2.mapDeficit, 1.0);
-    assertEquals(266,  info2.reduceDeficit, 1.0);
+    assertEquals(0,   info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,   info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,  info1.mapSchedulable.getDemand());
+    assertEquals(10,  info1.reduceSchedulable.getDemand());
+    assertEquals(2.0, info1.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(2.0, info1.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0,   info2.mapSchedulable.getRunningTasks());
+    assertEquals(0,   info2.reduceSchedulable.getRunningTasks());
+    assertEquals(10,  info2.mapSchedulable.getDemand());
+    assertEquals(10,  info2.reduceSchedulable.getDemand());
+    assertEquals(4.0, info2.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(4.0, info2.reduceSchedulable.getFairShare(), 0.1);
     
-    // Assign tasks and check that all slots are filled with j1, then j2
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000006_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000007_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000008_0 on tt2");
+    // Advance time
+    advanceTime(100);
+    
+    // Assign tasks and check that j2 gets 2x more tasks than j1. In addition,
+    // whenever the jobs' runningTasks/weight ratios are tied, j1 should get
+    // the new task first because it started first; thus the tasks of each
+    // type should be handed out alternately to 1, 2, 2, 1, 2, 2, etc.
+    // Tasks of each type are expected in the order asserted below.
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0002_r_000002_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0002_r_000003_0 on tt3");
   }
   
   /**
    * This test starts by submitting three large jobs:
    * - job1 in the default pool, at time 0
    * - job2 in poolA, with an allocation of 1 map / 2 reduces, at time 200
-   * - job3 in poolB, with an allocation of 2 maps / 1 reduce, at time 200
+   * - job3 in poolB, with an allocation of 2 maps / 1 reduce, at time 300
    * 
-   * After this, we sleep 100ms, until time 300. At this point, job1 has the
-   * highest map deficit, job3 the second, and job2 the third. This is because
-   * job3 has more maps in its min share than job2, but job1 has been around
-   * a long time at the beginning. The reduce deficits are similar, except job2
-   * comes before job3 because it had a higher reduce minimum share.
-   * 
-   * Finally, assign tasks to all slots. The maps should be assigned in the
-   * order job3, job2, job1 because 3 and 2 both have guaranteed slots and 3
-   * has a higher deficit. The reduces should be assigned as job2, job3, job1.
+   * We then assign tasks to all slots. The maps should be assigned in the
+   * order job2, job3, job3, job1 because jobs 2 and 3 have guaranteed map
+   * slots (1 and 2 respectively). Job2 comes before job3 when they are both at 0
+   * slots because it has an earlier start time. In a similar manner,
+   * reduces should be assigned as job2, job3, job2, job1.
    */
   public void testLargeJobsWithPools() throws Exception {
     // Set up pools file
@@ -648,61 +1139,55 @@ public class TestFairScheduler extends T
     out.println("</allocations>");
     out.close();
     scheduler.getPoolManager().reloadAllocs();
+    Pool defaultPool = scheduler.getPoolManager().getPool("default");
+    Pool poolA = scheduler.getPoolManager().getPool("poolA");
+    Pool poolB = scheduler.getPoolManager().getPool("poolB");
     
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     JobInfo info1 = scheduler.infos.get(job1);
     
     // Check scheduler variables
-    assertEquals(0,    info1.runningMaps);
-    assertEquals(0,    info1.runningReduces);
-    assertEquals(10,   info1.neededMaps);
-    assertEquals(10,   info1.neededReduces);
-    assertEquals(0,    info1.mapDeficit);
-    assertEquals(0,    info1.reduceDeficit);
-    assertEquals(4.0,  info1.mapFairShare);
-    assertEquals(4.0,  info1.reduceFairShare);
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info1.mapSchedulable.getDemand());
+    assertEquals(10,   info1.reduceSchedulable.getDemand());
+    assertEquals(4.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(4.0,  info1.reduceSchedulable.getFairShare());
     
     // Advance time 200ms and submit jobs 2 and 3
     advanceTime(200);
-    assertEquals(800,  info1.mapDeficit);
-    assertEquals(800,  info1.reduceDeficit);
     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
     JobInfo info2 = scheduler.infos.get(job2);
+    advanceTime(100);
     JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
     JobInfo info3 = scheduler.infos.get(job3);
     
     // Check that minimum and fair shares have been allocated
-    assertEquals(0,    info1.minMaps);
-    assertEquals(0,    info1.minReduces);
-    assertEquals(1.0,  info1.mapFairShare);
-    assertEquals(1.0,  info1.reduceFairShare);
-    assertEquals(1,    info2.minMaps);
-    assertEquals(2,    info2.minReduces);
-    assertEquals(1.0,  info2.mapFairShare);
-    assertEquals(2.0,  info2.reduceFairShare);
-    assertEquals(2,    info3.minMaps);
-    assertEquals(1,    info3.minReduces);
-    assertEquals(2.0,  info3.mapFairShare);
-    assertEquals(1.0,  info3.reduceFairShare);
-    
-    // Advance time 100ms and check deficits
-    advanceTime(100);
-    assertEquals(900,  info1.mapDeficit);
-    assertEquals(900,  info1.reduceDeficit);
-    assertEquals(100,  info2.mapDeficit);
-    assertEquals(200,  info2.reduceDeficit);
-    assertEquals(200,  info3.mapDeficit);
-    assertEquals(100,  info3.reduceDeficit);
+    assertEquals(0,    defaultPool.getMapSchedulable().getMinShare());
+    assertEquals(0,    defaultPool.getReduceSchedulable().getMinShare());
+    assertEquals(1.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(1,    poolA.getMapSchedulable().getMinShare());
+    assertEquals(2,    poolA.getReduceSchedulable().getMinShare());
+    assertEquals(1.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
+    assertEquals(2,    poolB.getMapSchedulable().getMinShare());
+    assertEquals(1,    poolB.getReduceSchedulable().getMinShare());
+    assertEquals(2.0,  info3.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info3.reduceSchedulable.getFairShare());
+    
+    // Advance time 100ms
+    advanceTime(100);
     
     // Assign tasks and check that slots are first given to needy jobs
-    checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0003_r_000007_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2");
   }
 
   /**
@@ -711,8 +1196,7 @@ public class TestFairScheduler extends T
    * - job2 in poolA, with an allocation of 2 maps / 2 reduces, at time 200
    * - job3 in poolA, with an allocation of 2 maps / 2 reduces, at time 300
    * 
-   * After this, we sleep 100ms, until time 400. At this point, job1 has the
-   * highest deficit, job2 the second, and job3 the third. The first two tasks
+   * After this, we start assigning tasks. The first two tasks of each type
    * should be assigned to job2 and job3 since they are in a pool with an
    * allocation guarantee, but the next two slots should be assigned to job 3
    * because the pool will no longer be needy.
@@ -730,79 +1214,136 @@ public class TestFairScheduler extends T
     out.println("</allocations>");
     out.close();
     scheduler.getPoolManager().reloadAllocs();
+    Pool poolA = scheduler.getPoolManager().getPool("poolA");
+    
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInfo info1 = scheduler.infos.get(job1);
+    
+    // Check scheduler variables
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info1.mapSchedulable.getDemand());
+    assertEquals(10,   info1.reduceSchedulable.getDemand());
+    assertEquals(4.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(4.0,  info1.reduceSchedulable.getFairShare());
+    
+    // Advance time 200ms and submit job 2
+    advanceTime(200);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    JobInfo info2 = scheduler.infos.get(job2);
+    
+    // Check that minimum and fair shares have been allocated
+    assertEquals(2,    poolA.getMapSchedulable().getMinShare());
+    assertEquals(2,    poolA.getReduceSchedulable().getMinShare());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(2.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
+    
+    // Advance time 100ms and submit job 3
+    advanceTime(100);
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    JobInfo info3 = scheduler.infos.get(job3);
+    
+    // Check that minimum and fair shares have been allocated
+    assertEquals(2,    poolA.getMapSchedulable().getMinShare());
+    assertEquals(2,    poolA.getReduceSchedulable().getMinShare());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(1.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info2.reduceSchedulable.getFairShare());
+    assertEquals(1.0,  info3.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info3.reduceSchedulable.getFairShare());
+    
+    // Advance time
+    advanceTime(100);
+    
+    // Assign tasks and check that slots are first given to needy jobs, but
+    // that job 1 gets two tasks after due to having a larger share.
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+  }
+  
+  /**
+   * A copy of testLargeJobsWithExcessCapacity that enables assigning multiple
+   * tasks per heartbeat. Results should match testLargeJobsWithExcessCapacity.
+   */
+  public void testLargeJobsWithExcessCapacityAndAssignMultiple() 
+      throws Exception {
+    setUpCluster(1, 2, true);
+    
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give pool A a minimum of 2 maps, 2 reduces
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>2</minMaps>");
+    out.println("<minReduces>2</minReduces>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    Pool poolA = scheduler.getPoolManager().getPool("poolA");
     
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
     JobInfo info1 = scheduler.infos.get(job1);
     
     // Check scheduler variables
-    assertEquals(0,    info1.runningMaps);
-    assertEquals(0,    info1.runningReduces);
-    assertEquals(10,   info1.neededMaps);
-    assertEquals(10,   info1.neededReduces);
-    assertEquals(0,    info1.mapDeficit);
-    assertEquals(0,    info1.reduceDeficit);
-    assertEquals(4.0,  info1.mapFairShare);
-    assertEquals(4.0,  info1.reduceFairShare);
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info1.mapSchedulable.getDemand());
+    assertEquals(10,   info1.reduceSchedulable.getDemand());
+    assertEquals(4.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(4.0,  info1.reduceSchedulable.getFairShare());
     
     // Advance time 200ms and submit job 2
     advanceTime(200);
-    assertEquals(800,  info1.mapDeficit);
-    assertEquals(800,  info1.reduceDeficit);
     JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
     JobInfo info2 = scheduler.infos.get(job2);
     
     // Check that minimum and fair shares have been allocated
-    assertEquals(0,    info1.minMaps);
-    assertEquals(0,    info1.minReduces);
-    assertEquals(2.0,  info1.mapFairShare);
-    assertEquals(2.0,  info1.reduceFairShare);
-    assertEquals(2,    info2.minMaps);
-    assertEquals(2,    info2.minReduces);
-    assertEquals(2.0,  info2.mapFairShare);
-    assertEquals(2.0,  info2.reduceFairShare);
+    assertEquals(2,    poolA.getMapSchedulable().getMinShare());
+    assertEquals(2,    poolA.getReduceSchedulable().getMinShare());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(2.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
     
     // Advance time 100ms and submit job 3
     advanceTime(100);
-    assertEquals(1000, info1.mapDeficit);
-    assertEquals(1000, info1.reduceDeficit);
-    assertEquals(200,  info2.mapDeficit);
-    assertEquals(200,  info2.reduceDeficit);
     JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
     JobInfo info3 = scheduler.infos.get(job3);
     
     // Check that minimum and fair shares have been allocated
-    assertEquals(0,    info1.minMaps);
-    assertEquals(0,    info1.minReduces);
-    assertEquals(2,    info1.mapFairShare, 0.1);
-    assertEquals(2,    info1.reduceFairShare, 0.1);
-    assertEquals(1,    info2.minMaps);
-    assertEquals(1,    info2.minReduces);
-    assertEquals(1,    info2.mapFairShare, 0.1);
-    assertEquals(1,    info2.reduceFairShare, 0.1);
-    assertEquals(1,    info3.minMaps);
-    assertEquals(1,    info3.minReduces);
-    assertEquals(1,    info3.mapFairShare, 0.1);
-    assertEquals(1,    info3.reduceFairShare, 0.1);
-    
-    // Advance time 100ms and check deficits
-    advanceTime(100);
-    assertEquals(1200, info1.mapDeficit, 1.0);
-    assertEquals(1200, info1.reduceDeficit, 1.0);
-    assertEquals(300,  info2.mapDeficit, 1.0);
-    assertEquals(300,  info2.reduceDeficit, 1.0);
-    assertEquals(100,  info3.mapDeficit, 1.0);
-    assertEquals(100,  info3.reduceDeficit, 1.0);
+    assertEquals(2,    poolA.getMapSchedulable().getMinShare());
+    assertEquals(2,    poolA.getReduceSchedulable().getMinShare());
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(1.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info2.reduceSchedulable.getFairShare());
+    assertEquals(1.0,  info3.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info3.reduceSchedulable.getFairShare());
+    
+    // Advance time
+    advanceTime(100);
     
     // Assign tasks and check that slots are first given to needy jobs, but
-    // that job 1 gets two tasks after due to having a larger deficit.
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_r_000004_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+    // that job 1 gets two tasks after due to having a larger share.
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1",
+                           "attempt_test_0002_r_000000_0 on tt1",
+                           "attempt_test_0003_m_000000_0 on tt1",
+                           "attempt_test_0003_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2",
+                           "attempt_test_0001_r_000000_0 on tt2",
+                           "attempt_test_0001_m_000001_0 on tt2",
+                           "attempt_test_0001_r_000001_0 on tt2");
   }
   
   /**
@@ -812,10 +1353,6 @@ public class TestFairScheduler extends T
    *   maps and 4 reduces
    * 
    * When we assign the slots, job2 should only get 1 of each type of task.
-   * 
-   * The fair share for job 2 should be 2.0 however, because even though it is
-   * running only one task, it accumulates deficit in case it will have failures
-   * or need speculative tasks later. (TODO: This may not be a good policy.)
    */
   public void testSmallJobInLargePool() throws Exception {
     // Set up pools file
@@ -837,32 +1374,28 @@ public class TestFairScheduler extends T
     JobInfo info2 = scheduler.infos.get(job2);
     
     // Check scheduler variables
-    assertEquals(0,    info1.runningMaps);
-    assertEquals(0,    info1.runningReduces);
-    assertEquals(10,   info1.neededMaps);
-    assertEquals(10,   info1.neededReduces);
-    assertEquals(0,    info1.mapDeficit);
-    assertEquals(0,    info1.reduceDeficit);
-    assertEquals(2.0,  info1.mapFairShare);
-    assertEquals(2.0,  info1.reduceFairShare);
-    assertEquals(0,    info2.runningMaps);
-    assertEquals(0,    info2.runningReduces);
-    assertEquals(1,    info2.neededMaps);
-    assertEquals(1,    info2.neededReduces);
-    assertEquals(0,    info2.mapDeficit);
-    assertEquals(0,    info2.reduceDeficit);
-    assertEquals(2.0,  info2.mapFairShare);
-    assertEquals(2.0,  info2.reduceFairShare);
+    assertEquals(0,    info1.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info1.reduceSchedulable.getRunningTasks());
+    assertEquals(10,   info1.mapSchedulable.getDemand());
+    assertEquals(10,   info1.reduceSchedulable.getDemand());
+    assertEquals(3.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(3.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(0,    info2.mapSchedulable.getRunningTasks());
+    assertEquals(0,    info2.reduceSchedulable.getRunningTasks());
+    assertEquals(1,    info2.mapSchedulable.getDemand());
+    assertEquals(1,    info2.reduceSchedulable.getDemand());
+    assertEquals(1.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(1.0,  info2.reduceSchedulable.getFairShare());
     
     // Assign tasks and check that slots are first given to needy jobs
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
   }
   
   /**
@@ -884,38 +1417,48 @@ public class TestFairScheduler extends T
     
     // Submit jobs, advancing time in-between to make sure that they are
     // all submitted at distinct times.
-    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInProgress job1 = submitJobNotInitialized(JobStatus.PREP, 10, 10);
+    assertTrue(((FakeJobInProgress)job1).inited());
+    job1.getStatus().setRunState(JobStatus.RUNNING);
     JobInfo info1 = scheduler.infos.get(job1);
     advanceTime(10);
-    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInProgress job2 = submitJobNotInitialized(JobStatus.PREP, 10, 10);
+    assertTrue(((FakeJobInProgress)job2).inited());
+    job2.getStatus().setRunState(JobStatus.RUNNING);
     JobInfo info2 = scheduler.infos.get(job2);
     advanceTime(10);
-    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInProgress job3 = submitJobNotInitialized(JobStatus.PREP, 10, 10);
     JobInfo info3 = scheduler.infos.get(job3);
     advanceTime(10);
-    JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10);
+    JobInProgress job4 = submitJobNotInitialized(JobStatus.PREP, 10, 10);
     JobInfo info4 = scheduler.infos.get(job4);
     
-    // Check scheduler variables
-    assertEquals(2.0,  info1.mapFairShare);
-    assertEquals(2.0,  info1.reduceFairShare);
-    assertEquals(2.0,  info2.mapFairShare);
-    assertEquals(2.0,  info2.reduceFairShare);
-    assertEquals(0.0,  info3.mapFairShare);
-    assertEquals(0.0,  info3.reduceFairShare);
-    assertEquals(0.0,  info4.mapFairShare);
-    assertEquals(0.0,  info4.reduceFairShare);
+    // Only two of the jobs should be initialized.
+    assertTrue(((FakeJobInProgress)job1).inited());
+    assertTrue(((FakeJobInProgress)job2).inited());
+    assertFalse(((FakeJobInProgress)job3).inited());
+    assertFalse(((FakeJobInProgress)job4).inited());
     
-    // Assign tasks and check that slots are first to jobs 1 and 2
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    // Check scheduler variables
+    assertEquals(2.0,  info1.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info1.reduceSchedulable.getFairShare());
+    assertEquals(2.0,  info2.mapSchedulable.getFairShare());
+    assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
+    assertEquals(0.0,  info3.mapSchedulable.getFairShare());
+    assertEquals(0.0,  info3.reduceSchedulable.getFairShare());
+    assertEquals(0.0,  info4.mapSchedulable.getFairShare());
+    assertEquals(0.0,  info4.reduceSchedulable.getFairShare());
+    
+    // Assign tasks and check that only jobs 1 and 2 get them
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
     advanceTime(100);
-    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000006_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000007_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000008_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
   }
 
   /**
@@ -954,25 +1497,25 @@ public class TestFairScheduler extends T
     JobInfo info4 = scheduler.infos.get(job4);
     
     // Check scheduler variables
-    assertEquals(1.33,  info1.mapFairShare, 0.1);
-    assertEquals(1.33,  info1.reduceFairShare, 0.1);
-    assertEquals(0.0,   info2.mapFairShare);
-    assertEquals(0.0,   info2.reduceFairShare);
-    assertEquals(1.33,  info3.mapFairShare, 0.1);
-    assertEquals(1.33,  info3.reduceFairShare, 0.1);
-    assertEquals(1.33,  info4.mapFairShare, 0.1);
-    assertEquals(1.33,  info4.reduceFairShare, 0.1);
-    
-    // Assign tasks and check that slots are first to jobs 1 and 3
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    assertEquals(1.33,  info1.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(1.33,  info1.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0.0,   info2.mapSchedulable.getFairShare());
+    assertEquals(0.0,   info2.reduceSchedulable.getFairShare());
+    assertEquals(1.33,  info3.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(1.33,  info3.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(1.33,  info4.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(1.33,  info4.reduceSchedulable.getFairShare(), 0.1);
+    
+    // Assign tasks and check that slots are given only to jobs 1, 3 and 4
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
     advanceTime(100);
-    checkAssignment("tt2", "attempt_test_0003_m_000005_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0003_m_000006_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0003_r_000007_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0003_r_000008_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0004_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0004_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
   }
   
   /**
@@ -1055,62 +1598,37 @@ public class TestFairScheduler extends T
     // the other half. This works out to 2 slots each for the jobs
     // in poolA and 1/3 each for the jobs in the default pool because
     // there are 2 runnable jobs in poolA and 6 jobs in the default pool.
-    assertEquals(0.33,   info1.mapFairShare, 0.1);
-    assertEquals(0.33,   info1.reduceFairShare, 0.1);
-    assertEquals(0.0,    info2.mapFairShare);
-    assertEquals(0.0,    info2.reduceFairShare);
-    assertEquals(0.33,   info3.mapFairShare, 0.1);
-    assertEquals(0.33,   info3.reduceFairShare, 0.1);
-    assertEquals(0.33,   info4.mapFairShare, 0.1);
-    assertEquals(0.33,   info4.reduceFairShare, 0.1);
-    assertEquals(0.33,   info5.mapFairShare, 0.1);
-    assertEquals(0.33,   info5.reduceFairShare, 0.1);
-    assertEquals(0.33,   info6.mapFairShare, 0.1);
-    assertEquals(0.33,   info6.reduceFairShare, 0.1);
-    assertEquals(0.33,   info7.mapFairShare, 0.1);
-    assertEquals(0.33,   info7.reduceFairShare, 0.1);
-    assertEquals(0.0,    info8.mapFairShare);
-    assertEquals(0.0,    info8.reduceFairShare);
-    assertEquals(2.0,    info9.mapFairShare, 0.1);
-    assertEquals(2.0,    info9.reduceFairShare, 0.1);
-    assertEquals(0.0,    info10.mapFairShare);
-    assertEquals(0.0,    info10.reduceFairShare);
+    assertEquals(0.33,   info1.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info1.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0.0,    info2.mapSchedulable.getFairShare());
+    assertEquals(0.0,    info2.reduceSchedulable.getFairShare());
+    assertEquals(0.33,   info3.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info3.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info4.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info4.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info5.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info5.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info6.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info6.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info7.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(0.33,   info7.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0.0,    info8.mapSchedulable.getFairShare());
+    assertEquals(0.0,    info8.reduceSchedulable.getFairShare());
+    assertEquals(2.0,    info9.mapSchedulable.getFairShare(), 0.1);
+    assertEquals(2.0,    info9.reduceSchedulable.getFairShare(), 0.1);
+    assertEquals(0.0,    info10.mapSchedulable.getFairShare());
+    assertEquals(0.0,    info10.reduceSchedulable.getFairShare());
   }
   
   public void testSizeBasedWeight() throws Exception {
     scheduler.sizeBasedWeight = true;
     JobInProgress job1 = submitJob(JobStatus.RUNNING, 2, 10);
     JobInProgress job2 = submitJob(JobStatus.RUNNING, 20, 1);
-    assertTrue(scheduler.infos.get(job2).mapFairShare >
-               scheduler.infos.get(job1).mapFairShare);
-    assertTrue(scheduler.infos.get(job1).reduceFairShare >
-               scheduler.infos.get(job2).reduceFairShare);
-  }
-  
-  public void testWaitForMapsBeforeLaunchingReduces() {
-    // We have set waitForMapsBeforeLaunchingReduces to false by default in
-    // this class, so this should return true
-    assertTrue(scheduler.enoughMapsFinishedToRunReduces(0, 100));
-    
-    // However, if we set waitForMapsBeforeLaunchingReduces to true, we should
-    // now no longer be able to assign reduces until 5 have finished
-    scheduler.waitForMapsBeforeLaunchingReduces = true;
-    assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, 100));
-    assertFalse(scheduler.enoughMapsFinishedToRunReduces(1, 100));
-    assertFalse(scheduler.enoughMapsFinishedToRunReduces(2, 100));
-    assertFalse(scheduler.enoughMapsFinishedToRunReduces(3, 100));
-    assertFalse(scheduler.enoughMapsFinishedToRunReduces(4, 100));
-    assertTrue(scheduler.enoughMapsFinishedToRunReduces(5, 100));
-    assertTrue(scheduler.enoughMapsFinishedToRunReduces(6, 100));
-    
-    // Also test some jobs that have very few maps, in which case we will
-    // wait for at least 1 map to finish
-    assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, 5));
-    assertTrue(scheduler.enoughMapsFinishedToRunReduces(1, 5));
-    assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, 1));
-    assertTrue(scheduler.enoughMapsFinishedToRunReduces(1, 1));
+    assertTrue(scheduler.infos.get(job2).mapSchedulable.getFairShare() >
+               scheduler.infos.get(job1).mapSchedulable.getFairShare());
+    assertTrue(scheduler.infos.get(job1).reduceSchedulable.getFairShare() >
+               scheduler.infos.get(job2).reduceSchedulable.getFairShare());
   }
-  
 
   /**
    * This test submits jobs in three pools: poolA, which has a weight
@@ -1144,25 +1662,26 @@ public class TestFairScheduler extends T
     JobInfo info3 = scheduler.infos.get(job3);
     advanceTime(10);
     
-    assertEquals(1.14,  info1.mapFairShare, 0.01);
-    assertEquals(1.14,  info1.reduceFairShare, 0.01);
-    assertEquals(2.28,  info2.mapFairShare, 0.01);
-    assertEquals(2.28,  info2.reduceFairShare, 0.01);
-    assertEquals(0.57,  info3.mapFairShare, 0.01);
-    assertEquals(0.57,  info3.reduceFairShare, 0.01);
+    assertEquals(1.14,  info1.mapSchedulable.getFairShare(), 0.01);
+    assertEquals(1.14,  info1.reduceSchedulable.getFairShare(), 0.01);
+    assertEquals(2.28,  info2.mapSchedulable.getFairShare(), 0.01);
+    assertEquals(2.28,  info2.reduceSchedulable.getFairShare(), 0.01);
+    assertEquals(0.57,  info3.mapSchedulable.getFairShare(), 0.01);
+    assertEquals(0.57,  info3.reduceSchedulable.getFairShare(), 0.01);
     
     JobInProgress job4 = submitJob(JobStatus.RUNNING, 10, 10, "poolB");
     JobInfo info4 = scheduler.infos.get(job4);
     advanceTime(10);
     
-    assertEquals(1.14,  info1.mapFairShare, 0.01);

[... 1363 lines stripped ...]


Mime
View raw message