hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject svn commit: r788922 [1/2] - in /hadoop/mapreduce/trunk: ./ conf/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/java/org/apache/hadoop/map...
Date Sat, 27 Jun 2009 03:44:11 GMT
Author: matei
Date: Sat Jun 27 03:44:10 2009
New Revision: 788922

URL: http://svn.apache.org/viewvc?rev=788922&view=rev
Log:
MAPREDUCE-551. Add preemption to the fair scheduler.


Added:
    hadoop/mapreduce/trunk/conf/fair-scheduler.xml.template
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerEventLog.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=788922&r1=788921&r2=788922&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sat Jun 27 03:44:10 2009
@@ -13,6 +13,8 @@
 
   NEW FEATURES
 
+    MAPREDUCE-551. Preemption support in the Fair Scheduler. (Matei Zaharia)
+
     HADOOP-5887. Sqoop should create tables in Hive metastore after importing
     to HDFS. (Aaron Kimball via tomwhite)
 

Added: hadoop/mapreduce/trunk/conf/fair-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/conf/fair-scheduler.xml.template?rev=788922&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/conf/fair-scheduler.xml.template (added)
+++ hadoop/mapreduce/trunk/conf/fair-scheduler.xml.template Sat Jun 27 03:44:10 2009
@@ -0,0 +1,70 @@
+<?xml version="1.0"?>
+
+<!--
+  This is a sample configuration file for the Fair Scheduler. For details
+  on the options, please refer to the fair scheduler documentation at
+  http://hadoop.apache.org/core/docs/r0.21.0/fair_scheduler.html.
+
+  To create your own configuration, copy this file to conf/fair-scheduler.xml
+  and add the following property in mapred-site.xml to point Hadoop to the
+  file, replacing [HADOOP_HOME] with the path to your installation directory:
+    <property>
+      <name>mapred.fairscheduler.allocation.file</name>
+      <value>[HADOOP_HOME]/conf/fair-scheduler.xml</value>
+    </property>
+
+  Note that all the parameters in the configuration file below are optional,
+  including the parameters inside <pool> and <user> elements. It is only
+  necessary to set the ones you want to differ from the defaults.
+-->
+
+<allocations>
+
+  <!-- Example element for configuring a pool -->
+  <pool name="pool1">
+    <!-- Minimum shares of map and reduce slots. Defaults to 0. -->
+    <minMaps>10</minMaps>
+    <minReduces>5</minReduces>
+
+    <!-- Limit on running jobs in the pool. If more jobs are submitted,
+      only the first <maxRunningJobs> will be scheduled at any given time.
+      Defaults to infinity or the global poolMaxJobsDefault value below. -->
+    <maxRunningJobs>5</maxRunningJobs>
+
+    <!-- Number of seconds after which the pool can preempt other pools'
+      tasks to achieve its min share. Requires preemption to be enabled in
+      mapred-site.xml by setting mapred.fairscheduler.preemption to true.
+      Defaults to infinity (no preemption). -->
+    <minSharePreemptionTimeout>300</minSharePreemptionTimeout>
+
+    <!-- Pool's weight in fair sharing calculations. Defaults to 1.0. -->
+    <weight>1.0</weight>
+  </pool>
+
+  <!-- Example element for configuring a user -->
+  <user name="user1">
+    <!-- Limit on running jobs for the user across all pools. If more
+      jobs than this are submitted, only the first <maxRunningJobs> will
+      be scheduled at any given time. Defaults to infinity or the
+      userMaxJobsDefault value set below. -->
+    <maxRunningJobs>10</maxRunningJobs>
+  </user>
+
+  <!-- Default running job limit for pools where it is not explicitly set. -->
+  <poolMaxJobsDefault>20</poolMaxJobsDefault>
+
+  <!-- Default running job limit for users where it is not explicitly set. -->
+  <userMaxJobsDefault>10</userMaxJobsDefault>
+
+  <!-- Default min share preemption timeout for pools where it is not
+    explicitly configured, in seconds. Requires mapred.fairscheduler.preemption
+    to be set to true in your mapred-site.xml. -->
+  <defaultMinSharePreemptionTimeout>600</defaultMinSharePreemptionTimeout>
+
+  <!-- Preemption timeout for jobs below their fair share, in seconds. 
+    If a job is below half its fair share for this amount of time, it
+    is allowed to kill tasks from other jobs to go up to its fair share.
+    Requires mapred.fairscheduler.preemption to be true in mapred-site.xml. -->
+  <fairSharePreemptionTimeout>600</fairSharePreemptionTimeout>
+
+</allocations>

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=788922&r1=788921&r2=788922&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Sat Jun 27 03:44:10 2009
@@ -659,6 +659,11 @@
     public QueueManager getQueueManager() {
       return qm;
     }
+
+    // Test stub: does not kill anything and always reports success.
+    @Override
+    public boolean killTask(TaskAttemptID taskid, boolean shouldFail) {
+      return true;
+    }
   }
   
   // represents a fake queue configuration info

Modified: hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java?rev=788922&r1=788921&r2=788922&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java Sat Jun 27 03:44:10 2009
@@ -102,6 +102,9 @@
     }
     public void startTask(String taskTrackerName, final Task t) {
     }
+    // Test stub: does not kill anything and always reports success.
+    public boolean killTask(TaskAttemptID attemptId, boolean shouldFail) {
+      return true;
+    }
     void addQueues(String[] arr) {
       Set<String> queues = new HashSet<String>();
       queues.addAll(Arrays.asList(arr));

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=788922&r1=788921&r2=788922&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Sat Jun 27 03:44:10 2009
@@ -35,7 +35,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpServer;
-import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -44,10 +43,22 @@
  * A {@link TaskScheduler} that implements fair sharing.
  */
 public class FairScheduler extends TaskScheduler {
-  /** How often fair shares are re-calculated */
-  public static final long UPDATE_INTERVAL = 500;
   public static final Log LOG = LogFactory.getLog(
       "org.apache.hadoop.mapred.FairScheduler");
+
+  // How often fair shares are re-calculated
+  protected long updateInterval = 500;
+
+  // How often to dump scheduler state to the event log
+  protected long dumpInterval = 10000;
+  
+  // How often tasks are preempted (must be longer than a couple
+  // of heartbeats to give task-kill commands a chance to act).
+  protected long preemptionInterval = 15000;
+  
+  // Used to iterate through map and reduce task types
+  private static final TaskType[] MAP_AND_REDUCE = 
+    new TaskType[] {TaskType.MAP, TaskType.REDUCE};
   
   protected PoolManager poolMgr;
   protected LoadManager loadMgr;
@@ -62,10 +73,17 @@
   protected boolean assignMultiple; // Simultaneously assign map and reduce?
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected boolean waitForMapsBeforeLaunchingReduces = true;
+  protected boolean preemptionEnabled;
+  protected boolean onlyLogPreemption; // Only log when tasks should be killed
   private Clock clock;
-  private boolean runBackgroundUpdates; // Can be set to false for testing
   private EagerTaskInitializationListener eagerInitListener;
   private JobListener jobListener;
+  private boolean mockMode; // Used for unit tests; disables background updates
+                            // and scheduler event log
+  private FairSchedulerEventLog eventLog;
+  protected long lastDumpTime;       // Time when we last dumped state to log
+  protected long lastHeartbeatTime;  // Time we last ran assignTasks 
+  private long lastPreemptCheckTime; // Time we last ran preemptTasksIfNecessary
   
   /**
    * A class for holding per-job scheduler variables. These always contain the
@@ -86,18 +104,30 @@
     int minReduces = 0;         // Minimum reduces as guaranteed by pool
     double mapFairShare = 0;    // Fair share of map slots at last update
     double reduceFairShare = 0; // Fair share of reduce slots at last update
+    // Variables used for preemption
+    long lastTimeAtMapMinShare;      // When was the job last at its min maps?
+    long lastTimeAtReduceMinShare;   // Similar for reduces.
+    long lastTimeAtMapHalfFairShare; // When was the job last at half fair maps?
+    long lastTimeAtReduceHalfFairShare;  // Similar for reduces.
+    
+    /**
+     * Creates per-job scheduler state with all four preemption timestamps
+     * set to currentTime (jobAdded passes the current clock time), so the
+     * job's starvation time is measured from the moment it was added.
+     */
+    public JobInfo(long currentTime) {
+      lastTimeAtMapMinShare = currentTime;
+      lastTimeAtReduceMinShare = currentTime;
+      lastTimeAtMapHalfFairShare = currentTime;
+      lastTimeAtReduceHalfFairShare = currentTime;
+    }
   }
   
   public FairScheduler() {
-    this(new Clock(), true);
+    this(new Clock(), false);
   }
   
   /**
    * Constructor used for tests, which can change the clock and disable updates.
    */
-  protected FairScheduler(Clock clock, boolean runBackgroundUpdates) {
+  protected FairScheduler(Clock clock, boolean mockMode) {
     this.clock = clock;
-    this.runBackgroundUpdates = runBackgroundUpdates;
+    this.mockMode = mockMode;
     this.jobListener = new JobListener();
   }
 
@@ -105,16 +135,31 @@
   public void start() {
     try {
       Configuration conf = getConf();
-      this.eagerInitListener = new EagerTaskInitializationListener(conf);
-      eagerInitListener.setTaskTrackerManager(taskTrackerManager);
-      eagerInitListener.start();
-      taskTrackerManager.addJobInProgressListener(eagerInitListener);
+      // Create scheduling log and initialize it if it is enabled
+      eventLog = new FairSchedulerEventLog();
+      boolean logEnabled = conf.getBoolean(
+          "mapred.fairscheduler.eventlog.enabled", false);
+      if (!mockMode && logEnabled) {
+        String hostname = "localhost";
+        if (taskTrackerManager instanceof JobTracker) {
+          hostname = ((JobTracker) taskTrackerManager).getJobTrackerMachine();
+        }
+        eventLog.init(conf, hostname);
+      }
+      // Initialize other pieces of the scheduler
       taskTrackerManager.addJobInProgressListener(jobListener);
+      if (!mockMode) {
+        eagerInitListener = new EagerTaskInitializationListener(conf);
+        eagerInitListener.setTaskTrackerManager(taskTrackerManager);
+        eagerInitListener.start();
+        taskTrackerManager.addJobInProgressListener(eagerInitListener);
+      }
       poolMgr = new PoolManager(conf);
       loadMgr = (LoadManager) ReflectionUtils.newInstance(
           conf.getClass("mapred.fairscheduler.loadmanager", 
               CapBasedLoadManager.class, LoadManager.class), conf);
       loadMgr.setTaskTrackerManager(taskTrackerManager);
+      loadMgr.setEventLog(eventLog);
       loadMgr.start();
       taskSelector = (TaskSelector) ReflectionUtils.newInstance(
           conf.getClass("mapred.fairscheduler.taskselector", 
@@ -127,16 +172,27 @@
         weightAdjuster = (WeightAdjuster) ReflectionUtils.newInstance(
             weightAdjClass, conf);
       }
-      assignMultiple = conf.getBoolean("mapred.fairscheduler.assignmultiple",
-          true);
-      sizeBasedWeight = conf.getBoolean("mapred.fairscheduler.sizebasedweight",
-          false);
+      updateInterval = conf.getLong(
+          "mapred.fairscheduler.update.interval", 500);
+      dumpInterval = conf.getLong(
+          "mapred.fairscheduler.dump.interval", 10000);
+      preemptionInterval = conf.getLong(
+          "mapred.fairscheduler.preemption.interval", 15000);
+      assignMultiple = conf.getBoolean(
+          "mapred.fairscheduler.assignmultiple", true);
+      sizeBasedWeight = conf.getBoolean(
+          "mapred.fairscheduler.sizebasedweight", false);
+      preemptionEnabled = conf.getBoolean(
+          "mapred.fairscheduler.preemption", false);
+      onlyLogPreemption = conf.getBoolean(
+          "mapred.fairscheduler.preemption.only.log", false);
       initialized = true;
       running = true;
       lastUpdateTime = clock.getTime();
       // Start a thread to update deficits every UPDATE_INTERVAL
-      if (runBackgroundUpdates)
+      if (!mockMode) {
         new UpdateThread().start();
+      }
       // Register servlet with JobTracker's Jetty server
       if (taskTrackerManager instanceof JobTracker) {
         JobTracker jobTracker = (JobTracker) taskTrackerManager;
@@ -145,6 +201,7 @@
         infoServer.addServlet("scheduler", "/scheduler",
             FairSchedulerServlet.class);
       }
+      eventLog.log("INITIALIZED");
     } catch (Exception e) {
       // Can't load one of the managers - crash the JobTracker now while it is
       // starting up so that the user notices.
@@ -155,11 +212,15 @@
 
   @Override
   public void terminate() throws IOException {
+    if (eventLog != null)
+      eventLog.log("SHUTDOWN");
     running = false;
     if (jobListener != null)
       taskTrackerManager.removeJobInProgressListener(jobListener);
     if (eagerInitListener != null)
       taskTrackerManager.removeJobInProgressListener(eagerInitListener);
+    if (eventLog != null)
+      eventLog.shutdown();
   }
   
   /**
@@ -169,8 +230,9 @@
     @Override
     public void jobAdded(JobInProgress job) {
       synchronized (FairScheduler.this) {
+        eventLog.log("JOB_ADDED", job.getJobID());
         poolMgr.addJob(job);
-        JobInfo info = new JobInfo();
+        JobInfo info = new JobInfo(clock.getTime());
         infos.put(job, info);
         update();
       }
@@ -179,6 +241,7 @@
     @Override
     public void jobRemoved(JobInProgress job) {
       synchronized (FairScheduler.this) {
+        eventLog.log("JOB_REMOVED", job.getJobID());
         poolMgr.removeJob(job);
         infos.remove(job);
       }
@@ -186,6 +249,7 @@
   
     @Override
     public void jobUpdated(JobChangeEvent event) {
+      eventLog.log("JOB_UPDATED", event.getJobInProgress().getJobID());
     }
   }
 
@@ -201,10 +265,12 @@
     public void run() {
       while (running) {
         try {
-          Thread.sleep(UPDATE_INTERVAL);
+          Thread.sleep(updateInterval);
           update();
+          dumpIfNecessary();
+          preemptTasksIfNecessary();
         } catch (Exception e) {
-          LOG.error("Failed to update fair share calculations", e);
+          LOG.error("Exception in fair scheduler UpdateThread", e);
         }
       }
     }
@@ -215,16 +281,22 @@
       throws IOException {
     if (!initialized) // Don't try to assign tasks if we haven't yet started up
       return null;
+    String trackerName = tracker.getTrackerName();
+    eventLog.log("HEARTBEAT", trackerName);
     
     // Reload allocations file if it hasn't been loaded in a while
     poolMgr.reloadAllocsIfNecessary();
     
-    // Compute total runnable maps and reduces
+    // Compute total runnable maps and reduces, and currently running ones
     int runnableMaps = 0;
+    int runningMaps = 0;
     int runnableReduces = 0;
+    int runningReduces = 0;
     for (JobInProgress job: infos.keySet()) {
       runnableMaps += runnableTasks(job, TaskType.MAP);
+      runningMaps += runningTasks(job, TaskType.MAP);
       runnableReduces += runnableTasks(job, TaskType.REDUCE);
+      runningReduces += runningTasks(job, TaskType.REDUCE);
     }
 
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
@@ -234,11 +306,19 @@
     int totalMapSlots = getTotalSlots(TaskType.MAP, clusterStatus);
     int totalReduceSlots = getTotalSlots(TaskType.REDUCE, clusterStatus);
     
+    eventLog.log("RUNNABLE_TASKS", 
+        runnableMaps, runningMaps, runnableReduces, runningReduces);
+
+    TaskTrackerStatus trackerStatus = tracker.getStatus();
+
     // Scan to see whether any job needs to run a map, then a reduce
     ArrayList<Task> tasks = new ArrayList<Task>();
-    TaskType[] types = new TaskType[] {TaskType.MAP, TaskType.REDUCE};
-    TaskTrackerStatus trackerStatus = tracker.getStatus();
-    for (TaskType taskType: types) {
+    for (TaskType taskType: MAP_AND_REDUCE) {
+      // Continue if all runnable tasks of this type are already running
+      if (taskType == TaskType.MAP && runningMaps == runnableMaps ||
+          taskType == TaskType.REDUCE && runningReduces == runnableReduces)
+        continue;
+      // Continue if the node can't support another task of the given type
       boolean canAssign = (taskType == TaskType.MAP) ? 
           loadMgr.canAssignMap(trackerStatus, runnableMaps, totalMapSlots) :
           loadMgr.canAssignReduce(trackerStatus, runnableReduces, totalReduceSlots);
@@ -256,10 +336,17 @@
             new FifoJobComparator() : new DeficitComparator(taskType);
         Collections.sort(candidates, comparator);
         for (JobInProgress job: candidates) {
-          Task task = (taskType == TaskType.MAP ? 
-              taskSelector.obtainNewMapTask(trackerStatus, job) :
-              taskSelector.obtainNewReduceTask(trackerStatus, job));
+          eventLog.log("INFO", 
+              "Checking for " + taskType + " task in " + job.getJobID());
+          Task task;
+          if (taskType == TaskType.MAP) {
+            task = taskSelector.obtainNewMapTask(trackerStatus, job);
+          } else {
+            task = taskSelector.obtainNewReduceTask(trackerStatus, job);
+          }
           if (task != null) {
+            eventLog.log("ASSIGN", trackerName, taskType,
+                job.getJobID(), task.getTaskID());
             // Update the JobInfo for this job so we account for the launched
             // tasks during this update interval and don't try to launch more
             // tasks than the job needed on future heartbeats
@@ -271,12 +358,17 @@
               info.runningReduces++;
               info.neededReduces--;
             }
+            // Add task to the list of assignments
             tasks.add(task);
+            // If not allowed to assign multiple tasks per heartbeat, return
             if (!assignMultiple)
               return tasks;
             break;
           }
         }
+      } else {
+        eventLog.log("INFO", 
+            "Can't assign another " + taskType + " to " + trackerName);
       }
     }
     
@@ -302,7 +394,7 @@
       // by deficit so as to put jobs with higher deficit ahead.
       JobInfo j1Info = infos.get(j1);
       JobInfo j2Info = infos.get(j2);
-      long deficitDif;
+      double deficitDif;
       boolean j1Needy, j2Needy;
       if (taskType == TaskType.MAP) {
         j1Needy = j1.runningMaps() < Math.floor(j1Info.minMaps);
@@ -355,6 +447,8 @@
       updateWeights();
       updateMinSlots();
       updateFairShares(clusterStatus);
+      if (preemptionEnabled)
+        updatePreemptionVariables();
       lastUpdateTime = now;
     }
   }
@@ -400,8 +494,10 @@
     for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
       JobInProgress job = entry.getKey();
       JobInfo info = entry.getValue();
-      if (job.getStatus().getRunState() != JobStatus.RUNNING)
-        continue; // Job is still in PREP state and tasks aren't initialized
+      if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+        // Job is still in PREP state and tasks aren't initialized; skip it.
+        continue;
+      }
       // Count maps
       int totalMaps = job.numMapTasks;
       int finishedMaps = 0;
@@ -519,10 +615,7 @@
     // need all its allocation, we leave the leftover slots for general use.
     PoolManager poolMgr = getPoolManager();
     for (Pool pool: poolMgr.getPools()) {
-      for (final TaskType type: TaskType.values()) {
-        if (type != TaskType.MAP && type != TaskType.REDUCE) {
-          continue;
-        }
+      for (final TaskType type: MAP_AND_REDUCE) {
         Set<JobInProgress> jobs = new HashSet<JobInProgress>(pool.getJobs());
         int slotsLeft = poolMgr.getAllocation(pool.getName(), type);
         // Keep assigning slots until none are left
@@ -612,10 +705,7 @@
     // left over. This continues until all jobs' minSlots are less than their
     // fair allocation, and at this point we know that we've met everyone's
     // guarantee and we've split the excess capacity fairly among jobs left.
-    for (TaskType type: TaskType.values()) {
-      if (type != TaskType.MAP && type != TaskType.REDUCE) {
-        continue;
-      }
+    for (TaskType type: MAP_AND_REDUCE) {
       // Select only jobs that still need this type of task
       HashSet<JobInfo> jobsLeft = new HashSet<JobInfo>();
       for (Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
@@ -709,6 +799,228 @@
       clusterStatus.getMaxMapTasks() : clusterStatus.getMaxReduceTasks());
   }
 
+  /**
+   * Update the preemption JobInfo fields for all jobs, i.e. the last times
+   * each job was at its guaranteed share and at least half of its fair share
+   * for each type of task.
+   *
+   * Each timestamp is reset to the current clock time whenever the job is
+   * not starved in that respect (see isStarvedForMinShare and
+   * isStarvedForFairShare), so (now - timestamp) measures how long the job
+   * has been continuously starved. update() calls this when
+   * preemptionEnabled is set.
+   */
+  private void updatePreemptionVariables() {
+    long now = clock.getTime();
+    for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
+      JobInProgress job = entry.getKey();
+      JobInfo info = entry.getValue();
+      if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+        // Job is still in PREP state and tasks aren't initialized. Count it as
+        // both at min and fair share since we shouldn't start any timeouts now.
+        info.lastTimeAtMapMinShare = now;
+        info.lastTimeAtReduceMinShare = now;
+        info.lastTimeAtMapHalfFairShare = now;
+        info.lastTimeAtReduceHalfFairShare = now;
+      } else {
+        // Only refresh a timestamp when the job is NOT starved; a starved
+        // job's timestamp stays put so its starvation time keeps growing.
+        if (!isStarvedForMinShare(job, TaskType.MAP))
+          info.lastTimeAtMapMinShare = now;
+        if (!isStarvedForMinShare(job, TaskType.REDUCE))
+          info.lastTimeAtReduceMinShare = now;
+        if (!isStarvedForFairShare(job, TaskType.MAP))
+          info.lastTimeAtMapHalfFairShare = now;
+        if (!isStarvedForFairShare(job, TaskType.REDUCE))
+          info.lastTimeAtReduceHalfFairShare = now;
+      }
+      // NOTE: only the map-side starvation times are written to the log.
+      eventLog.log("PREEMPT_VARS", job.getJobID(),
+          now - info.lastTimeAtMapMinShare,
+          now - info.lastTimeAtMapHalfFairShare);
+    }
+  }
+
+  /**
+   * Is a job below its min share for the given task type?
+   *
+   * @param job the job to check
+   * @param taskType the task type to check (callers here pass MAP or REDUCE)
+   * @return true if the job's running task count of this type is strictly
+   *         less than its guaranteed minimum (minTasks)
+   */
+  boolean isStarvedForMinShare(JobInProgress job, TaskType taskType) {
+    return runningTasks(job, taskType) < minTasks(job, taskType);
+  }
+  
+  /**
+   * Is a job being starved for fair share for the given task type?
+   * This is defined as being below half its fair share *and* having a
+   * positive deficit.
+   *
+   * "Half its fair share" is capped at neededTasks(job, type) and rounded
+   * down to an int before being compared with the running task count.
+   *
+   * @param job the job to check
+   * @param type the task type to check (callers here pass MAP or REDUCE)
+   * @return true if the job is below that threshold and its deficit is > 0
+   */
+  boolean isStarvedForFairShare(JobInProgress job, TaskType type) {
+    int desiredFairShare = (int) Math.floor(Math.min(
+        fairTasks(job, type) / 2, neededTasks(job, type)));
+    return (runningTasks(job, type) < desiredFairShare &&
+            deficit(job, type) > 0);
+  }
+  
+  /**
+   * Check for jobs that need tasks preempted, either because they have been
+   * below their guaranteed share for their pool's preemptionTimeout or they
+   * have been below half their fair share for the fairSharePreemptionTimeout.
+   * If such jobs exist, compute how many tasks of each type need to be
+   * preempted and then select and kill the right ones using preemptTasks.
+   * 
+   * Does nothing if preemption is disabled, if FIFO mode is in use, or if
+   * less than preemptionInterval has passed since the last check. When
+   * onlyLogPreemption is set, the number of tasks to preempt is computed and
+   * logged (SHOULD_PREEMPT) but no tasks are killed, for debugging purposes.
+   */
+  protected void preemptTasksIfNecessary() {
+    if (!preemptionEnabled || useFifo)
+      return;
+    
+    long curTime = clock.getTime();
+    if (curTime - lastPreemptCheckTime < preemptionInterval)
+      return;
+    lastPreemptCheckTime = curTime;
+    
+    // Acquire locks on both the JobTracker (task tracker manager) and this
+    // because we might need to call some JobTracker methods (killTask).
+    synchronized (taskTrackerManager) {
+      synchronized (this) {
+        List<JobInProgress> jobs = new ArrayList<JobInProgress>(infos.keySet());
+        for (TaskType type: MAP_AND_REDUCE) {
+          int tasksToPreempt = 0;
+          for (JobInProgress job: jobs) {
+            tasksToPreempt += tasksToPreempt(job, type, curTime);
+          }
+          if (tasksToPreempt > 0) {
+            eventLog.log("SHOULD_PREEMPT", type, tasksToPreempt);
+            if (!onlyLogPreemption) {
+              // Actually preempt the tasks. The policy for this is to pick
+              // tasks from jobs that are above their min share and have very 
+              // negative deficits (meaning they've been over-scheduled). 
+              // However, we also want to minimize the amount of computation
+              // wasted by preemption, so prefer tasks that started recently.
+              // Jobs are sorted with DeficitComparator (which puts needier,
+              // higher-deficit jobs first) and then visited from the END of
+              // the list, i.e. the most over-scheduled jobs first. Within
+              // each job, preemptTasks kills newest tasks first and stops
+              // before the job drops below max(minTasks, half its fair
+              // share), so as not to create a new starved job.
+              Collections.sort(jobs, new DeficitComparator(type));
+              for (int i = jobs.size() - 1; i >= 0; i--) {
+                JobInProgress job = jobs.get(i);
+                int tasksPreempted = preemptTasks(job, type, tasksToPreempt);
+                tasksToPreempt -= tasksPreempted;
+                if (tasksToPreempt == 0) break;
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Count how many tasks of a given type the job needs to preempt, if any.
+   * If the job has been below its min share for at least its pool's preemption
+   * timeout, it should preempt the difference between its current share and
+   * this min share. If it has been below half its fair share for at least the
+   * fairSharePreemptionTimeout, it should preempt enough tasks to get up to
+   * its full fair share. If both situations hold, we preempt the max of the
+   * two amounts.
+   *
+   * The result is clamped at zero: the JobInfo counts may have changed since
+   * the last update() (e.g. tasks launched during a heartbeat), and a negative
+   * value would otherwise reduce the total that preemptTasksIfNecessary sums
+   * across all jobs.
+   *
+   * @param job the job to examine
+   * @param type the task type; anything other than MAP is treated as REDUCE
+   * @param curTime current clock time, compared against the JobInfo
+   *        lastTimeAt* timestamps
+   * @return number of tasks to preempt on this job's behalf (never negative);
+   *         0 if the scheduler has no JobInfo for the job
+   */
+  protected int tasksToPreempt(JobInProgress job, TaskType type, long curTime) {
+    JobInfo info = infos.get(job);
+    if (info == null) return 0;
+    String pool = poolMgr.getPoolName(job);
+    // NOTE(review): these timeouts are compared with clock differences, so
+    // PoolManager presumably returns them in clock units (ms) even though the
+    // allocation file specifies seconds - confirm in PoolManager.
+    long minShareTimeout = poolMgr.getMinSharePreemptionTimeout(pool);
+    long fairShareTimeout = poolMgr.getFairSharePreemptionTimeout();
+    int tasksDueToMinShare = 0;
+    int tasksDueToFairShare = 0;
+    if (type == TaskType.MAP) {
+      if (curTime - info.lastTimeAtMapMinShare > minShareTimeout) {
+        tasksDueToMinShare = info.minMaps - info.runningMaps;
+      }
+      if (curTime - info.lastTimeAtMapHalfFairShare > fairShareTimeout) {
+        double fairShare = Math.min(info.mapFairShare, info.neededMaps);
+        tasksDueToFairShare = (int) (fairShare - info.runningMaps);
+      }
+    } else { // type == TaskType.REDUCE
+      if (curTime - info.lastTimeAtReduceMinShare > minShareTimeout) {
+        tasksDueToMinShare = info.minReduces - info.runningReduces;
+      }
+      if (curTime - info.lastTimeAtReduceHalfFairShare > fairShareTimeout) {
+        double fairShare = Math.min(info.reduceFairShare, info.neededReduces);
+        tasksDueToFairShare = (int) (fairShare - info.runningReduces);
+      }
+    }
+    // Both amounts can be negative when counts moved since the last update;
+    // clamp so a job never subtracts from other jobs' preemption needs.
+    int tasksToPreempt = Math.max(0,
+        Math.max(tasksDueToMinShare, tasksDueToFairShare));
+    if (tasksToPreempt > 0) {
+      String message = "Should preempt " + tasksToPreempt + " " 
+          + type + " tasks for " + job.getJobID() 
+          + ": tasksDueToMinShare = " + tasksDueToMinShare
+          + ", tasksDueToFairShare = " + tasksDueToFairShare;
+      eventLog.log("INFO", message);
+      LOG.info(message);
+    }
+    return tasksToPreempt;
+  }
+
+  /**
+   * Preempt up to maxToPreempt tasks of the given type from the given job,
+   * without having it go below its min share or below half its fair share.
+   * Selects the tasks so as to preempt the most recently launched one first,
+   * thus minimizing wasted compute time. Returns the number of tasks preempted.
+   */
+  private int preemptTasks(JobInProgress job, TaskType type, int maxToPreempt) {
+    // Figure out how many tasks to preempt. NOTE: We use the runningTasks, etc
+    // values in JobInfo rather than re-counting them. The caller,
+    // preemptTasksIfNecessary(), holds the JobTracker and scheduler locks, but
+    // the counts date from the last update() and may be slightly stale; the
+    // sanity check in the kill loop below guards against that.
+    int desiredFairShare = (int) Math.floor(Math.min(
+        fairTasks(job, type) / 2, neededTasks(job, type)));
+    int tasksToLeave = Math.max(minTasks(job, type), desiredFairShare);
+    int tasksToPreempt = Math.min(
+        maxToPreempt, runningTasks(job, type) - tasksToLeave);
+    // The job may already be at or below tasksToLeave, making this negative.
+    if (tasksToPreempt <= 0)
+      return 0;
+    // Create a list of all running TaskInProgress'es in the job
+    List<TaskInProgress> tips = new ArrayList<TaskInProgress>();
+    if (type == TaskType.MAP) {
+      // Jobs may have both "non-local maps" which have a split with no locality
+      // info (e.g. the input file is not in HDFS), and maps with locality info,
+      // which are stored in the runningMapCache map from location to task list
+      tips.addAll(job.nonLocalRunningMaps);
+      for (Set<TaskInProgress> set: job.runningMapCache.values()) {
+        tips.addAll(set);
+      }
+    }
+    else {
+      tips.addAll(job.runningReduces);
+    }
+    // Get the active TaskStatus'es for each TaskInProgress (there may be
+    // more than one if the task has multiple copies active due to speculation)
+    List<TaskStatus> statuses = new ArrayList<TaskStatus>();
+    for (TaskInProgress tip: tips) {
+      for (TaskAttemptID id: tip.getActiveTasks().keySet()) {
+        statuses.add(tip.getTaskStatus(id));
+      }
+    }
+    // Sort the statuses in order of start time, with the latest launched first
+    Collections.sort(statuses, new Comparator<TaskStatus>() {
+      public int compare(TaskStatus t1, TaskStatus t2) {
+        return (int) Math.signum(t2.getStartTime() - t1.getStartTime());
+      }
+    });
+    // Preempt the tasks in order of start time until we've done enough
+    int numKilled = 0;
+    for (int i = 0; i < tasksToPreempt; i++) {
+      // Killing index i leaves statuses.size() - (i + 1) attempts running, so
+      // stop once that would drop below tasksToLeave (also keeps i in bounds).
+      if (i >= statuses.size() - tasksToLeave) {
+        // Sanity check in case we computed maxToPreempt incorrectly due to
+        // stale data in JobInfos.
+        LOG.error("Stale task counts in the JobInfos in preemptTasks - "
+            + "probaly due to calling preemptTasks() from outside update(). ");
+        break;
+      }
+      TaskStatus status = statuses.get(i);
+      eventLog.log("PREEMPT", status.getTaskID(), status.getTaskTracker());
+      try {
+        taskTrackerManager.killTask(status.getTaskID(), false);
+        numKilled++;
+      } catch (IOException e) {
+        LOG.error("Failed to kill task " + status.getTaskID(), e);
+      }
+    }
+    return numKilled;
+  }
+
   public synchronized boolean getUseFifo() {
     return useFifo;
   }
@@ -742,6 +1054,12 @@
     return (type == TaskType.MAP) ? info.minMaps : info.minReduces;
   }
 
+  /**
+   * Returns the job's fair share of tasks of the given type, as recorded in
+   * its JobInfo: the map fair share for TaskType.MAP, the reduce fair share
+   * otherwise. Returns 0 if the scheduler holds no JobInfo for the job.
+   */
+  protected double fairTasks(JobInProgress job, TaskType type) {
+    JobInfo jobInfo = infos.get(job);
+    if (jobInfo == null) {
+      return 0;
+    }
+    if (type == TaskType.MAP) {
+      return jobInfo.mapFairShare;
+    }
+    return jobInfo.reduceFairShare;
+  }
+
   protected double weight(JobInProgress job, TaskType taskType) {
     JobInfo info = infos.get(job);
     if (info == null) return 0;
@@ -765,4 +1083,70 @@
     Pool myJobPool = poolMgr.getPool(queueName);
     return myJobPool.getJobs();
   }
+
+  /**
+   * Dumps scheduler state to the event log if more than dumpInterval has
+   * elapsed since the last dump and the event log is enabled. The time of
+   * the dump is then recorded in lastDumpTime.
+   */
+  protected void dumpIfNecessary() {
+    long currentTime = clock.getTime();
+    boolean intervalElapsed = currentTime - lastDumpTime > dumpInterval;
+    if (!intervalElapsed || !eventLog.isEnabled()) {
+      return;
+    }
+    dump();
+    lastDumpTime = currentTime;
+  }
+
+  /**
+   * Dump scheduler state to the fairscheduler log.
+   */
+  private synchronized void dump() {
+    synchronized (eventLog) {
+      eventLog.log("BEGIN_DUMP");
+      // List jobs in order of submit time
+      ArrayList<JobInProgress> jobs = 
+        new ArrayList<JobInProgress>(infos.keySet());
+      Collections.sort(jobs, new Comparator<JobInProgress>() {
+        public int compare(JobInProgress j1, JobInProgress j2) {
+          return (int) Math.signum(j1.getStartTime() - j2.getStartTime());
+        }
+      });
+      // Dump info for each job
+      for (JobInProgress job: jobs) {
+        JobProfile profile = job.getProfile();
+        JobInfo info = infos.get(job);
+        eventLog.log("JOB",
+            profile.getJobID(), profile.name, profile.user,
+            job.getPriority(), poolMgr.getPoolName(job),
+            job.numMapTasks, info.runningMaps, info.neededMaps, 
+            info.mapFairShare, info.mapWeight, info.mapDeficit,
+            job.numReduceTasks, info.runningReduces, info.neededReduces, 
+            info.reduceFairShare, info.reduceWeight, info.reduceDeficit);
+      }
+      // List pools in alphabetical order
+      List<Pool> pools = new ArrayList<Pool>(poolMgr.getPools());
+      Collections.sort(pools, new Comparator<Pool>() {
+        public int compare(Pool p1, Pool p2) {
+          if (p1.isDefaultPool())
+            return 1;
+          else if (p2.isDefaultPool())
+            return -1;
+          else return p1.getName().compareTo(p2.getName());
+        }});
+      for (Pool pool: pools) {
+        int runningMaps = 0;
+        int runningReduces = 0;
+        for (JobInProgress job: pool.getJobs()) {
+          JobInfo info = infos.get(job);
+          if (info != null) {
+            runningMaps += info.runningMaps;
+            runningReduces += info.runningReduces;
+          }
+        }
+        String name = pool.getName();
+        eventLog.log("POOL",
+            name, poolMgr.getPoolWeight(name), pool.getJobs().size(),
+            poolMgr.getAllocation(name, TaskType.MAP), runningMaps,
+            poolMgr.getAllocation(name, TaskType.REDUCE), runningReduces);
+      }
+      // Dump info for each pool
+      eventLog.log("END_DUMP");
+    }
+  }
 }

Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerEventLog.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerEventLog.java?rev=788922&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerEventLog.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerEventLog.java Sat Jun 27 03:44:10 2009
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.DailyRollingFileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * Event log used by the fair scheduler for machine-readable debug info.
+ * This class uses a log4j daily rolling file appender to write the log, but
+ * uses a custom tab-separated event format of the form:
+ * <pre>
+ * DATE    EVENT_TYPE   PARAM_1   PARAM_2   ...
+ * </pre>
+ * Various event types are used by the fair scheduler. The purpose of logging
+ * in this format is to enable tools to parse the history log easily and read
+ * internal scheduler variables, rather than trying to make the log human
+ * readable. The fair scheduler also logs human readable messages in the
+ * JobTracker's main log.
+ * 
+ * Constructing this class creates a disabled log. It must be initialized
+ * using {@link FairSchedulerEventLog#init(Configuration, String)} to begin
+ * writing to the file. An error during initialization or while writing
+ * disables the log instead of propagating to the caller.
+ */
+class FairSchedulerEventLog {
+  private static final Log LOG = LogFactory.getLog(
+    "org.apache.hadoop.mapred.FairSchedulerEventLog");
+  
+  /**
+   * True whenever the log is not writing. This covers the period before
+   * init() succeeds, the state after an initialization or write error, and
+   * the state after shutdown(). The field is volatile because isEnabled()
+   * reads it without taking the lock.
+   */
+  private volatile boolean logDisabled = true;
+  
+  /**
+   * Log directory, set by mapred.fairscheduler.eventlog.location in conf file;
+   * defaults to {hadoop.log.dir}/fairscheduler.
+   */
+  private String logDir;
+  
+  /** 
+   * Active log file, which is {LOG_DIR}/hadoop-{user}-fairscheduler-{host}.log.
+   * Older files are also stored as {LOG_FILE}.date (date format YYYY-MM-DD).
+   */ 
+  private String logFile;
+  
+  /** Log4j appender used to write to the log file */
+  private DailyRollingFileAppender appender;
+
+  /**
+   * Creates the log directory if needed, opens the log file and enables
+   * logging.
+   *
+   * @param conf configuration that supplies
+   *             mapred.fairscheduler.eventlog.location and is used to resolve
+   *             the log directory's file system
+   * @param jobtrackerHostname host name embedded in the log file name
+   * @return true if the log is now enabled; false if initialization failed
+   *         and the log was disabled
+   */
+  synchronized boolean init(Configuration conf, String jobtrackerHostname) {
+    try {
+      logDir = conf.get("mapred.fairscheduler.eventlog.location");
+      if (logDir == null) {
+        // Consult hadoop.log.dir only when no explicit location is set. If
+        // the property is undefined, new File(null) would throw a
+        // NullPointerException that the IOException handler does not catch.
+        String hadoopLogDir = System.getProperty("hadoop.log.dir");
+        if (hadoopLogDir == null) {
+          throw new IOException("mapred.fairscheduler.eventlog.location is "
+              + "not set and the hadoop.log.dir system property is undefined");
+        }
+        logDir = new File(hadoopLogDir).getAbsolutePath()
+            + File.separator + "fairscheduler";
+      }
+      Path logDirPath = new Path(logDir);
+      FileSystem fs = logDirPath.getFileSystem(conf);
+      if (!fs.exists(logDirPath)) {
+        if (!fs.mkdirs(logDirPath)) {
+          throw new IOException(
+              "Mkdirs failed to create " + logDirPath.toString());
+        }
+      }
+      String username = System.getProperty("user.name");
+      logFile = String.format("%s%shadoop-%s-fairscheduler-%s.log",
+          logDir, File.separator, username, jobtrackerHostname);
+      PatternLayout layout = new PatternLayout("%d{ISO8601}\t%m%n");
+      appender = new DailyRollingFileAppender(layout, logFile, "'.'yyyy-MM-dd");
+      appender.activateOptions();
+      // Enable only after the appender is fully set up
+      logDisabled = false;
+      LOG.info("Initialized fair scheduler event log, logging to " + logFile);
+    } catch (IOException e) {
+      LOG.error(
+          "Failed to initialize fair scheduler event log. Disabling it.", e);
+      logDisabled = true;
+    }
+    return !logDisabled;
+  }
+  
+  /**
+   * Log an event, writing a line in the log file of the form
+   * <pre>
+   * DATE    EVENT_TYPE   PARAM_1   PARAM_2   ...
+   * </pre>
+   * Parameters are rendered with String.valueOf and separated by tabs. This
+   * method does nothing while the log is disabled. Any failure to append
+   * disables the log.
+   */
+  synchronized void log(String eventType, Object... params) {
+    try {
+      if (logDisabled)
+        return;
+      StringBuilder buffer = new StringBuilder();
+      buffer.append(eventType);
+      for (Object param: params) {
+        buffer.append('\t');
+        buffer.append(param);
+      }
+      String message = buffer.toString();
+      Logger logger = Logger.getLogger(getClass());
+      // Append straight to this appender so the event goes only to this file
+      appender.append(new LoggingEvent("", logger, Level.INFO, message, null));
+    } catch (Exception e) {
+      LOG.error("Failed to append to fair scheduler event log", e);
+      logDisabled = true;
+    }
+  }
+  
+  /**
+   * Flush and close the log. Logging stays disabled afterwards until init()
+   * succeeds again. Synchronized so that the appender cannot be closed in
+   * the middle of a log() call.
+   */
+  synchronized void shutdown() {
+    try {
+      if (appender != null)
+        appender.close();
+    } catch (Exception e) {
+      // Best effort: a failure to close must not abort shutdown
+      LOG.warn("Failed to close fair scheduler event log", e);
+    }
+    logDisabled = true;
+  }
+
+  /** @return true if events are currently being written to the log file */
+  boolean isEnabled() {
+    return !logDisabled;
+  }
+}

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java?rev=788922&r1=788921&r2=788922&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java Sat Jun 27 03:44:10 2009
@@ -218,7 +218,7 @@
         JobProfile profile = job.getProfile();
         JobInfo info = scheduler.infos.get(job);
         if (info == null) { // Job finished, but let's show 0's for info
-          info = new JobInfo();
+          info = new JobInfo(0);
         }
         out.print("<tr>\n");
         out.printf("<td>%s</td>\n", DATE_FORMAT.format(

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java?rev=788922&r1=788921&r2=788922&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java Sat Jun 27 03:44:10 2009
@@ -30,6 +30,7 @@
 public abstract class LoadManager implements Configurable {
   protected Configuration conf;
   protected TaskTrackerManager taskTrackerManager;
+  protected FairSchedulerEventLog schedulingLog;
   
   public Configuration getConf() {
     return conf;
@@ -43,6 +44,10 @@
       TaskTrackerManager taskTrackerManager) {
     this.taskTrackerManager = taskTrackerManager;
   }
+
+  /**
+   * Sets the fair scheduler event log that this LoadManager holds in its
+   * schedulingLog field.
+   *
+   * @param eventLog the event log to store
+   */
+  public void setEventLog(FairSchedulerEventLog eventLog) {
+    schedulingLog = eventLog;
+  }
   
   /**
    * Lifecycle method to allow the LoadManager to start any work in separate

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java?rev=788922&r1=788921&r2=788922&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java Sat Jun 27 03:44:10 2009
@@ -43,7 +43,8 @@
 import org.xml.sax.SAXException;
 
 /**
- * Maintains a hierarchy of pools.
+ * Maintains a list of pools as well as scheduling parameters for each pool,
+ * such as guaranteed share allocations, from the fair scheduler config file.
  */
 public class PoolManager {
   public static final Log LOG = LogFactory.getLog(
@@ -72,9 +73,23 @@
   private int userMaxJobsDefault = Integer.MAX_VALUE;
   private int poolMaxJobsDefault = Integer.MAX_VALUE;
 
+  // Min share preemption timeout for each pool, in milliseconds. The config
+  // file gives it in seconds, and the value is multiplied by 1000 on load.
+  // If a job in the pool waits this long without receiving its guaranteed
+  // share, it is allowed to preempt other jobs' tasks.
+  private Map<String, Long> minSharePreemptionTimeouts =
+    new HashMap<String, Long>();
+  
+  // Default min share preemption timeout, in milliseconds, for pools where it
+  // is not set explicitly. The Long.MAX_VALUE default presumably means "never
+  // preempt"; confirm against the preemption logic.
+  private long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+  
+  // Preemption timeout for jobs below fair share, in milliseconds (configured
+  // in seconds). If a job remains below half its fair share for this long, it
+  // is allowed to preempt tasks.
+  private long fairSharePreemptionTimeout = Long.MAX_VALUE;
+  
   private String allocFile; // Path to XML file containing allocations
   private String poolNameProperty; // Jobconf property to use for determining a
-                                   // job's pool name (default: mapred.job.queue.name)
+                                   // job's pool name (default: user.name)
   
   private Map<String, Pool> pools = new HashMap<String, Pool>();
   
@@ -133,7 +148,7 @@
         // We log the error only on the first failure so we don't fill up the
         // JobTracker's log with these messages.
         if (!lastReloadAttemptFailed) {
-          LOG.error("Failed to reload allocations file - " +
+          LOG.error("Failed to reload fair scheduler config file - " +
               "will use existing allocations.", e);
         }
         lastReloadAttemptFailed = true;
@@ -168,8 +183,11 @@
     Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
     Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
     Map<String, Double> poolWeights = new HashMap<String, Double>();
+    Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
     int userMaxJobsDefault = Integer.MAX_VALUE;
     int poolMaxJobsDefault = Integer.MAX_VALUE;
+    long fairSharePreemptionTimeout = Long.MAX_VALUE;
+    long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
     
     // Remember all pool names so we can display them on web UI, etc.
     List<String> poolNamesInAllocFile = new ArrayList<String>();
@@ -182,8 +200,8 @@
     Document doc = builder.parse(new File(allocFile));
     Element root = doc.getDocumentElement();
     if (!"allocations".equals(root.getTagName()))
-      throw new AllocationConfigurationException("Bad allocations file: " + 
-          "top-level element not <allocations>");
+      throw new AllocationConfigurationException("Bad fair scheduler config " + 
+          "file: top-level element not <allocations>");
     NodeList elements = root.getChildNodes();
     for (int i = 0; i < elements.getLength(); i++) {
       Node node = elements.item(i);
@@ -215,7 +233,11 @@
             String text = ((Text)field.getFirstChild()).getData().trim();
             double val = Double.parseDouble(text);
             poolWeights.put(poolName, val);
-          }
+          } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            long val = Long.parseLong(text) * 1000L;
+            minSharePreemptionTimeouts.put(poolName, val);
+          } 
         }
       } else if ("user".equals(element.getTagName())) {
         String userName = element.getAttribute("name");
@@ -239,6 +261,14 @@
         String text = ((Text)element.getFirstChild()).getData().trim();
         int val = Integer.parseInt(text);
         poolMaxJobsDefault = val;
+      } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
+        String text = ((Text)element.getFirstChild()).getData().trim();
+        long val = Long.parseLong(text) * 1000L;
+        fairSharePreemptionTimeout = val;
+      } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
+        String text = ((Text)element.getFirstChild()).getData().trim();
+        long val = Long.parseLong(text) * 1000L;
+        defaultMinSharePreemptionTimeout = val;
       } else {
         LOG.warn("Bad element in allocations file: " + element.getTagName());
       }
@@ -251,9 +281,12 @@
       this.reduceAllocs = reduceAllocs;
       this.poolMaxJobs = poolMaxJobs;
       this.userMaxJobs = userMaxJobs;
+      this.poolWeights = poolWeights;
+      this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
       this.userMaxJobsDefault = userMaxJobsDefault;
       this.poolMaxJobsDefault = poolMaxJobsDefault;
-      this.poolWeights = poolWeights;
+      this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
+      this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
       for (String name: poolNamesInAllocFile) {
         getPool(name);
       }
@@ -306,7 +339,7 @@
    * "mapred.fairscheduler.poolnameproperty".
    */
   public String getPoolName(JobInProgress job) {
-    JobConf conf = job.getJobConf();
+    Configuration conf = job.getJobConf();
     return conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME).trim();
   }
 
@@ -346,4 +379,26 @@
       return 1.0;
     }
   }
+
+  /**
+   * Returns the min share preemption timeout of the given pool, in
+   * milliseconds. This is the time after which jobs in the pool may kill
+   * other pools' tasks if they are below their min share. Pools without an
+   * explicit setting use the configured default.
+   */
+  public long getMinSharePreemptionTimeout(String pool) {
+    return minSharePreemptionTimeouts.containsKey(pool)
+        ? minSharePreemptionTimeouts.get(pool)
+        : defaultMinSharePreemptionTimeout;
+  }
+  
+  /**
+   * Returns the fair share preemption timeout, in milliseconds. This is the
+   * time after which any job may kill other jobs' tasks if it is below half
+   * its fair share.
+   */
+  public long getFairSharePreemptionTimeout() {
+    return this.fairSharePreemptionTimeout;
+  }
 }



Mime
View raw message