From: ma...@apache.org
Subject: svn commit: r788922 [2/2] - in /hadoop/mapreduce/trunk: ./ conf/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/java/org/apache/hadoop/map...
Date: Sat, 27 Jun 2009 03:44:11 GMT
Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=788922&r1=788921&r2=788922&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Sat Jun 27 03:44:10 2009
@@ -25,15 +25,21 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 
 import junit.framework.TestCase;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.FairScheduler.JobInfo;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 
@@ -46,11 +52,12 @@
   private static final String POOL_PROPERTY = "pool";
   
   private static int jobCounter;
-  private static int taskCounter;
   
-  static class FakeJobInProgress extends JobInProgress {
+  class FakeJobInProgress extends JobInProgress {
     
     private FakeTaskTrackerManager taskTrackerManager;
+    private int mapCounter = 0;
+    private int reduceCounter = 0;
     
     public FakeJobInProgress(JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager) throws IOException {
@@ -59,16 +66,62 @@
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus();
       this.status.setRunState(JobStatus.PREP);
+      this.nonLocalMaps = new LinkedList<TaskInProgress>();
+      this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+      this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+      this.nonRunningReduces = new LinkedList<TaskInProgress>();   
+      this.runningReduces = new LinkedHashSet<TaskInProgress>();
+      initTasks();
     }
     
     @Override
     public synchronized void initTasks() throws IOException {
-      // do nothing
+      // initTasks is needed to create non-empty cleanup and setup TIP
+      // arrays, otherwise calls such as job.getTaskInProgress will fail
+      JobID jobId = getJobID();
+      JobConf conf = getJobConf();
+      String jobFile = "";
+      // create two cleanup tips, one map and one reduce.
+      cleanup = new TaskInProgress[2];
+      // cleanup map tip.
+      cleanup[0] = new TaskInProgress(jobId, jobFile, null, 
+              jobtracker, conf, this, numMapTasks, 1);
+      cleanup[0].setJobCleanupTask();
+      // cleanup reduce tip.
+      cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
+                         numReduceTasks, jobtracker, conf, this, 1);
+      cleanup[1].setJobCleanupTask();
+      // create two setup tips, one map and one reduce.
+      setup = new TaskInProgress[2];
+      // setup map tip.
+      setup[0] = new TaskInProgress(jobId, jobFile, null, 
+              jobtracker, conf, this, numMapTasks + 1, 1);
+      setup[0].setJobSetupTask();
+      // setup reduce tip.
+      setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
+                         numReduceTasks + 1, jobtracker, conf, this, 1);
+      setup[1].setJobSetupTask();
+      // create maps
+      numMapTasks = conf.getNumMapTasks();
+      System.out.println("numMapTasks = " + numMapTasks);
+      maps = new TaskInProgress[numMapTasks];
+      for (int i = 0; i < numMapTasks; i++) {
+        maps[i] = new FakeTaskInProgress(getJobID(), 
+            getJobConf(), true, this);
+      }
+      // create reduces
+      numReduceTasks = conf.getNumReduceTasks();
+      System.out.println("numReduceTasks = " + numReduceTasks);
+      reduces = new TaskInProgress[numReduceTasks];
+      for (int i = 0; i < numReduceTasks; i++) {
+        reduces[i] = new FakeTaskInProgress(getJobID(), 
+            getJobConf(), false, this);
+      }
     }
 
     @Override
     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
-        int ignored) throws IOException {
+        int numUniqueHosts) throws IOException {
       TaskAttemptID attemptId = getTaskAttemptID(true);
       Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 1) {
         @Override
@@ -76,8 +129,12 @@
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
         }
       };
-      taskTrackerManager.startTask(tts.getTrackerName(), task);
       runningMapTasks++;
+      FakeTaskInProgress tip = 
+        (FakeTaskInProgress) maps[attemptId.getTaskID().getId()];
+      tip.createTaskAttempt(task, tts.getTrackerName());
+      nonLocalRunningMaps.add(tip);
+      taskTrackerManager.startTask(tts.getTrackerName(), task, tip);
       return task;
     }
     
@@ -91,19 +148,118 @@
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
         }
       };
-      taskTrackerManager.startTask(tts.getTrackerName(), task);
       runningReduceTasks++;
+      FakeTaskInProgress tip = 
+        (FakeTaskInProgress) reduces[attemptId.getTaskID().getId()];
+      tip.createTaskAttempt(task, tts.getTrackerName());
+      runningReduces.add(tip);
+      taskTrackerManager.startTask(tts.getTrackerName(), task, tip);
       return task;
     }
     
+    public void mapTaskFinished(TaskInProgress tip) {
+      runningMapTasks--;
+      finishedMapTasks++;
+      nonLocalRunningMaps.remove(tip);
+    }
+    
+    public void reduceTaskFinished(TaskInProgress tip) {
+      runningReduceTasks--;
+      finishedReduceTasks++;
+      runningReduces.remove(tip);
+    }
+    
     private TaskAttemptID getTaskAttemptID(boolean isMap) {
       JobID jobId = getJobID();
       TaskType t = TaskType.REDUCE;
       if (isMap) {
         t = TaskType.MAP;
+        return new TaskAttemptID(jobId.getJtIdentifier(),
+            jobId.getId(), t, mapCounter++, 0);
+      } else {
+        return new TaskAttemptID(jobId.getJtIdentifier(),
+            jobId.getId(), t, reduceCounter++, 0);
       }
-      return new TaskAttemptID(jobId.getJtIdentifier(),
-          jobId.getId(), t, ++taskCounter, 0);
+    }
+  }
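
The switch from a single shared taskCounter to the per-type mapCounter and
reduceCounter above is what lets obtainNewMapTask and obtainNewReduceTask
index maps[] and reduces[] by the attempt's task id. An illustrative note on
the invariant (editorial annotation, not part of the patch):

    // With separate counters, each task type's ids are dense from 0 and
    // line up with the TIP arrays built in initTasks():
    //   map attempts:    attempt_test_0001_m_000000_0, _m_000001_0, ...
    //   reduce attempts: attempt_test_0001_r_000000_0, _r_000001_0, ...
    // A single shared counter would leave per-type gaps and break lookups
    // like maps[attemptId.getTaskID().getId()].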
+  
+  class FakeTaskInProgress extends TaskInProgress {
+    private boolean isMap;
+    private FakeJobInProgress fakeJob;
+    private TreeMap<TaskAttemptID, String> activeTasks;
+    private TaskStatus taskStatus;
+    private boolean isComplete = false;
+    
+    FakeTaskInProgress(JobID jId, JobConf jobConf, boolean isMap,
+        FakeJobInProgress job) {
+      super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0, 1);
+      this.isMap = isMap;
+      this.fakeJob = job;
+      activeTasks = new TreeMap<TaskAttemptID, String>();
+      taskStatus = TaskStatus.createTaskStatus(isMap);
+      taskStatus.setRunState(TaskStatus.State.UNASSIGNED);
+    }
+
+    private void createTaskAttempt(Task task, String taskTracker) {
+      activeTasks.put(task.getTaskID(), taskTracker);
+      taskStatus = TaskStatus.createTaskStatus(isMap, task.getTaskID(),
+          0.5f, 1, TaskStatus.State.RUNNING, "", "", "", 
+          TaskStatus.Phase.STARTING, new Counters());
+      taskStatus.setStartTime(clock.getTime());
+    }
+    
+    @Override
+    TreeMap<TaskAttemptID, String> getActiveTasks() {
+      return activeTasks;
+    }
+    
+    public synchronized boolean isComplete() {
+      return isComplete;
+    }
+    
+    public boolean isRunning() {
+      return activeTasks.size() > 0;
+    }
+    
+    @Override
+    public TaskStatus getTaskStatus(TaskAttemptID taskid) {
+      return taskStatus;
+    }
+    
+    void killAttempt() {
+      if (isMap) {
+        fakeJob.mapTaskFinished(this);
+      }
+      else {
+        fakeJob.reduceTaskFinished(this);
+      }
+      activeTasks.clear();
+      taskStatus.setRunState(TaskStatus.State.UNASSIGNED);
+    }
+    
+    void finishAttempt() {
+      isComplete = true;
+      if (isMap) {
+        fakeJob.mapTaskFinished(this);
+      }
+      else {
+        fakeJob.reduceTaskFinished(this);
+      }
+      activeTasks.clear();
+      taskStatus.setRunState(TaskStatus.State.UNASSIGNED);
+    }
+  }
+  
+  static class FakeQueueManager extends QueueManager {
+    private Set<String> queues = null;
+    FakeQueueManager() {
+      super(new Configuration());
+    }
+    void setQueues(Set<String> queues) {
+      this.queues = queues;
+    }
+    public synchronized Set<String> getQueues() {
+      return queues;
     }
   }
   
@@ -118,24 +274,21 @@
     
     private Map<String, TaskTracker> trackers =
       new HashMap<String, TaskTracker>();
-    private Map<String, TaskStatus> taskStatuses = 
+    private Map<String, TaskStatus> statuses = 
       new HashMap<String, TaskStatus>();
-
-    public FakeTaskTrackerManager() {
-      TaskTracker tt1 = new TaskTracker("tt1");
-      tt1.setStatus(new TaskTrackerStatus("tt1", "tt1.host", 1,
-                                          new ArrayList<TaskStatus>(), 0,
-                                          maxMapTasksPerTracker, 
-                                          maxReduceTasksPerTracker));
-      trackers.put("tt1", tt1);
-      
-      TaskTracker tt2 = new TaskTracker("tt2");
-      tt2.setStatus(new TaskTrackerStatus("tt2", "tt2.host", 2,
-                                          new ArrayList<TaskStatus>(), 0,
-                                          maxMapTasksPerTracker, 
-                                          maxReduceTasksPerTracker));
-      trackers.put("tt2", tt2);
-
+    private Map<String, FakeTaskInProgress> tips = 
+      new HashMap<String, FakeTaskInProgress>();
+    private Map<String, TaskTrackerStatus> trackerForTip =
+      new HashMap<String, TaskTrackerStatus>();
+
+    public FakeTaskTrackerManager(int numTrackers) {
+      for (int i = 1; i <= numTrackers; i++) {
+        TaskTracker tt = new TaskTracker("tt" + i);
+        tt.setStatus(new TaskTrackerStatus("tt" + i,  "host" + i, i,
+            new ArrayList<TaskStatus>(), 0,
+            maxMapTasksPerTracker, maxReduceTasksPerTracker));
+        trackers.put("tt" + i, tt);
+      }
     }
     
     @Override
@@ -210,31 +363,48 @@
       return trackers.get(trackerID);
     }
     
-    public void startTask(String taskTrackerName, final Task t) {
-      if (t.isMapTask()) {
+    public void startTask(String trackerName, Task t, FakeTaskInProgress tip) {
+      final boolean isMap = t.isMapTask();
+      if (isMap) {
         maps++;
       } else {
         reduces++;
       }
-      TaskStatus status = new TaskStatus() {
-        @Override
-        public boolean getIsMap() {
-          return t.isMapTask();
-        }
-      };
-      taskStatuses.put(t.getTaskID().toString(), status);
+      String attemptId = t.getTaskID().toString();
+      TaskStatus status = tip.getTaskStatus(t.getTaskID());
+      TaskTrackerStatus trackerStatus = trackers.get(trackerName).getStatus();
+      tips.put(attemptId, tip);
+      statuses.put(attemptId, status);
+      trackerForTip.put(attemptId, trackerStatus);
       status.setRunState(TaskStatus.State.RUNNING);
-      trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
+      trackerStatus.getTaskReports().add(status);
     }
     
-    public void finishTask(String taskTrackerName, String tipId) {
-      TaskStatus status = taskStatuses.get(tipId);
-      if (status.getIsMap()) {
+    public void finishTask(String taskTrackerName, String attemptId) {
+      FakeTaskInProgress tip = tips.get(attemptId);
+      if (tip.isMapTask()) {
         maps--;
       } else {
         reduces--;
       }
-      status.setRunState(TaskStatus.State.SUCCEEDED);
+      tip.finishAttempt();
+      TaskStatus status = statuses.get(attemptId);
+      trackers.get(taskTrackerName).getStatus().getTaskReports().remove(status);
+    }
+
+    @Override
+    public boolean killTask(TaskAttemptID attemptId, boolean shouldFail) {
+      String attemptIdStr = attemptId.toString();
+      FakeTaskInProgress tip = tips.get(attemptIdStr);
+      if (tip.isMapTask()) {
+        maps--;
+      } else {
+        reduces--;
+      }
+      tip.killAttempt();
+      TaskStatus status = statuses.get(attemptIdStr);
+      trackerForTip.get(attemptIdStr).getTaskReports().remove(status);
+      return true;
     }
   }
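
Taken together, startTask, finishTask, and the new killTask model the attempt
lifecycle that the preemption tests below depend on. A hedged usage sketch,
assuming the fake classes above:

    // Task t = job.obtainNewMapTask(ttStatus, clusterSize, numUniqueHosts);
    //   -> startTask(): status registered on the tracker, map slot counted
    // taskTrackerManager.finishTask("tt1", t.getTaskID().toString());
    //   -> tip.finishAttempt(): finished counters bump, task report removed
    // taskTrackerManager.killTask(t.getTaskID(), false);
    //   -> tip.killAttempt(): TIP returns to UNASSIGNED, freeing the slot
    //      for the scheduler to hand to another job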
   
@@ -246,20 +416,23 @@
   @Override
   protected void setUp() throws Exception {
     jobCounter = 0;
-    taskCounter = 0;
     new File(TEST_DIR).mkdirs(); // Make sure data directory exists
     // Create an empty pools file (so we can add/remove pools later)
     FileWriter fileWriter = new FileWriter(ALLOC_FILE);
     fileWriter.write("<?xml version=\"1.0\"?>\n");
     fileWriter.write("<allocations />\n");
     fileWriter.close();
+    setUpCluster(2);
+  }
+
+  private void setUpCluster(int numTaskTrackers) {
     conf = new JobConf();
     conf.set("mapred.fairscheduler.allocation.file", ALLOC_FILE);
     conf.set("mapred.fairscheduler.poolnameproperty", POOL_PROPERTY);
     conf.set("mapred.fairscheduler.assignmultiple", "false");
-    taskTrackerManager = new FakeTaskTrackerManager();
+    taskTrackerManager = new FakeTaskTrackerManager(numTaskTrackers);
     clock = new FakeClock();
-    scheduler = new FairScheduler(clock, false);
+    scheduler = new FairScheduler(clock, true);
     scheduler.waitForMapsBeforeLaunchingReduces = false;
     scheduler.setConf(conf);
     scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -321,6 +494,10 @@
     out.println("<pool name=\"poolD\">");
     out.println("<maxRunningJobs>3</maxRunningJobs>");
     out.println("</pool>");
+    // Give pool E a preemption timeout of one minute
+    out.println("<pool name=\"poolE\">");
+    out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
+    out.println("</pool>");
     // Set default limit of jobs per pool to 15
     out.println("<poolMaxJobsDefault>15</poolMaxJobsDefault>");
     // Set default limit of jobs per user to 5
@@ -329,13 +506,18 @@
     out.println("<user name=\"user1\">");
     out.println("<maxRunningJobs>10</maxRunningJobs>");
     out.println("</user>");
+    // Set default min share preemption timeout to 2 minutes
+    out.println("<defaultMinSharePreemptionTimeout>120" 
+        + "</defaultMinSharePreemptionTimeout>"); 
+    // Set fair share preemption timeout to 5 minutes
+    out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>"); 
     out.println("</allocations>"); 
     out.close();
     
     PoolManager poolManager = scheduler.getPoolManager();
     poolManager.reloadAllocs();
     
-    assertEquals(5, poolManager.getPools().size()); // 4 in file + default pool
+    assertEquals(6, poolManager.getPools().size()); // 5 in file + default pool
     assertEquals(0, poolManager.getAllocation(Pool.DEFAULT_POOL_NAME,
         TaskType.MAP));
     assertEquals(0, poolManager.getAllocation(Pool.DEFAULT_POOL_NAME,
@@ -348,12 +530,25 @@
     assertEquals(0, poolManager.getAllocation("poolC", TaskType.REDUCE));
     assertEquals(0, poolManager.getAllocation("poolD", TaskType.MAP));
     assertEquals(0, poolManager.getAllocation("poolD", TaskType.REDUCE));
+    assertEquals(0, poolManager.getAllocation("poolE", TaskType.MAP));
+    assertEquals(0, poolManager.getAllocation("poolE", TaskType.REDUCE));
+    assertEquals(15, poolManager.getPoolMaxJobs(Pool.DEFAULT_POOL_NAME));
     assertEquals(15, poolManager.getPoolMaxJobs("poolA"));
     assertEquals(15, poolManager.getPoolMaxJobs("poolB"));
     assertEquals(15, poolManager.getPoolMaxJobs("poolC"));
     assertEquals(3, poolManager.getPoolMaxJobs("poolD"));
+    assertEquals(15, poolManager.getPoolMaxJobs("poolE"));
     assertEquals(10, poolManager.getUserMaxJobs("user1"));
     assertEquals(5, poolManager.getUserMaxJobs("user2"));
+    assertEquals(120000, poolManager.getMinSharePreemptionTimeout(
+        Pool.DEFAULT_POOL_NAME));
+    assertEquals(120000, poolManager.getMinSharePreemptionTimeout("poolA"));
+    assertEquals(120000, poolManager.getMinSharePreemptionTimeout("poolB"));
+    assertEquals(120000, poolManager.getMinSharePreemptionTimeout("poolC"));
+    assertEquals(120000, poolManager.getMinSharePreemptionTimeout("poolD"));
+    assertEquals(60000, poolManager.getMinSharePreemptionTimeout("poolE"));
+    assertEquals(300000, poolManager.getFairSharePreemptionTimeout());
   }
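
Note the unit conversion these assertions pin down: preemption timeouts are
written in seconds in the allocations file but returned in milliseconds by
PoolManager.

    // Seconds in the XML file -> milliseconds from PoolManager:
    //   minSharePreemptionTimeout 60         -> 60000   (poolE)
    //   defaultMinSharePreemptionTimeout 120 -> 120000  (all other pools)
    //   fairSharePreemptionTimeout 300       -> 300000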
   
   public void testTaskNotAssignedWhenNoJobsArePresent() throws IOException {
@@ -415,12 +610,12 @@
     assertEquals(2.0,  info2.reduceFairShare);
     
     // Assign tasks and check that all slots are filled with j1, then j2
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000006_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
     assertNull(scheduler.assignTasks(tracker("tt2")));
     
     // Check that the scheduler has started counting the tasks as running
@@ -486,14 +681,18 @@
     assertEquals(2.0,  info2.reduceFairShare);
     
     // Assign tasks and check that all slots are initially filled with job 1
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
+    
+    // Check that no new tasks can be launched once the tasktrackers are full
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
     
     // Check that the scheduler has started counting the tasks as running
     // as soon as it launched them.
@@ -510,14 +709,14 @@
     // the task since FakeJobInProgress does not properly maintain running
     // tasks, so the scheduler will always get an empty task list from
     // the JobInProgress's getMapTasks/getReduceTasks and think they finished.
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000000_0");
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000003_0");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000004_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000005_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000006_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000007_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000008_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000000_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000002_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000003_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000002_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000003_0");
     advanceTime(200);
     assertEquals(0,   info1.runningMaps);
     assertEquals(0,   info1.runningReduces);
@@ -529,24 +728,24 @@
     assertEquals(400, info2.reduceDeficit);
 
     // Assign tasks and check that all slots are now filled with job 2
-    checkAssignment("tt1", "attempt_test_0002_m_000009_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000010_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000011_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000012_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0002_m_000013_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000014_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000015_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000016_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
 
     // Finish up the tasks and advance time again, but give job 2 only 50ms.
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000009_0");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000010_0");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000011_0");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000012_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000013_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000014_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000015_0");
-    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000016_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000000_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000000_0");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_r_000001_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000002_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000003_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000002_0");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000003_0");
     advanceTime(50);
     assertEquals(0,   info1.runningMaps);
     assertEquals(0,   info1.runningReduces);
@@ -558,14 +757,14 @@
     assertEquals(300, info2.reduceDeficit);
 
     // Assign tasks and check that all slots are now still with job 2
-    checkAssignment("tt1", "attempt_test_0002_m_000017_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000018_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000019_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000020_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0002_m_000021_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000022_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000023_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000024_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0002_m_000004_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000005_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000005_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000007_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000006_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000007_0 on tt2");
   }
   
 
@@ -608,14 +807,14 @@
     assertEquals(266,  info2.reduceDeficit, 1.0);
     
     // Assign tasks and check that all slots are filled with j1, then j2
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000006_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000007_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000008_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
   }
   
   /**
@@ -699,14 +898,14 @@
     assertEquals(100,  info3.reduceDeficit);
     
     // Assign tasks and check that slots are first given to needy jobs
+    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000004_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0003_r_000007_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2");
   }
 
   /**
@@ -799,14 +998,14 @@
     
     // Assign tasks and check that slots are first given to needy jobs, but
     // that job 1 gets two tasks after due to having a larger deficit.
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0003_r_000004_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0003_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
   }
   
   /**
@@ -859,14 +1058,14 @@
     assertEquals(2.0,  info2.reduceFairShare);
     
     // Assign tasks and check that slots are first given to needy jobs
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000006_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000007_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000008_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
   }
   
   /**
@@ -911,15 +1110,15 @@
     assertEquals(0.0,  info4.reduceFairShare);
     
     // Assign tasks and check that slots are first to jobs 1 and 2
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     advanceTime(100);
-    checkAssignment("tt2", "attempt_test_0002_m_000005_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_m_000006_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000007_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0002_r_000008_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
   }
 
   /**
@@ -968,15 +1167,15 @@
     assertEquals(1.33,  info4.reduceFairShare, 0.1);
     
     // Assign tasks and check that slots are first to jobs 1 and 3
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     advanceTime(100);
-    checkAssignment("tt2", "attempt_test_0003_m_000005_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0003_m_000006_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0003_r_000007_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0003_r_000008_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
   }
   
   /**
@@ -1114,7 +1313,6 @@
     assertFalse(scheduler.enoughMapsFinishedToRunReduces(0, 1));
     assertTrue(scheduler.enoughMapsFinishedToRunReduces(1, 1));
   }
-  
 
   /**
    * This test submits jobs in three pools: poolA, which has a weight
@@ -1231,6 +1429,429 @@
     assertEquals(5, loadMgr.getCap(100, 5, 100));
     assertEquals(5, loadMgr.getCap(200, 5, 100));
   }
+
+  /**
+   * This test starts by launching a job in the default pool that takes
+   * all the slots in the cluster. We then submit a job in a pool with
+   * min share of 2 maps and 1 reduce task. After the min share preemption
+   * timeout, this job should be allowed to preempt tasks. 
+   */
+  public void testMinSharePreemption() throws Exception {
+    // Enable preemption in scheduler
+    scheduler.preemptionEnabled = true;
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give pool A a min share of 2 maps and 1 reduce, and a preemption
+    // timeout of 1 minute
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>2</minMaps>");
+    out.println("<minReduces>1</minReduces>");
+    out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+
+    // Submit job 1 and assign all slots to it. Sleep a bit before assigning
+    // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    advanceTime(100);
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
+    
+    // Ten seconds later, submit job 2.
+    advanceTime(10000);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    
+    // Ten seconds later, check that job 2 is not able to preempt tasks.
+    advanceTime(10000);
+    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.MAP,
+        clock.getTime()));
+    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+        clock.getTime()));
+    
+    // Advance time by 49 more seconds, putting us at 59s after the
+    // submission of job 2. It should still not be able to preempt.
+    advanceTime(49000);
+    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.MAP,
+        clock.getTime()));
+    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+        clock.getTime()));
+    
+    // Advance time by 2 seconds, putting us at 61s after the submission
+    // of job 2. It should now be able to preempt 2 maps and 1 reduce.
+    advanceTime(2000);
+    assertEquals(2, scheduler.tasksToPreempt(job2, TaskType.MAP,
+        clock.getTime()));
+    assertEquals(1, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+        clock.getTime()));
+
+    // Test that the tasks actually get preempted and we can assign new ones
+    scheduler.preemptTasksIfNecessary();
+    scheduler.update();
+    assertEquals(2, scheduler.runningTasks(job1, TaskType.MAP));
+    assertEquals(3, scheduler.runningTasks(job1, TaskType.REDUCE));
+    checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+  }
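
A minimal standalone sketch of the min share rule this test exercises (class
and field names are illustrative assumptions, not the FairScheduler
internals): a job may preempt only once its pool has been below its min share
for the configured timeout, and then only up to the smaller of the min share
and its actual demand.

    // Illustrative model of min share preemption; not the scheduler's API.
    class MinSharePreemptionSketch {
      long lastTimeAtMinShare; // last clock time the pool met its min share
      int minShare;            // configured minMaps or minReduces
      int runningTasks;        // tasks of this type currently running
      int demand;              // tasks of this type the job could still use

      int tasksToPreempt(long now, long minShareTimeoutMs) {
        if (now - lastTimeAtMinShare < minShareTimeoutMs) {
          return 0;            // timeout not expired: no preemption yet
        }
        // Cap at real demand so a small job cannot over-preempt (see
        // testMinSharePreemptionWithSmallJob below).
        int target = Math.min(minShare, demand);
        return Math.max(0, target - runningTasks);
      }
    }

Here job 2 has minMaps = 2, minReduces = 1, zero running tasks and plenty of
demand, so once 60s pass it may preempt 2 maps and 1 reduce, as asserted.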
+
+  /**
+   * This test starts by launching a job in the default pool that takes
+   * all the slots in the cluster. We then submit a job in a pool with
+   * min share of 3 maps and 3 reduce tasks, but which only actually
+   * needs 1 map and 2 reduces. We check that this job does not preempt
+   * more than this many tasks despite its min share being higher. 
+   */
+  public void testMinSharePreemptionWithSmallJob() throws Exception {
+    // Enable preemption in scheduler
+    scheduler.preemptionEnabled = true;
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give pool A a min share of 3 maps and 3 reduces, and a preemption
+    // timeout of 1 minute
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>3</minMaps>");
+    out.println("<minReduces>3</minReduces>");
+    out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+
+    // Submit job 1 and assign all slots to it. Sleep a bit before assigning
+    // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    advanceTime(100);
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
+    
+    // Ten seconds later, submit job 2.
+    advanceTime(10000);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 1, 2, "poolA");
+    
+    // Advance time by 59 seconds and check that no preemption occurs.
+    advanceTime(59000);
+    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.MAP,
+        clock.getTime()));
+    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+        clock.getTime()));
+    
+    // Advance time by 2 seconds, putting us at 61s after the submission
+    // of job 2. Job 2 should now preempt 1 map and 2 reduces.
+    advanceTime(2000);
+    assertEquals(1, scheduler.tasksToPreempt(job2, TaskType.MAP,
+        clock.getTime()));
+    assertEquals(2, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+        clock.getTime()));
+
+    // Test that the tasks actually get preempted and we can assign new ones
+    scheduler.preemptTasksIfNecessary();
+    scheduler.update();
+    assertEquals(3, scheduler.runningTasks(job1, TaskType.MAP));
+    assertEquals(2, scheduler.runningTasks(job1, TaskType.REDUCE));
+    checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+  }
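
Plugging this test's numbers into the sketch after testMinSharePreemption:
with minMaps = minReduces = 3 but a demand of only 1 map and 2 reduces,
min(minShare, demand) caps the targets, matching the assertions.

    // maps:    min(3, 1) - 0 running = 1 task to preempt
    // reduces: min(3, 2) - 0 running = 2 tasks to preempt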
+
+  /**
+   * This test runs on a 4-node cluster (8 slots of each type) to allow 3
+   * jobs with fair
+   * shares greater than 2 slots to coexist (which makes the half-fair-share 
+   * of each job more than 1 so that fair share preemption can kick in). 
+   * 
+   * The test first launches job 1, which takes 6 map slots and 6 reduce slots. 
+   * We then submit job 2, which takes 2 slots of each type. Finally, we submit 
+   * a third job, job 3, which gets no slots. At this point the fair share
+   * of each job will be 8/3 ~= 2.7 slots. Job 1 will be above its fair share,
+   * job 2 will be below it but above half its fair share, and job 3 will
+   * be below half fair share. Therefore job 3 should be allowed to
+   * preempt a task (after a timeout) but jobs 1 and 2 shouldn't. 
+   */
+  public void testFairSharePreemption() throws Exception {
+    // Create a bigger cluster than normal (4 tasktrackers instead of 2)
+    setUpCluster(4);
+    // Enable preemption in scheduler
+    scheduler.preemptionEnabled = true;
+    // Set up pools file with a fair share preemption timeout of 1 minute
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<fairSharePreemptionTimeout>60</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+
+    // Submit jobs 1 and 2. We advance time by 100 between each task tracker
+    // assignment stage to ensure that the tasks from job1 on tt3 are the ones
+    // that are deterministically preempted first (being the latest launched
+    // tasks in an over-allocated job).
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 6, 6);
+    advanceTime(100); // Makes job 1 deterministically launch before job 2
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10);
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    advanceTime(100);
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
+    advanceTime(100);
+    checkAssignment("tt3", "attempt_test_0001_m_000004_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0001_r_000004_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0001_r_000005_0 on tt3");
+    advanceTime(100);
+    checkAssignment("tt4", "attempt_test_0002_m_000000_0 on tt4");
+    checkAssignment("tt4", "attempt_test_0002_m_000001_0 on tt4");
+    checkAssignment("tt4", "attempt_test_0002_r_000000_0 on tt4");
+    checkAssignment("tt4", "attempt_test_0002_r_000001_0 on tt4");
+    
+    // Submit job 3.
+    JobInProgress job3 = submitJob(JobStatus.RUNNING, 10, 10);
+    
+    // Check that after 59 seconds, neither job can preempt
+    advanceTime(59000);
+    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.MAP,
+        clock.getTime()));
+    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+        clock.getTime()));
+    assertEquals(0, scheduler.tasksToPreempt(job3, TaskType.MAP,
+        clock.getTime()));
+    assertEquals(0, scheduler.tasksToPreempt(job3, TaskType.REDUCE,
+        clock.getTime()));
+    
+    // Wait 2 more seconds, so that job 3 has now been in the system for 61s.
+    // Now job 3 should be able to preempt 2 tasks of each type, but job 2
+    // still shouldn't preempt anything.
+    advanceTime(2000);
+    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.MAP,
+        clock.getTime()));
+    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+        clock.getTime()));
+    assertEquals(2, scheduler.tasksToPreempt(job3, TaskType.MAP,
+        clock.getTime()));
+    assertEquals(2, scheduler.tasksToPreempt(job3, TaskType.REDUCE,
+        clock.getTime()));
+    
+    // Test that the tasks actually get preempted and we can assign new ones
+    scheduler.preemptTasksIfNecessary();
+    scheduler.update();
+    assertEquals(4, scheduler.runningTasks(job1, TaskType.MAP));
+    assertEquals(4, scheduler.runningTasks(job1, TaskType.REDUCE));
+    checkAssignment("tt3", "attempt_test_0003_m_000000_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0003_r_000000_0 on tt3");
+    checkAssignment("tt3", "attempt_test_0003_r_000001_0 on tt3");
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+    assertNull(scheduler.assignTasks(tracker("tt3")));
+    assertNull(scheduler.assignTasks(tracker("tt4")));
+  }
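
The fair share rule this test exercises has a different trigger: a job may
preempt only after running below half its fair share for the timeout, and may
then claim up to its (rounded-down) full fair share. A sketch under those
assumptions, with illustrative names:

    // Illustrative model of fair share preemption; not the actual scheduler.
    class FairSharePreemptionSketch {
      long lastTimeAtHalfFairShare; // last time job had half its fair share
      double fairShare;             // e.g. 8 slots / 3 jobs ~= 2.7 here
      int runningTasks;
      int demand;

      int tasksToPreempt(long now, long fairShareTimeoutMs) {
        if (now - lastTimeAtHalfFairShare < fairShareTimeoutMs) {
          return 0;                 // not starved for long enough
        }
        int target = Math.min((int) fairShare, demand); // floor(2.7) = 2
        return Math.max(0, target - runningTasks);
      }
    }

In this test, job 3 runs 0 tasks and so may preempt 2 of each type, while
job 2 already runs 2 of each (above its half fair share of 1.33) and may not.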
+  
+  /**
+   * This test submits a job that takes all 4 slots, and then a second
+   * job that has both a min share of 2 slots with a 60s timeout and a
+   * fair share timeout of 60s. After 60 seconds, this job will be starved
+   * of both min share (2 slots of each type) and fair share (2 slots of each
+   * type), and we test that it does not kill more than 2 tasks of each type
+   * in total.
+   */
+  public void testMinAndFairSharePreemption() throws Exception {
+    // Enable preemption in scheduler
+    scheduler.preemptionEnabled = true;
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give pool A a min share of 2 maps and 2 reduces, and a preemption
+    // timeout of 1 minute
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>2</minMaps>");
+    out.println("<minReduces>2</minReduces>");
+    out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
+    out.println("</pool>");
+    out.println("<fairSharePreemptionTimeout>60</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+
+    // Submit job 1 and assign all slots to it. Sleep a bit before assigning
+    // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    advanceTime(100);
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
+    
+    // Ten seconds later, submit job 2.
+    advanceTime(10000);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    
+    // Ten seconds later, check that job 2 is not able to preempt tasks.
+    advanceTime(10000);
+    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.MAP,
+        clock.getTime()));
+    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+        clock.getTime()));
+    
+    // Advance time by 49 more seconds, putting us at 59s after the
+    // submission of job 2. It should still not be able to preempt.
+    advanceTime(49000);
+    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.MAP,
+        clock.getTime()));
+    assertEquals(0, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+        clock.getTime()));
+    
+    // Advance time by 2 seconds, putting us at 61s after the submission
+    // of job 2. It should now be able to preempt 2 maps and 2 reduces.
+    advanceTime(2000);
+    assertEquals(2, scheduler.tasksToPreempt(job2, TaskType.MAP,
+        clock.getTime()));
+    assertEquals(2, scheduler.tasksToPreempt(job2, TaskType.REDUCE,
+        clock.getTime()));
+
+    // Test that the tasks actually get preempted and we can assign new ones
+    scheduler.preemptTasksIfNecessary();
+    scheduler.update();
+    assertEquals(2, scheduler.runningTasks(job1, TaskType.MAP));
+    assertEquals(2, scheduler.runningTasks(job1, TaskType.REDUCE));
+    checkAssignment("tt2", "attempt_test_0002_m_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000000_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+  }
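
When a job is past both timeouts at once, the test expects 2 kills of each
type in total, not 2 + 2. A sketch of one way to reconcile the two counts,
assuming (as these assertions suggest) that they are combined rather than
summed:

    // Take the larger of the two shortfalls so a job starved of both its
    // min share and its fair share is made whole exactly once.
    int combinedTasksToPreempt(int byMinShareTimeout, int byFairShareTimeout) {
      return Math.max(byMinShareTimeout, byFairShareTimeout);
    }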
+  
+  /**
+   * This is a copy of testMinAndFairSharePreemption that turns preemption
+   * off and verifies that no tasks get killed.
+   */
+  public void testNoPreemptionIfDisabled() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give pool A a min share of 2 maps and 2 reduces, and a preemption
+    // timeout of 1 minute
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>2</minMaps>");
+    out.println("<minReduces>2</minReduces>");
+    out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
+    out.println("</pool>");
+    out.println("<fairSharePreemptionTimeout>60</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+
+    // Submit job 1 and assign all slots to it. Sleep a bit before assigning
+    // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    advanceTime(100);
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
+    
+    // Ten seconds later, submit job 2.
+    advanceTime(10000);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    
+    // Advance time by 61s, putting us past the preemption timeout,
+    // and check that no tasks get preempted.
+    advanceTime(61000);
+    scheduler.preemptTasksIfNecessary();
+    scheduler.update();
+    assertEquals(4, scheduler.runningTasks(job1, TaskType.MAP));
+    assertEquals(4, scheduler.runningTasks(job1, TaskType.REDUCE));
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+  }
+
+  /**
+   * This is a copy of testMinAndFairSharePreemption that turns preemption
+   * on but also turns on mapred.fairscheduler.preemption.only.log (the
+   * "dry run" parameter for testing out preemption) and verifies that no
+   * tasks get killed.
+   */
+  public void testNoPreemptionIfOnlyLogging() throws Exception {
+    // Turn on preemption, but for logging only
+    scheduler.preemptionEnabled = true;
+    scheduler.onlyLogPreemption = true;
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Give pool A a min share of 2 maps and 2 reduces, and a preemption
+    // timeout of 1 minute
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>2</minMaps>");
+    out.println("<minReduces>2</minReduces>");
+    out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
+    out.println("</pool>");
+    out.println("<fairSharePreemptionTimeout>60</fairSharePreemptionTimeout>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+
+    // Submit job 1 and assign all slots to it. Sleep a bit before assigning
+    // tasks on tt1 and tt2 to ensure that the ones on tt2 get preempted first.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 10);
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    advanceTime(100);
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000003_0 on tt2");
+    
+    // Ten seconds later, submit job 2.
+    advanceTime(10000);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 10, "poolA");
+    
+    // Advance time by 61s, putting us past the preemption timeout,
+    // and check that no tasks get preempted.
+    advanceTime(61000);
+    scheduler.preemptTasksIfNecessary();
+    scheduler.update();
+    assertEquals(4, scheduler.runningTasks(job1, TaskType.MAP));
+    assertEquals(4, scheduler.runningTasks(job1, TaskType.REDUCE));
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+  }
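
These last two tests pin down the two kill switches toggled via
scheduler.preemptionEnabled and scheduler.onlyLogPreemption. A minimal sketch
of the guard logic, with hypothetical helper names:

    // Illustrative guards; names are assumptions, not the scheduler's API.
    class PreemptionGuardSketch {
      boolean preemptionEnabled;  // false in testNoPreemptionIfDisabled
      boolean onlyLogPreemption;  // true in testNoPreemptionIfOnlyLogging

      int preemptIfNecessary(int tasksOverdue) {
        if (!preemptionEnabled || tasksOverdue <= 0) {
          return 0;               // preemption disabled: never kill tasks
        }
        if (onlyLogPreemption) {
          System.out.println("Would preempt " + tasksOverdue + " task(s)");
          return 0;               // dry run: log the decision, kill nothing
        }
        return tasksOverdue;      // number of tasks to actually kill
      }
    }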
   
   private void advanceTime(long time) {
     clock.advance(time);

Modified: hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml?rev=788922&r1=788921&r2=788922&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml (original)
+++ hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml Sat Jun 27 03:44:10 2009
@@ -39,52 +39,61 @@
         free up are assigned to the new jobs, so that each job gets
         roughly the same amount of CPU time. Unlike the default Hadoop
         scheduler, which forms a queue of jobs, this lets short jobs finish
-        in reasonable time while not starving long jobs. It is also a 
-        reasonable way to share a cluster between a number of users. Finally, 
-        fair sharing can also work with job priorities - the priorities are
+        in reasonable time while not starving long jobs. It is also an easy
+        way to share a cluster between multiple users.
+        Fair sharing can also work with job priorities - the priorities are
         used as weights to determine the fraction of total compute time that
-        each job should get.
+        each job gets.
       </p>
       <p>
-        The scheduler actually organizes jobs further into "pools", and 
-        shares resources fairly between these pools. By default, there is a 
+        The fair scheduler organizes jobs into <em>pools</em>, and 
+        divides resources fairly between these pools. By default, there is a 
         separate pool for each user, so that each user gets the same share 
-        of the cluster no matter how many jobs they submit. However, it is 
-        also possible to set a job's pool based on the user's Unix group or
-        any other jobconf property, such as the queue name property used by 
-        <a href="capacity_scheduler.html">Capacity Scheduler</a>. 
-        Within each pool, fair sharing is used to share capacity between 
+        of the cluster no matter how many jobs they submit. It is also
+        possible to set a job's pool based on the user's Unix group or
+        any jobconf property. 
+        Within each pool, fair sharing is used to divide capacity between 
         the running jobs. Pools can also be given weights to share the 
-        cluster non-proportionally in the config file.
+        cluster non-proportionally.
       </p>
       <p>
         In addition to providing fair sharing, the Fair Scheduler allows
-        assigning guaranteed minimum shares to pools, which is useful for
-        ensuring that certain users, groups or production applications
+        assigning guaranteed <em>minimum shares</em> to pools, which is useful
+        for ensuring that certain users, groups or production applications
         always get sufficient resources. When a pool contains jobs, it gets
         at least its minimum share, but when the pool does not need its full
-        guaranteed share, the excess is split between other running jobs.
-        This lets the scheduler guarantee capacity for pools while utilizing
-        resources efficiently when these pools don't contain jobs.       
+        guaranteed share, the excess is split between other pools.
       </p>
       <p>
-        The Fair Scheduler lets all jobs run by default, but it is also
-        possible to limit the number of running jobs per user and per pool
-        through the config file. This can be useful when a user must submit
-        hundreds of jobs at once, or in general to improve performance if
-        running too many jobs at once would cause too much intermediate data
-        to be created or too much context-switching. Limiting the jobs does
-        not cause any subsequently submitted jobs to fail, only to wait in the
-        sheduler's queue until some of the user's earlier jobs finish. Jobs to
-        run from each user/pool are chosen in order of priority and then
-        submit time, as in the default FIFO scheduler in Hadoop.
+        In normal operation, when a new job is submitted, the scheduler 
+        waits for tasks from existing jobs to finish in order to free up
+        slots for the new job. However, the scheduler also optionally supports
+        <em>preemption</em> of running jobs after configurable timeouts.
+        If the new job's minimum share is not reached after
+        a certain amount of time, the job is allowed to kill tasks from
+        existing jobs to make room to run.
+        Preemption can thus be used to guarantee
+        that "production" jobs run at specified times while allowing
+        the Hadoop cluster to also be used for experimental and research jobs.
+        In addition, a job can be allowed to preempt tasks if it is
+        below half of its fair share for a configurable timeout (generally
+        set larger than the minimum share timeout).
+        When choosing tasks to kill, the fair scheduler picks the
+        most-recently-launched tasks from over-allocated jobs, 
+        to minimize wasted computation.
+        Preemption does not cause the preempted jobs to fail, because Hadoop
+        jobs tolerate losing tasks; it only makes them take longer to finish.
       </p>
       <p>
-        Finally, the fair scheduler provides several extension points where
-        the basic functionality can be extended. For example, the weight
-        calculation can be modified to give a priority boost to new jobs,
-        implementing a "shortest job first" policy which reduces response
-        times for interactive jobs even further.
+        Finally, the Fair Scheduler can limit the number of concurrent
+        running jobs per user and per pool. This can be useful when a 
+        user must submit hundreds of jobs at once, and for ensuring that
+        intermediate data does not fill up disk space on a cluster if too many
+        concurrent jobs are running.
+        Setting job limits causes jobs submitted beyond the limit to wait in the
+        scheduler's queue until some of the user/pool's earlier jobs finish.
+        Jobs to run from each user/pool are chosen in order of priority and then
+        submit time.
       </p>
     </section>
 
@@ -94,20 +103,14 @@
         To run the fair scheduler in your Hadoop installation, you need to put
         it on the CLASSPATH. The easiest way is to copy the 
         <em>hadoop-*-fairscheduler.jar</em> from
-        <em>HADOOP_HOME/contrib/fairscheduler</em> to <em>HADOOP_HOME/lib</em>.
+        <em>HADOOP_HOME/build/contrib/fairscheduler</em> to <em>HADOOP_HOME/lib</em>.
         Alternatively you can modify <em>HADOOP_CLASSPATH</em> to include this jar, in
         <em>HADOOP_CONF_DIR/hadoop-env.sh</em>
       </p>
       <p>
-        In order to compile fair scheduler, from sources execute <em> ant 
-        package</em> in source folder and copy the 
-        <em>build/contrib/fair-scheduler/hadoop-*-fairscheduler.jar</em> 
-        to <em>HADOOP_HOME/lib</em>
-      </p>
-      <p>
        You will also need to set the following property in the Hadoop config 
        file  <em>HADOOP_CONF_DIR/mapred-site.xml</em> to have Hadoop use 
-       the fair scheduler: <br/>
+       the fair scheduler: <br/><br/>
        <code>&lt;property&gt;</code><br/> 
        <code>&nbsp;&nbsp;&lt;name&gt;mapred.jobtracker.taskScheduler&lt;/name&gt;</code><br/>
        <code>&nbsp;&nbsp;&lt;value&gt;org.apache.hadoop.mapred.FairScheduler&lt;/value&gt;</code><br/>
@@ -115,162 +118,309 @@
       </p>
       <p>
         Once you restart the cluster, you can check that the fair scheduler 
-        is running by going to http://&lt;jobtracker URL&gt;/scheduler 
+        is running by going to <em>http://&lt;jobtracker URL&gt;/scheduler</em> 
         on the JobTracker's web UI. A &quot;job scheduler administration&quot; page should 
         be visible there. This page is described in the Administration section.
       </p>
+      <p>
+        If you wish to compile the fair scheduler from source, run <em>ant
+        package</em> in your HADOOP_HOME directory. This will build
+        <em>build/contrib/fair-scheduler/hadoop-*-fairscheduler.jar</em>.
+      </p>
     </section>
     
     <section>
-      <title>Configuring the Fair scheduler</title>
+      <title>Configuration</title>
+      <p>
+        The Fair Scheduler is configured in two places: algorithm
+        parameters are set in <em>mapred-site.xml</em>, while a separate XML
+        file called the <em>allocation file</em> can be used to configure
+        pools, minimum shares, running job limits and preemption timeouts.
+        The allocation file is reloaded periodically at runtime, 
+        allowing you to change pool settings without restarting 
+        your Hadoop cluster.
+      </p>
       <p>
-      The following properties can be set in mapred-site.xml to configure 
-      the fair scheduler:
+        For a minimal installation, to just get equal sharing between users,
+        you will not need to set up an allocation file. If you do set up an
+        allocation file, you will need to tell the scheduler where to
+        find it by setting the <em>mapred.fairscheduler.allocation.file</em>
+        parameter in <em>mapred-site.xml</em> as described below.
       </p>
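+      <p>
+        For example, assuming the allocation file has been placed at
+        <em>HADOOP_CONF_DIR/fair-scheduler.xml</em> (the path shown below is
+        purely illustrative; use the absolute path for your installation),
+        the property would look like:<br/><br/>
+        <code>&lt;property&gt;</code><br/>
+        <code>&nbsp;&nbsp;&lt;name&gt;mapred.fairscheduler.allocation.file&lt;/name&gt;</code><br/>
+        <code>&nbsp;&nbsp;&lt;value&gt;/path/to/conf/fair-scheduler.xml&lt;/value&gt;</code><br/>
+        <code>&lt;/property&gt;</code>
+      </p>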
-      <table>
-        <tr>
-        <th>Name</th><th>Description</th>
-        </tr>
-        <tr>
-        <td>
-          mapred.fairscheduler.allocation.file
-        </td>
-        <td>
-          Specifies an absolute path to an XML file which contains the 
-          allocations for each pool, as well as the per-pool and per-user 
-          limits on number of running jobs. If this property is not 
-          provided, allocations are not used.<br/>
-          This file must be in XML format, and can contain three types of 
-          elements:
+      <section>
+      <title>Scheduler Parameters in mapred-site.xml</title>
+        <p>
+          The following parameters can be set in <em>mapred-site.xml</em>
+          to affect the behavior of the fair scheduler:
+        </p>
+        <p><strong>Basic Parameters:</strong></p>
+        <table>
+          <tr>
+          <th>Name</th><th>Description</th>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.allocation.file
+          </td>
+          <td>
+            Specifies an absolute path to an XML file which contains minimum
+            shares for each pool, per-pool and per-user limits on number of
+            running jobs, and preemption timeouts. If this property is not 
+            set, these features are not used.
+            The <a href="#Allocation+File+Format">allocation file
+            format</a> is described later.
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.preemption
+          </td>
+          <td>
+            Boolean property for enabling preemption. Default: false.
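+            An example property block is sketched after this table.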
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.poolnameproperty
+          </td>
+          <td>
+            Specify which jobconf property is used to determine the pool that a
+            job belongs in. String, default: <em>user.name</em>
+            (i.e. one pool for each user). 
+            Another useful value is <em>group.name</em> to create a
+            pool per Unix group.
+            Finally, a common setting is to use a non-standard property
+            such as <em>pool.name</em> as the pool name property, and make it
+            default to <em>user.name</em> through the following setting:<br/>
+            <code>&lt;property&gt;</code><br/> 
+            <code>&nbsp;&nbsp;&lt;name&gt;pool.name&lt;/name&gt;</code><br/>
+            <code>&nbsp;&nbsp;&lt;value&gt;${user.name}&lt;/value&gt;</code><br/>
+            <code>&lt;/property&gt;</code><br/>
+            This allows you to specify the pool name explicitly for some jobs
+            through the jobconf (e.g. passing <em>-Dpool.name=&lt;name&gt;</em>
+            to <em>bin/hadoop jar</em>), while having the default be the user's
+            pool.
+          </td>
+          </tr>
+        </table>
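+        <p>
+          As a minimal sketch, preemption can be turned on with the following
+          block in <em>mapred-site.xml</em> (the property name is listed
+          above; the value simply enables the feature):<br/><br/>
+          <code>&lt;property&gt;</code><br/>
+          <code>&nbsp;&nbsp;&lt;name&gt;mapred.fairscheduler.preemption&lt;/name&gt;</code><br/>
+          <code>&nbsp;&nbsp;&lt;value&gt;true&lt;/value&gt;</code><br/>
+          <code>&lt;/property&gt;</code>
+        </p>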
+        <p><strong>Advanced Parameters:</strong></p>
+        <table>
+          <tr>
+          <th>Name</th><th>Description</th>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.sizebasedweight
+          </td>
+          <td>
+            Take into account job sizes in calculating their weights for fair 
+            sharing. By default, weights are only based on job priorities. 
+            Setting this flag to true will make them based on the size of the 
+            job (number of tasks needed) as well, though not linearly
+            (the weight will be proportional to the log of the number of tasks 
+            needed). This lets larger jobs get larger fair shares while still 
+            providing enough of a share to small jobs to let them finish fast. 
+            Boolean value, default: false.
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.preemption.only.log
+          </td>
+          <td>
+            This flag will cause the scheduler to run through the preemption
+            calculations but simply log when it wishes to preempt a task,
+            without actually preempting the task. 
+            Boolean property, default: false.
+            This property can be useful for
+            doing a "dry run" of preemption before enabling it to make sure
+            that you have not set timeouts too aggressively.
+            You will see preemption log messages in your JobTracker's output
+            log (<em>HADOOP_LOG_DIR/hadoop-jobtracker-*.log</em>).
+            The messages look as follows:<br/>
+            <code>Should preempt 2 tasks for job_20090101337_0001: tasksDueToMinShare = 2, tasksDueToFairShare = 0</code>
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.update.interval
+          </td>
+          <td>
+            Interval at which to update fair share calculations. The default
+            of 500ms works well for clusters with fewer than 500 nodes, 
+            but larger values reduce load on the JobTracker for larger clusters.
+            Integer value in milliseconds, default: 500.
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.preemption.interval
+          </td>
+          <td>
+            Interval at which to check for tasks to preempt. The default
+            of 15s works well for timeouts on the order of minutes.
+            It is not recommended to set timeouts much smaller than this
+            amount, but you can use this value to make preemption computations
+            run more often if you do set such timeouts. A value of less than
+            5s will probably be too small, however, as it becomes less than
+            the inter-heartbeat interval.
+            Integer value in milliseconds, default: 15000.
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.weightadjuster
+          </td>
+          <td>
+          An extension point that lets you specify a class to adjust the 
+          weights of running jobs. This class should implement the 
+          <em>WeightAdjuster</em> interface. There is currently one example 
+          implementation - <em>NewJobWeightBooster</em>, which increases the 
+          weight of jobs for the first 5 minutes of their lifetime to let 
+          short jobs finish faster. To use it, set the weightadjuster 
+          property to the full class name, 
+          <code>org.apache.hadoop.mapred.NewJobWeightBooster</code>.
+          NewJobWeightBooster itself provides two parameters for setting the 
+          duration and boost factor.
           <ul>
-          <li>pool elements, which may contain elements for minMaps, 
-          minReduces, maxRunningJobs (limit the number of jobs from the 
-          pool to run at once),and weight (to share the cluster 
-          non-proportionally with other pools).
-          </li>
-          <li>user elements, which may contain a maxRunningJobs to limit 
-          jobs. Note that by default, there is a separate pool for each 
-          user, so these may not be necessary; they are useful, however, 
-          if you create a pool per user group or manually assign jobs 
-          to pools.</li>
-          <li>A userMaxJobsDefault element, which sets the default running 
-          job limit for any users whose limit is not specified.</li>
+          <li><em>mapred.newjobweightbooster.factor</em>
+            Factor by which new jobs weight should be boosted. 
+            Default is 3.</li>
+          <li><em>mapred.newjobweightbooster.duration</em>
+            Boost duration in milliseconds. Default is 300000 for 5 minutes.</li>
           </ul>
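+          An example configuration using this extension point is sketched
+          after this table.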
-          <br/>
-          Example Allocation file is listed below :<br/>
-          <code>&lt;?xml version="1.0"?&gt; </code> <br/>
-          <code>&lt;allocations&gt;</code> <br/> 
-          <code>&nbsp;&nbsp;&lt;pool name="sample_pool"&gt;</code><br/>
-          <code>&nbsp;&nbsp;&nbsp;&nbsp;&lt;minMaps&gt;5&lt;/minMaps&gt;</code><br/>
-          <code>&nbsp;&nbsp;&nbsp;&nbsp;&lt;minReduces&gt;5&lt;/minReduces&gt;</code><br/>
-          <code>&nbsp;&nbsp;&nbsp;&nbsp;&lt;weight&gt;2.0&lt;/weight&gt;</code><br/>
-          <code>&nbsp;&nbsp;&lt;/pool&gt;</code><br/>
-          <code>&nbsp;&nbsp;&lt;user name="sample_user"&gt;</code><br/>
-          <code>&nbsp;&nbsp;&nbsp;&nbsp;&lt;maxRunningJobs&gt;6&lt;/maxRunningJobs&gt;</code><br/>
-          <code>&nbsp;&nbsp;&lt;/user&gt;</code><br/>
-          <code>&nbsp;&nbsp;&lt;userMaxJobsDefault&gt;3&lt;/userMaxJobsDefault&gt;</code><br/>
-          <code>&lt;/allocations&gt;</code>
-          <br/>
-          This example creates a pool sample_pool with a guarantee of 5 map 
-          slots and 5 reduce slots. The pool also has a weight of 2.0, meaning 
-          it has a 2x higher share of the cluster than other pools (the default 
-          weight is 1). Finally, the example limits the number of running jobs 
-          per user to 3, except for sample_user, who can run 6 jobs concurrently. 
-          Any pool not defined in the allocations file will have no guaranteed 
-          capacity and a weight of 1.0. Also, any pool or user with no max 
-          running jobs set in the file will be allowed to run an unlimited 
-          number of jobs.
-        </td>
-        </tr>
-        <tr>
-        <td>
-          mapred.fairscheduler.assignmultiple
-        </td>
-        <td>
-          Allows the scheduler to assign both a map task and a reduce task 
-          on each heartbeat, which improves cluster throughput when there 
-          are many small tasks to run. Boolean value, default: true.
-        </td>
-        </tr>
-        <tr>
-        <td>
-          mapred.fairscheduler.sizebasedweight
-        </td>
-        <td>
-          Take into account job sizes in calculating their weights for fair 
-          sharing.By default, weights are only based on job priorities. 
-          Setting this flag to true will make them based on the size of the 
-          job (number of tasks needed) as well,though not linearly 
-          (the weight will be proportional to the log of the number of tasks 
-          needed). This lets larger jobs get larger fair shares while still 
-          providing enough of a share to small jobs to let them finish fast. 
-          Boolean value, default: false.
-        </td>
-        </tr>
-        <tr>
-        <td>
-          mapred.fairscheduler.poolnameproperty
-        </td>
-        <td>
-          Specify which jobconf property is used to determine the pool that a
-          job belongs in. String, default: user.name (i.e. one pool for each 
-          user). Some other useful values to set this to are: <br/>
-          <ul> 
-            <li> group.name (to create a pool per Unix group).</li>
-            <li>mapred.job.queue.name (the same property as the queue name in 
-            <a href="capacity_scheduler.html">Capacity Scheduler</a>).</li>
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.loadmanager
+          </td>
+          <td>
+            An extension point that lets you specify a class that determines 
+            how many maps and reduces can run on a given TaskTracker. This class 
+            should implement the LoadManager interface. By default the task caps 
+            in the Hadoop config file are used, but this option could be used
+            to base the load on available memory and CPU utilization, for example.
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.taskselector
+          </td>
+          <td>
+          An extension point that lets you specify a class that determines 
+          which task from within a job to launch on a given tracker. This can be 
+          used to change either the locality policy (e.g. keep some jobs within 
+          a particular rack) or the speculative execution algorithm (select 
+          when to launch speculative tasks). The default implementation uses 
+          Hadoop's default algorithms from JobInProgress.
+          </td>
+          </tr>
+          <!--
+          <tr>
+          <td>
+            mapred.fairscheduler.eventlog.enabled
+          </td>
+          <td>
+            Enable a detailed log of fair scheduler events, useful for
+            debugging.
+            This log is stored in <em>HADOOP_LOG_DIR/fairscheduler</em>.
+            Boolean value, default: false.
+          </td>
+          </tr>
+          <tr>
+          <td>
+            mapred.fairscheduler.dump.interval
+          </td>
+          <td>
+            If using the event log, this is the interval at which to dump
+            complete scheduler state (list of pools and jobs) to the log.
+            Integer value in milliseconds, default: 10000.
+          </td>
+          </tr>
+          -->
+        </table>
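+        <p>
+          As a sketch of using these extension points, the following
+          <em>mapred-site.xml</em> entries would enable
+          <em>NewJobWeightBooster</em> with a 2x boost for the first minute
+          (the factor and duration values here are illustrative assumptions,
+          not recommendations):<br/><br/>
+          <code>&lt;property&gt;</code><br/>
+          <code>&nbsp;&nbsp;&lt;name&gt;mapred.fairscheduler.weightadjuster&lt;/name&gt;</code><br/>
+          <code>&nbsp;&nbsp;&lt;value&gt;org.apache.hadoop.mapred.NewJobWeightBooster&lt;/value&gt;</code><br/>
+          <code>&lt;/property&gt;</code><br/>
+          <code>&lt;property&gt;</code><br/>
+          <code>&nbsp;&nbsp;&lt;name&gt;mapred.newjobweightbooster.factor&lt;/name&gt;</code><br/>
+          <code>&nbsp;&nbsp;&lt;value&gt;2.0&lt;/value&gt;</code><br/>
+          <code>&lt;/property&gt;</code><br/>
+          <code>&lt;property&gt;</code><br/>
+          <code>&nbsp;&nbsp;&lt;name&gt;mapred.newjobweightbooster.duration&lt;/name&gt;</code><br/>
+          <code>&nbsp;&nbsp;&lt;value&gt;60000&lt;/value&gt;</code><br/>
+          <code>&lt;/property&gt;</code>
+        </p>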
+      </section>  
+      <section>
+        <title>Allocation File Format</title>
+        <p>
+        The allocation file configures minimum shares, running job
+        limits, weights and preemption timeouts for each pool.
+        An example is provided in 
+        <em>HADOOP_HOME/conf/fair-scheduler.xml.template</em>.
+        The allocation file can contain the following types of elements:
+        </p>
+        <ul>
+        <li><em>pool</em> elements, which configure each pool.
+        These may contain the following sub-elements:
+          <ul>
+          <li><em>minMaps</em> and <em>minReduces</em>,
+            to set the pool's minimum share of task slots.</li>
+          <li><em>maxRunningJobs</em>, 
+          to limit the number of jobs from the 
+          pool to run at once (defaults to infinite).</li>
+          <li><em>weight</em>, to share the cluster 
+          non-proportionally with other pools (defaults to 1.0).</li>
+          <li><em>minSharePreemptionTimeout</em>, the
+            number of seconds the pool will wait before
+            killing other pools' tasks if it is below its minimum share
+            (defaults to infinite).</li>
           </ul>
-        </td>
-        </tr>
-        <tr>
-        <td>
-          mapred.fairscheduler.weightadjuster
-        </td>
-        <td>
-        An extensibility point that lets you specify a class to adjust the 
-        weights of running jobs. This class should implement the 
-        <em>WeightAdjuster</em> interface. There is currently one example 
-        implementation - <em>NewJobWeightBooster</em>, which increases the 
-        weight of jobs for the first 5 minutes of their lifetime to let 
-        short jobs finish faster. To use it, set the weightadjuster 
-        property to the full class name, 
-        <code>org.apache.hadoop.mapred.NewJobWeightBooster</code> 
-        NewJobWeightBooster itself provides two parameters for setting the 
-        duration and boost factor. <br/>
-        <ol>
-        <li> <em>mapred.newjobweightbooster.factor</em>
-          Factor by which new jobs weight should be boosted. Default is 3</li>
-        <li><em>mapred.newjobweightbooster.duration</em>
-          Duration in milliseconds, default 300000 for 5 minutes</li>
-        </ol>
-        </td>
-        </tr>
-        <tr>
-        <td>
-          mapred.fairscheduler.loadmanager
-        </td>
-        <td>
-          An extensibility point that lets you specify a class that determines 
-          how many maps and reduces can run on a given TaskTracker. This class 
-          should implement the LoadManager interface. By default the task caps 
-          in the Hadoop config file are used, but this option could be used to 
-          make the load based on available memory and CPU utilization for example.
-        </td>
-        </tr>
-        <tr>
-        <td>
-          mapred.fairscheduler.taskselector:
-        </td>
-        <td>
-        An extensibility point that lets you specify a class that determines 
-        which task from within a job to launch on a given tracker. This can be 
-        used to change either the locality policy (e.g. keep some jobs within 
-        a particular rack) or the speculative execution algorithm (select 
-        when to launch speculative tasks). The default implementation uses 
-        Hadoop's default algorithms from JobInProgress.
-        </td>
-        </tr>
-      </table>      
+        </li>
+        <li><em>user</em> elements, which may contain a 
+        <em>maxRunningJobs</em> element to limit 
+        jobs. Note that by default, there is a pool for each 
+        user, so per-user limits are not necessary.</li>
+        <li><em>poolMaxJobsDefault</em>, which sets the default running 
+        job limit for any pools whose limit is not specified.</li>
+        <li><em>userMaxJobsDefault</em>, which sets the default running 
+        job limit for any users whose limit is not specified.</li>
+        <li><em>defaultMinSharePreemptionTimeout</em>, 
+        which sets the default minimum share preemption timeout 
+        for any pools where it is not specified.</li>
+        <li><em>fairSharePreemptionTimeout</em>, 
+        which sets the preemption timeout used when jobs are below half
+        their fair share.</li>
+        </ul>
+        <p>
+        Pool and user elements are only required if you are setting
+        non-default values for the pool/user. That is, you do not need to
+        declare all users and all pools in your config file before running
+        the fair scheduler. If a user or pool is not listed in the config file,
+        the default values for limits, preemption timeouts, etc. will be used.
+        </p>
+        <p>
+        An example allocation file is given below:</p>
+        <p>
+        <code>&lt;?xml version="1.0"?&gt; </code> <br/>
+        <code>&lt;allocations&gt;</code> <br/> 
+        <code>&nbsp;&nbsp;&lt;pool name="sample_pool"&gt;</code><br/>
+        <code>&nbsp;&nbsp;&nbsp;&nbsp;&lt;minMaps&gt;5&lt;/minMaps&gt;</code><br/>
+        <code>&nbsp;&nbsp;&nbsp;&nbsp;&lt;minReduces&gt;5&lt;/minReduces&gt;</code><br/>
+        <code>&nbsp;&nbsp;&nbsp;&nbsp;&lt;weight&gt;2.0&lt;/weight&gt;</code><br/>
+        <code>&nbsp;&nbsp;&lt;/pool&gt;</code><br/>
+        <code>&nbsp;&nbsp;&lt;user name="sample_user"&gt;</code><br/>
+        <code>&nbsp;&nbsp;&nbsp;&nbsp;&lt;maxRunningJobs&gt;6&lt;/maxRunningJobs&gt;</code><br/>
+        <code>&nbsp;&nbsp;&lt;/user&gt;</code><br/>
+        <code>&nbsp;&nbsp;&lt;userMaxJobsDefault&gt;3&lt;/userMaxJobsDefault&gt;</code><br/>
+        <code>&lt;/allocations&gt;</code>
+        </p>
+        <p>
+        This example creates a pool sample_pool with a guarantee of 5 map 
+        slots and 5 reduce slots. The pool also has a weight of 2.0, meaning 
+        it gets twice as large a share of the cluster as other pools (the default 
+        weight is 1). Finally, the example limits the number of running jobs 
+        per user to 3, except for sample_user, who can run 6 jobs concurrently. 
+        Any pool not defined in the allocation file will have no guaranteed 
+        capacity and a weight of 1.0. Also, any pool or user with no max 
+        running jobs set in the file will be allowed to run an unlimited 
+        number of jobs.
+        </p>
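+        <p>
+        As a further sketch, preemption timeouts can be added using the
+        elements described above (the pool name and timeout values here are
+        illustrative only; timeouts are given in seconds):
+        </p>
+        <p>
+        <code>&lt;?xml version="1.0"?&gt;</code><br/>
+        <code>&lt;allocations&gt;</code><br/>
+        <code>&nbsp;&nbsp;&lt;pool name="production"&gt;</code><br/>
+        <code>&nbsp;&nbsp;&nbsp;&nbsp;&lt;minMaps&gt;10&lt;/minMaps&gt;</code><br/>
+        <code>&nbsp;&nbsp;&nbsp;&nbsp;&lt;minReduces&gt;10&lt;/minReduces&gt;</code><br/>
+        <code>&nbsp;&nbsp;&nbsp;&nbsp;&lt;minSharePreemptionTimeout&gt;60&lt;/minSharePreemptionTimeout&gt;</code><br/>
+        <code>&nbsp;&nbsp;&lt;/pool&gt;</code><br/>
+        <code>&nbsp;&nbsp;&lt;defaultMinSharePreemptionTimeout&gt;600&lt;/defaultMinSharePreemptionTimeout&gt;</code><br/>
+        <code>&nbsp;&nbsp;&lt;fairSharePreemptionTimeout&gt;600&lt;/fairSharePreemptionTimeout&gt;</code><br/>
+        <code>&lt;/allocations&gt;</code>
+        </p>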
+        <p>
+        A more detailed example file, setting preemption timeouts as well,
+        is available in <em>HADOOP_HOME/conf/fair-scheduler.xml.template</em>.
+        </p>
+      </section>
     </section>
     <section>
     <title> Administration</title>
@@ -280,14 +430,15 @@
     </p> 
     <ol>
     <li>
-      It is possible to modify pools' allocations 
-      and user and pool running job limits at runtime by editing the allocation 
-      config file. The scheduler will reload this file 10-15 seconds after it 
+      It is possible to modify minimum shares, limits, weights and preemption
+      timeouts at runtime by editing the allocation file.
+      The scheduler will reload this file 10-15 seconds after it 
       sees that it was modified.
      </li>
      <li>
      Current jobs, pools, and fair shares  can be examined through the 
-     JobTracker's web interface, at  http://&lt;jobtracker URL&gt;/scheduler. 
+     JobTracker's web interface, at
+     <em>http://&lt;JobTracker URL&gt;/scheduler</em>. 
      On this interface, it is also possible to modify jobs' priorities or 
      move jobs from one pool to another and see the effects on the fair 
      shares (this requires JavaScript).
@@ -312,9 +463,9 @@
      the job has had, but on average it will get its fair share amount.</li>
      </ul>
      <p>
-     In addition, it is possible to turn on an "advanced" view for the web UI,
-     by going to http://&lt;jobtracker URL&gt;/scheduler?advanced. This view shows 
-     four more columns used for calculations internally:
+     In addition, it is possible to view an "advanced" version of the web 
+     UI by going to <em>http://&lt;JobTracker URL&gt;/scheduler?advanced</em>. 
+     This view shows four more columns:
      </p>
      <ul>
      <li><em>Maps/Reduce Weight</em>: Weight of the job in the fair sharing 
@@ -359,13 +510,30 @@
      This capacity is divided among the jobs in that pool according again to 
      their weights.
      </p>
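+     <p>
+     As a worked example (the numbers are assumed for illustration): on a
+     cluster with 100 map slots, if two pools with weights 2.0 and 1.0 both
+     have jobs demanding more than their shares, the first pool's fair share
+     is 100 * 2.0 / (2.0 + 1.0), or about 67 map slots, and the second
+     pool's is about 33. If the second pool's jobs only demand 20 slots, the
+     remaining 13 are redistributed to the first pool.
+     </p>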
-     <p>Finally, when limits on a user's running jobs or a pool's running jobs 
+     <p>When limits on a user's running jobs or a pool's running jobs 
      are in place, we choose which jobs get to run by sorting all jobs in order 
      of priority and then submit time, as in the standard Hadoop scheduler. Any 
      jobs that fall after the user/pool's limit in this ordering are queued up 
      and wait idle until they can be run. During this time, they are ignored 
      from the fair sharing calculations and do not gain or lose deficit (their 
      fair share is set to zero).</p>
+     <p>
+     Preemption is implemented by periodically checking whether jobs are
+     below their minimum share or below half their fair share. If a job has
+     been below its share for sufficiently long, it is allowed to kill
+     other jobs' tasks. The tasks chosen are the most-recently-launched
+     tasks from over-allocated jobs, to minimize the amount of wasted
+     computation.
+     </p>
+     <p>
+     Finally, the fair scheduler provides several extension points where
+     the basic functionality can be extended. For example, the weight
+     calculation can be modified to give a priority boost to new jobs,
+     implementing a "shortest job first" policy which reduces response
+     times for interactive jobs even further.
+     These extension points are listed in
+     <a href="#Advanced+Parameters">advanced mapred-site.xml properties</a>.
+     </p>
     </section>
   </body>  
 </document>

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerManager.java?rev=788922&r1=788921&r2=788922&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerManager.java Sat Jun 27 03:44:10 2009
@@ -88,6 +88,16 @@
    * @return jobInProgress object
    */
   public JobInProgress getJob(JobID jobid);
+
+  /**
+   * Mark the task attempt identified by taskid to be killed
+   * 
+   * @param taskid task to kill
+   * @param shouldFail whether to count the task as failed
+   * @return true if the task was found and successfully marked to kill
+   */
+  public boolean killTask(TaskAttemptID taskid, boolean shouldFail)
+      throws IOException;
   
   /**
    * Initialize the Job

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=788922&r1=788921&r2=788922&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Sat Jun 27 03:44:10 2009
@@ -198,6 +198,11 @@
       return null;
     }
 
+    @Override
+    public boolean killTask(TaskAttemptID attemptId, boolean shouldFail) {
+      return true;
+    }
+
     public void initJob(JobInProgress job) {
       // do nothing
     }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=788922&r1=788921&r2=788922&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java Sat Jun 27 03:44:10 2009
@@ -136,6 +136,10 @@
       return null;
     }
 
+    public boolean killTask(TaskAttemptID attemptId, boolean shouldFail) {
+      return true;
+    }
+
     public void initJob(JobInProgress job) {
       JobStatus prevStatus = (JobStatus)job.getStatus().clone();
       try {
@@ -153,6 +157,7 @@
         }
       }
     }
+
     // Test methods
     
     public void submitJob(JobInProgress job) throws IOException {


