hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1169585 [3/5] - in /hadoop/common/branches/branch-0.20-security: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/ src/co...
Date Sun, 11 Sep 2011 23:57:38 GMT
Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+
+public class JobSchedulable extends Schedulable {
+  private FairScheduler scheduler;
+  private JobInProgress job;
+  private TaskType taskType;
+  private int demand = 0;
+
+  public JobSchedulable(FairScheduler scheduler, JobInProgress job, 
+      TaskType taskType) {
+    this.scheduler = scheduler;
+    this.job = job;
+    this.taskType = taskType;
+    
+    initMetrics();
+  }
+  
+  @Override
+  public TaskType getTaskType() {
+    return taskType;
+  }
+  
+  @Override
+  public String getName() {
+    return job.getJobID().toString();
+  }
+
+  public JobInProgress getJob() {
+    return job;
+  }
+  
+  @Override
+  public void updateDemand() {
+    demand = 0;
+    if (isRunnable()) {
+      // For reduces, make sure enough maps are done that reduces can launch
+      if (taskType == TaskType.REDUCE && !job.scheduleReduces())
+        return;
+      // Add up demand from each TaskInProgress; each TIP can either
+      // - have no attempts running, in which case it demands 1 slot
+      // - have N attempts running, in which case it demands N slots, and may
+      //   potentially demand one more slot if it needs to be speculated
+      TaskInProgress[] tips = (taskType == TaskType.MAP ? 
+          job.getTasks(TaskType.MAP) : job.getTasks(TaskType.REDUCE));
+      boolean speculationEnabled = (taskType == TaskType.MAP ?
+          job.getMapSpeculativeExecution() : job.getReduceSpeculativeExecution());
+      double avgProgress = (taskType == TaskType.MAP ?
+          job.getStatus().mapProgress() : job.getStatus().reduceProgress());
+      long time = scheduler.getClock().getTime();
+      for (TaskInProgress tip: tips) {
+        if (!tip.isComplete()) {
+          if (tip.isRunning()) {
+            // Count active tasks and any speculative task we want to launch
+            demand += tip.getActiveTasks().size();
+            if (speculationEnabled && tip.hasSpeculativeTask(time, avgProgress))
+              demand += 1;
+          } else {
+            // Need to launch 1 task
+            demand += 1;
+          }
+        }
+      }
+    }
+  }
+
+  private boolean isRunnable() {
+    JobInfo info = scheduler.getJobInfo(job);
+    int runState = job.getStatus().getRunState();
+    return (info != null && info.runnable && runState == JobStatus.RUNNING);
+  }
+
+  @Override
+  public int getDemand() {
+    return demand;
+  }
+  
+  @Override
+  public void redistributeShare() {}
+
+  @Override
+  public JobPriority getPriority() {
+    return job.getPriority();
+  }
+
+  @Override
+  public int getRunningTasks() {
+    if (!job.inited()) {
+      return 0;
+    }
+    return taskType == TaskType.MAP ? job.runningMaps() : job.runningReduces();
+  }
+
+  @Override
+  public long getStartTime() {
+    return job.startTime;
+  }
+  
+  @Override
+  public double getWeight() {
+    return scheduler.getJobWeight(job, taskType);
+  }
+  
+  @Override
+  public int getMinShare() {
+    return 0;
+  }
+
+  @Override
+  public Task assignTask(TaskTrackerStatus tts, long currentTime,
+      Collection<JobInProgress> visited) throws IOException {
+    if (isRunnable()) {
+      visited.add(job);
+      TaskTrackerManager ttm = scheduler.taskTrackerManager;
+      ClusterStatus clusterStatus = ttm.getClusterStatus();
+      int numTaskTrackers = clusterStatus.getTaskTrackers();
+
+      // check with the load manager whether it is safe to 
+      // launch this task on this taskTracker.
+      LoadManager loadMgr = scheduler.getLoadManager();
+      if (!loadMgr.canLaunchTask(tts, job, taskType)) {
+        return null;
+      }
+      if (taskType == TaskType.MAP) {
+        LocalityLevel localityLevel = scheduler.getAllowedLocalityLevel(
+            job, currentTime);
+        scheduler.getEventLog().log(
+            "ALLOWED_LOC_LEVEL", job.getJobID(), localityLevel);
+        switch (localityLevel) {
+          case NODE:
+            return job.obtainNewNodeLocalMapTask(tts, numTaskTrackers,
+                ttm.getNumberOfUniqueHosts());
+          case RACK:
+            return job.obtainNewNodeOrRackLocalMapTask(tts, numTaskTrackers,
+                ttm.getNumberOfUniqueHosts());
+          default:
+            return job.obtainNewMapTask(tts, numTaskTrackers,
+                ttm.getNumberOfUniqueHosts());
+        }
+      } else {
+        return job.obtainNewReduceTask(tts, numTaskTrackers,
+            ttm.getNumberOfUniqueHosts());
+      }
+    } else {
+      return null;
+    }
+  }
+
+  
+  @Override
+  protected String getMetricsContextName() {
+    return "jobs";
+  }
+  
+  @Override
+  void updateMetrics() {
+    assert metrics != null;
+    
+    super.setMetricValues(metrics);
+    metrics.update();
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java Sun Sep 11 23:57:37 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskType;
 
 /**
  * A pluggable object that manages the load on each {@link TaskTracker}, telling
@@ -30,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
 public abstract class LoadManager implements Configurable {
   protected Configuration conf;
   protected TaskTrackerManager taskTrackerManager;
+  protected FairSchedulerEventLog schedulingLog;
   
   public Configuration getConf() {
     return conf;
@@ -43,6 +45,10 @@ public abstract class LoadManager implem
       TaskTrackerManager taskTrackerManager) {
     this.taskTrackerManager = taskTrackerManager;
   }
+
+  public void setEventLog(FairSchedulerEventLog schedulingLog) {
+    this.schedulingLog = schedulingLog;
+  }
   
   /**
    * Lifecycle method to allow the LoadManager to start any work in separate
@@ -61,6 +67,8 @@ public abstract class LoadManager implem
   
   /**
    * Can a given {@link TaskTracker} run another map task?
+   * This method may check whether the specified tracker has
+   * enough resources to run another map task.
    * @param tracker The machine we wish to run a new map on
    * @param totalRunnableMaps Set of running jobs in the cluster
    * @param totalMapSlots The total number of map slots in the cluster
@@ -71,6 +79,8 @@ public abstract class LoadManager implem
 
   /**
    * Can a given {@link TaskTracker} run another reduce task?
+   * This method may check whether the specified tracker has
+   * enough resources to run another reduce task.
    * @param tracker The machine we wish to run a new map on
    * @param totalRunnableReduces Set of running jobs in the cluster
    * @param totalReduceSlots The total number of reduce slots in the cluster
@@ -78,4 +88,16 @@ public abstract class LoadManager implem
    */
   public abstract boolean canAssignReduce(TaskTrackerStatus tracker,
       int totalRunnableReduces, int totalReduceSlots);
+
+  /**
+   * Can a given {@link TaskTracker} run another new task from a given job? 
+   * This method is provided for use by LoadManagers that take into 
+   * account jobs' individual resource needs when placing tasks.
+   * @param tracker The machine we wish to run a new task on
+   * @param job The job from which we want to run a task on this machine
+   * @param type The type of task (map or reduce) that we want to run
+   * @return true if this task can be launched on <code>tracker</code>
+   */
+  public abstract boolean canLaunchTask(TaskTrackerStatus tracker,
+      JobInProgress job,  TaskType type);
 }

Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * Represents the level of data-locality at which a job in the fair scheduler
+ * is allowed to launch tasks. By default, jobs are not allowed to launch
+ * non-data-local tasks until they have waited a small number of seconds to
+ * find a slot on a node that they have data on. If a job has waited this
+ * long, it is allowed to launch rack-local tasks as well (on nodes that may
+ * not have the task's input data, but share a rack with a node that does).
+ * Finally, after a further wait, jobs are allowed to launch tasks anywhere
+ * in the cluster.
+ * 
+ * This enum defines three levels - NODE, RACK and ANY (for allowing tasks
+ * to be launched on any node). A map task's level can be obtained from
+ * its job through {@link #fromTask(JobInProgress, Task, TaskTrackerStatus)}. In
+ * addition, for any locality level, it is possible to get a "level cap" to pass
+ * to {@link JobInProgress#obtainNewMapTask(TaskTrackerStatus, int, int, int)}
+ * to ensure that only tasks at this level or lower are launched, through
+ * the {@link #toCacheLevelCap()} method.
+ */
+public enum LocalityLevel {
+  NODE, RACK, ANY;
+  
+  public static LocalityLevel fromTask(JobInProgress job, Task mapTask,
+      TaskTrackerStatus tracker) {
+    TaskID tipID = mapTask.getTaskID().getTaskID();
+    TaskInProgress tip = job.getTaskInProgress(tipID);
+    switch (job.getLocalityLevel(tip, tracker)) {
+    case 0: return LocalityLevel.NODE;
+    case 1: return LocalityLevel.RACK;
+    default: return LocalityLevel.ANY;
+    }
+  }
+  
+  /**
+   * Obtain a JobInProgress cache level cap to pass to
+   * {@link JobInProgress#obtainNewMapTask(TaskTrackerStatus, int, int, int)}
+   * to ensure that only tasks of this locality level and lower are launched.
+   */
+  public int toCacheLevelCap() {
+    switch(this) {
+    case NODE: return 1;
+    case RACK: return 2;
+    default: return Integer.MAX_VALUE;
+    }
+  }
+}

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java Sun Sep 11 23:57:37 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapreduce.TaskType;
 
 /**
  * A {@link WeightAdjuster} implementation that gives a weight boost to new jobs

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java Sun Sep 11 23:57:37 2011
@@ -21,6 +21,9 @@ package org.apache.hadoop.mapred;
 import java.util.ArrayList;
 import java.util.Collection;
 
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.metrics.MetricsContext;
+
 /**
  * A schedulable pool of jobs.
  */
@@ -33,9 +36,17 @@ public class Pool {
   
   /** Jobs in this specific pool; does not include children pools' jobs. */
   private Collection<JobInProgress> jobs = new ArrayList<JobInProgress>();
+  
+  /** Scheduling mode for jobs inside the pool (fair or FIFO) */
+  private SchedulingMode schedulingMode;
 
-  public Pool(String name) {
+  private PoolSchedulable mapSchedulable;
+  private PoolSchedulable reduceSchedulable;
+
+  public Pool(FairScheduler scheduler, String name) {
     this.name = name;
+    mapSchedulable = new PoolSchedulable(scheduler, this, TaskType.MAP);
+    reduceSchedulable = new PoolSchedulable(scheduler, this, TaskType.REDUCE);
   }
   
   public Collection<JobInProgress> getJobs() {
@@ -44,17 +55,46 @@ public class Pool {
   
   public void addJob(JobInProgress job) {
     jobs.add(job);
+    mapSchedulable.addJob(job);
+    reduceSchedulable.addJob(job);
   }
   
   public void removeJob(JobInProgress job) {
     jobs.remove(job);
+    mapSchedulable.removeJob(job);
+    reduceSchedulable.removeJob(job);
   }
   
   public String getName() {
     return name;
   }
 
+  public SchedulingMode getSchedulingMode() {
+    return schedulingMode;
+  }
+  
+  public void setSchedulingMode(SchedulingMode schedulingMode) {
+    this.schedulingMode = schedulingMode;
+  }
+
   public boolean isDefaultPool() {
     return Pool.DEFAULT_POOL_NAME.equals(name);
   }
+  
+  public PoolSchedulable getMapSchedulable() {
+    return mapSchedulable;
+  }
+  
+  public PoolSchedulable getReduceSchedulable() {
+    return reduceSchedulable;
+  }
+  
+  public PoolSchedulable getSchedulable(TaskType type) {
+    return type == TaskType.MAP ? mapSchedulable : reduceSchedulable;
+  }
+
+  public void updateMetrics() {
+    mapSchedulable.updateMetrics();
+    reduceSchedulable.updateMetrics();
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java Sun Sep 11 23:57:37 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.mapred;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URL;
+import java.net.URLConnection;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -34,6 +36,8 @@ import javax.xml.parsers.ParserConfigura
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.metrics.MetricsContext;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
@@ -42,7 +46,8 @@ import org.w3c.dom.Text;
 import org.xml.sax.SAXException;
 
 /**
- * Maintains a hierarchy of pools.
+ * Maintains a list of pools as well as scheduling parameters for each pool,
+ * such as guaranteed share allocations, from the fair scheduler config file.
  */
 public class PoolManager {
   public static final Log LOG = LogFactory.getLog(
@@ -56,11 +61,19 @@ public class PoolManager {
    * (this is done to prevent loading a file that hasn't been fully written).
    */
   public static final long ALLOC_RELOAD_WAIT = 5 * 1000; 
+
+  public static final String EXPLICIT_POOL_PROPERTY = "mapred.fairscheduler.pool";
+
+  private final FairScheduler scheduler;
   
   // Map and reduce minimum allocations for each pool
   private Map<String, Integer> mapAllocs = new HashMap<String, Integer>();
   private Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
 
+  // If set, cap number of map and reduce tasks in a pool
+  private Map<String, Integer> poolMaxMaps = new HashMap<String, Integer>();
+  private Map<String, Integer> poolMaxReduces = new HashMap<String, Integer>();
+
   // Sharing weights for each pool
   private Map<String, Double> poolWeights = new HashMap<String, Double>();
   
@@ -69,10 +82,31 @@ public class PoolManager {
   private Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
   private Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
   private int userMaxJobsDefault = Integer.MAX_VALUE;
+  private int poolMaxJobsDefault = Integer.MAX_VALUE;
 
-  private String allocFile; // Path to XML file containing allocations
+  // Min share preemption timeout per pool (seconds in the config file, stored
+  // here in ms). If a job in the pool waits this long without receiving its
+  // guaranteed share, it is allowed to preempt other jobs' tasks.
+  private Map<String, Long> minSharePreemptionTimeouts =
+    new HashMap<String, Long>();
+  
+  // Default min share preemption timeout for pools where it is not set
+  // explicitly.
+  private long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+  
+  // Fair share preemption timeout (seconds in config, stored as ms). If a job
+  // stays below half its fair share for this long, it may preempt tasks.
+  private long fairSharePreemptionTimeout = Long.MAX_VALUE;
+  
+  SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
+  
+  private Object allocFile; // Path to XML file containing allocations. This
+                            // is either a URL to specify a classpath resource
+                            // (if the fair-scheduler.xml on the classpath is
+                            // used) or a String to specify an absolute path (if
+                            // mapred.fairscheduler.allocation.file is used).
   private String poolNameProperty; // Jobconf property to use for determining a
-                                   // job's pool name (default: mapred.job.queue.name)
+                                   // job's pool name (default: user.name)
   
   private Map<String, Pool> pools = new HashMap<String, Pool>();
   
@@ -80,14 +114,25 @@ public class PoolManager {
   private long lastSuccessfulReload; // Last time we successfully reloaded pools
   private boolean lastReloadAttemptFailed = false;
 
-  public PoolManager(Configuration conf) throws IOException, SAXException,
+  public PoolManager(FairScheduler scheduler) {
+    this.scheduler = scheduler;
+  }
+  
+  public void initialize() throws IOException, SAXException,
       AllocationConfigurationException, ParserConfigurationException {
+    Configuration conf = scheduler.getConf();
     this.poolNameProperty = conf.get(
         "mapred.fairscheduler.poolnameproperty", "user.name");
     this.allocFile = conf.get("mapred.fairscheduler.allocation.file");
     if (allocFile == null) {
-      LOG.warn("No mapred.fairscheduler.allocation.file given in jobconf - " +
-          "the fair scheduler will not use any queues.");
+      // No allocation file specified in jobconf. Use the default allocation
+      // file, fair-scheduler.xml, looking for it on the classpath.
+      allocFile = new Configuration().getResource("fair-scheduler.xml");
+      if (allocFile == null) {
+        LOG.error("The fair scheduler allocation file fair-scheduler.xml was "
+            + "not found on the classpath, and no other config file is given "
+            + "through mapred.fairscheduler.allocation.file.");
+      }
     }
     reloadAllocs();
     lastSuccessfulReload = System.currentTimeMillis();
@@ -102,11 +147,19 @@ public class PoolManager {
   public synchronized Pool getPool(String name) {
     Pool pool = pools.get(name);
     if (pool == null) {
-      pool = new Pool(name);
+      pool = new Pool(scheduler, name);
+      pool.setSchedulingMode(defaultSchedulingMode);
       pools.put(name, pool);
     }
     return pool;
   }
+  
+  /**
+   * Get the pool that a given job is in.
+   */
+  public Pool getPool(JobInProgress job) {
+    return getPool(getPoolName(job));
+  }
 
   /**
    * Reload allocations file if it hasn't been loaded in a while
@@ -115,9 +168,20 @@ public class PoolManager {
     long time = System.currentTimeMillis();
     if (time > lastReloadAttempt + ALLOC_RELOAD_INTERVAL) {
       lastReloadAttempt = time;
+      if (null == allocFile) {
+        return;
+      }
       try {
-        File file = new File(allocFile);
-        long lastModified = file.lastModified();
+        // Get last modified time of alloc file depending on whether it's a
+        // String (for a path name) or a URL (for a classloader resource)
+        long lastModified;
+        if (allocFile instanceof String) {
+          File file = new File((String) allocFile);
+          lastModified = file.lastModified();
+        } else { // allocFile is a URL
+          URLConnection conn = ((URL) allocFile).openConnection();
+          lastModified = conn.getLastModified();
+        }
         if (lastModified > lastSuccessfulReload &&
             time > lastModified + ALLOC_RELOAD_WAIT) {
           reloadAllocs();
@@ -131,7 +195,7 @@ public class PoolManager {
         // We log the error only on the first failure so we don't fill up the
         // JobTracker's log with these messages.
         if (!lastReloadAttemptFailed) {
-          LOG.error("Failed to reload allocations file - " +
+          LOG.error("Failed to reload fair scheduler config file - " +
               "will use existing allocations.", e);
         }
         lastReloadAttemptFailed = true;
@@ -165,8 +229,16 @@ public class PoolManager {
     Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
     Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
     Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
+    Map<String, Integer> poolMaxMaps = new HashMap<String, Integer>();
+    Map<String, Integer> poolMaxReduces = new HashMap<String, Integer>();
     Map<String, Double> poolWeights = new HashMap<String, Double>();
+    Map<String, SchedulingMode> poolModes = new HashMap<String, SchedulingMode>();
+    Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
     int userMaxJobsDefault = Integer.MAX_VALUE;
+    int poolMaxJobsDefault = Integer.MAX_VALUE;
+    long fairSharePreemptionTimeout = Long.MAX_VALUE;
+    long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+    SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
     
     // Remember all pool names so we can display them on web UI, etc.
     List<String> poolNamesInAllocFile = new ArrayList<String>();
@@ -176,11 +248,16 @@ public class PoolManager {
       DocumentBuilderFactory.newInstance();
     docBuilderFactory.setIgnoringComments(true);
     DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
-    Document doc = builder.parse(new File(allocFile));
+    Document doc;
+    if (allocFile instanceof String) {
+      doc = builder.parse(new File((String) allocFile));
+    } else {
+      doc = builder.parse(allocFile.toString());
+    }
     Element root = doc.getDocumentElement();
     if (!"allocations".equals(root.getTagName()))
-      throw new AllocationConfigurationException("Bad allocations file: " + 
-          "top-level element not <allocations>");
+      throw new AllocationConfigurationException("Bad fair scheduler config " + 
+          "file: top-level element not <allocations>");
     NodeList elements = root.getChildNodes();
     for (int i = 0; i < elements.getLength(); i++) {
       Node node = elements.item(i);
@@ -204,6 +281,14 @@ public class PoolManager {
             String text = ((Text)field.getFirstChild()).getData().trim();
             int val = Integer.parseInt(text);
             reduceAllocs.put(poolName, val);
+          } else if ("maxMaps".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            int val = Integer.parseInt(text);
+            poolMaxMaps.put(poolName, val);
+          } else if ("maxReduces".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            int val = Integer.parseInt(text);
+            poolMaxReduces.put(poolName, val);
           } else if ("maxRunningJobs".equals(field.getTagName())) {
             String text = ((Text)field.getFirstChild()).getData().trim();
             int val = Integer.parseInt(text);
@@ -212,8 +297,25 @@ public class PoolManager {
             String text = ((Text)field.getFirstChild()).getData().trim();
             double val = Double.parseDouble(text);
             poolWeights.put(poolName, val);
+          } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            long val = Long.parseLong(text) * 1000L;
+            minSharePreemptionTimeouts.put(poolName, val);
+          } else if ("schedulingMode".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            poolModes.put(poolName, parseSchedulingMode(text));
           }
         }
+        if (poolMaxMaps.containsKey(poolName) && mapAllocs.containsKey(poolName)
+            && poolMaxMaps.get(poolName) < mapAllocs.get(poolName)) {
+          LOG.warn(String.format("Pool %s has max maps %d less than min maps %d",
+              poolName, poolMaxMaps.get(poolName), mapAllocs.get(poolName)));        
+        }
+        if(poolMaxReduces.containsKey(poolName) && reduceAllocs.containsKey(poolName)
+            && poolMaxReduces.get(poolName) < reduceAllocs.get(poolName)) {
+          LOG.warn(String.format("Pool %s has max reduces %d less than min reduces %d",
+              poolName, poolMaxReduces.get(poolName), reduceAllocs.get(poolName)));        
+        }
       } else if ("user".equals(element.getTagName())) {
         String userName = element.getAttribute("name");
         NodeList fields = element.getChildNodes();
@@ -232,6 +334,21 @@ public class PoolManager {
         String text = ((Text)element.getFirstChild()).getData().trim();
         int val = Integer.parseInt(text);
         userMaxJobsDefault = val;
+      } else if ("poolMaxJobsDefault".equals(element.getTagName())) {
+        String text = ((Text)element.getFirstChild()).getData().trim();
+        int val = Integer.parseInt(text);
+        poolMaxJobsDefault = val;
+      } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
+        String text = ((Text)element.getFirstChild()).getData().trim();
+        long val = Long.parseLong(text) * 1000L;
+        fairSharePreemptionTimeout = val;
+      } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
+        String text = ((Text)element.getFirstChild()).getData().trim();
+        long val = Long.parseLong(text) * 1000L;
+        defaultMinSharePreemptionTimeout = val;
+      } else if ("defaultPoolSchedulingMode".equals(element.getTagName())) {
+        String text = ((Text)element.getFirstChild()).getData().trim();
+        defaultSchedulingMode = parseSchedulingMode(text);
       } else {
         LOG.warn("Bad element in allocations file: " + element.getTagName());
       }
@@ -242,17 +359,61 @@ public class PoolManager {
     synchronized(this) {
       this.mapAllocs = mapAllocs;
       this.reduceAllocs = reduceAllocs;
+      this.poolMaxMaps = poolMaxMaps;
+      this.poolMaxReduces = poolMaxReduces;
       this.poolMaxJobs = poolMaxJobs;
       this.userMaxJobs = userMaxJobs;
-      this.userMaxJobsDefault = userMaxJobsDefault;
       this.poolWeights = poolWeights;
+      this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
+      this.userMaxJobsDefault = userMaxJobsDefault;
+      this.poolMaxJobsDefault = poolMaxJobsDefault;
+      this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
+      this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
+      this.defaultSchedulingMode = defaultSchedulingMode;
       for (String name: poolNamesInAllocFile) {
-        getPool(name);
+        Pool pool = getPool(name);
+        if (poolModes.containsKey(name)) {
+          pool.setSchedulingMode(poolModes.get(name));
+        } else {
+          pool.setSchedulingMode(defaultSchedulingMode);
+        }
       }
     }
   }
 
   /**
+   * Does the pool have incompatible max and min allocation set.
+   * 
+   * @param type
+   *          {@link TaskType#MAP} or {@link TaskType#REDUCE}
+   * @param pool
+   *          the pool name
+   * @return true if the max is less than the min
+   */
+  boolean invertedMinMax(TaskType type, String pool) {
+    boolean forMaps = (TaskType.MAP == type);
+    Map<String, Integer> caps = forMaps ? poolMaxMaps : poolMaxReduces;
+    Map<String, Integer> floors = forMaps ? mapAllocs : reduceAllocs;
+    // Only a pool that has both a max and a min configured can be inverted.
+    if (!caps.containsKey(pool) || !floors.containsKey(pool)) {
+      return false;
+    }
+    return caps.get(pool) < floors.get(pool);
+  }
+
+  /**
+   * Translate the textual scheduling mode read from the allocations file
+   * into a {@link SchedulingMode}. Matching is case-insensitive.
+   *
+   * @param text the mode name; "fair" or "fifo" in any letter case
+   * @return the scheduling mode named by <tt>text</tt>
+   * @throws AllocationConfigurationException if <tt>text</tt> names neither
+   *         of the two known modes
+   */
+  private SchedulingMode parseSchedulingMode(String text)
+      throws AllocationConfigurationException {
+    // Lower-case with a fixed locale. With the default locale, a Turkish JVM
+    // maps 'I' to a dotless i, so "FIFO" and "FAIR" would never match below.
+    text = text.toLowerCase(java.util.Locale.ENGLISH);
+    if (text.equals("fair")) {
+      return SchedulingMode.FAIR;
+    } else if (text.equals("fifo")) {
+      return SchedulingMode.FIFO;
+    } else {
+      throw new AllocationConfigurationException(
+          "Unknown scheduling mode : " + text + "; expected 'fifo' or 'fair'");
+    }
+  }
+
+  /**
    * Get the allocation for a particular pool
    */
   public int getAllocation(String pool, TaskType taskType) {
@@ -261,7 +422,20 @@ public class PoolManager {
     Integer alloc = allocationMap.get(pool);
     return (alloc == null ? 0 : alloc);
   }
-  
+
+  /**
+   * Get the maximum map or reduce slots for the given pool.
+   * @return the cap set on this pool, or Integer.MAX_VALUE if not set.
+   */
+  int getMaxSlots(String poolName, TaskType taskType) {
+    Map<String, Integer> caps =
+        (TaskType.MAP == taskType) ? poolMaxMaps : poolMaxReduces;
+    // A pool with no configured cap is treated as unbounded.
+    return caps.containsKey(poolName) ? caps.get(poolName) : Integer.MAX_VALUE;
+  }
+ 
   /**
    * Add a job in the appropriate pool
    */
@@ -281,7 +455,7 @@ public class PoolManager {
    */
   public synchronized void setPool(JobInProgress job, String pool) {
     removeJob(job);
-    job.getJobConf().set(poolNameProperty, pool);
+    job.getJobConf().set(EXPLICIT_POOL_PROPERTY, pool);
     addJob(job);
   }
 
@@ -293,13 +467,16 @@ public class PoolManager {
   }
   
   /**
-   * Get the pool name for a JobInProgress from its configuration. This uses
-   * the "project" property in the jobconf by default, or the property set with
-   * "mapred.fairscheduler.poolnameproperty".
+   * Get the pool name for a JobInProgress from its configuration.  This uses
+   * the value of mapred.fairscheduler.pool if specified, otherwise the value 
+   * of the property named in mapred.fairscheduler.poolnameproperty if that is
+   * specified.  Otherwise if neither is specified it uses the "user.name" property 
+   * in the jobconf by default.
    */
   public String getPoolName(JobInProgress job) {
-    JobConf conf = job.getJobConf();
-    return conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME).trim();
+    Configuration conf = job.getJobConf();
+    return conf.get(EXPLICIT_POOL_PROPERTY,
+      conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME)).trim();
   }
 
   /**
@@ -327,7 +504,7 @@ public class PoolManager {
     if (poolMaxJobs.containsKey(pool)) {
       return poolMaxJobs.get(pool);
     } else {
-      return Integer.MAX_VALUE;
+      return poolMaxJobsDefault;
     }
   }
 
@@ -338,4 +515,32 @@ public class PoolManager {
       return 1.0;
     }
   }
+
+  /**
+   * Get a pool's min share preemption timeout, in milliseconds. This is the
+   * time after which jobs in the pool may kill other pools' tasks if they
+   * are below their min share.
+   */
+  public long getMinSharePreemptionTimeout(String pool) {
+    // Pools without their own timeout use the configured default.
+    if (!minSharePreemptionTimeouts.containsKey(pool)) {
+      return defaultMinSharePreemptionTimeout;
+    }
+    return minSharePreemptionTimeouts.get(pool);
+  }
+  
+  /**
+   * Get the fair share preemption timeout, in milliseconds. This is the time
+   * after which any job may kill other jobs' tasks if it is below half
+   * its fair share.
+   *
+   * @return the fair share preemption timeout in milliseconds
+   */
+  public long getFairSharePreemptionTimeout() {
+    return fairSharePreemptionTimeout;
+  }
+
+  synchronized void updateMetrics() {
+    // Delegate to every known pool; each one refreshes its own metrics.
+    for (Pool each : pools.values()) {
+      each.updateMetrics();
+    }
+  }
 }

Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+
+public class PoolSchedulable extends Schedulable {
+  public static final Log LOG = LogFactory.getLog(
+      PoolSchedulable.class.getName());
+
+  /** Scheduler consulted for job info, the clock and preemption state. */
+  private FairScheduler scheduler;
+  /** Pool whose jobs are scheduled through this object. */
+  private Pool pool;
+  /** Kind of task (map or reduce) handled by this schedulable. */
+  private TaskType taskType;
+  /** Source of the pool's min share, weight and slot cap. */
+  private PoolManager poolMgr;
+  /** Schedulables of the jobs added to this pool, for this task type. */
+  private List<JobSchedulable> jobScheds = new LinkedList<JobSchedulable>();
+  /** Demand as computed by the most recent {@link #updateDemand()}. */
+  private int demand = 0;
+
+  // Timestamps kept for preemption
+  long lastTimeAtMinShare;
+  long lastTimeAtHalfFairShare;
+
+  /**
+   * Create the schedulable for one task type of one pool. Both preemption
+   * timestamps start at the scheduler clock's current time.
+   *
+   * @param scheduler the owning fair scheduler
+   * @param pool      the pool to schedule
+   * @param type      the task type handled by this schedulable
+   */
+  public PoolSchedulable(FairScheduler scheduler, Pool pool, TaskType type) {
+    this.scheduler = scheduler;
+    this.pool = pool;
+    this.taskType = type;
+    this.poolMgr = scheduler.getPoolManager();
+
+    long now = scheduler.getClock().getTime();
+    this.lastTimeAtMinShare = now;
+    this.lastTimeAtHalfFairShare = now;
+
+    initMetrics();
+  }
+
+  /**
+   * Start scheduling a job through this pool, using the job's map or reduce
+   * schedulable depending on this pool schedulable's task type.
+   */
+  public void addJob(JobInProgress job) {
+    JobInfo info = scheduler.getJobInfo(job);
+    if (taskType == TaskType.MAP) {
+      jobScheds.add(info.mapSchedulable);
+    } else {
+      jobScheds.add(info.reduceSchedulable);
+    }
+  }
+
+  /**
+   * Stop scheduling a job through this pool. Only the first schedulable that
+   * refers to the very same job object is removed.
+   */
+  public void removeJob(JobInProgress job) {
+    Iterator<JobSchedulable> it = jobScheds.iterator();
+    while (it.hasNext()) {
+      if (it.next().getJob() == job) {
+        it.remove();
+        return;
+      }
+    }
+  }
+
+  /**
+   * Recompute the pool's demand: refresh every job's demand, add them up,
+   * and clip the sum to the pool's slot cap.
+   */
+  @Override
+  public void updateDemand() {
+    demand = 0;
+    for (JobSchedulable jobSched : jobScheds) {
+      jobSched.updateDemand();
+      demand += jobSched.getDemand();
+    }
+    // Demand never exceeds the maximum number of slots allowed for the pool.
+    int slotCap = poolMgr.getMaxSlots(pool.getName(), taskType);
+    demand = Math.min(demand, slotCap);
+  }
+
+  /**
+   * Hand the pool's fair share down to its jobs. Pools in FAIR mode split
+   * it with the fair share algorithm; in any other mode every job's fair
+   * share is set to zero.
+   */
+  @Override
+  public void redistributeShare() {
+    if (pool.getSchedulingMode() != SchedulingMode.FAIR) {
+      for (JobSchedulable jobSched : jobScheds) {
+        jobSched.setFairShare(0);
+      }
+      return;
+    }
+    SchedulingAlgorithms.computeFairShares(jobScheds, getFairShare());
+  }
+
+  @Override
+  public int getDemand() {
+    return demand;
+  }
+
+  /** The pool's min share is its allocation as known to the pool manager. */
+  @Override
+  public int getMinShare() {
+    return poolMgr.getAllocation(pool.getName(), taskType);
+  }
+
+  @Override
+  public double getWeight() {
+    return poolMgr.getPoolWeight(pool.getName());
+  }
+
+  /** Pools always report {@link JobPriority#NORMAL}. */
+  @Override
+  public JobPriority getPriority() {
+    return JobPriority.NORMAL;
+  }
+
+  /** Sum of the running tasks of all jobs in the pool. */
+  @Override
+  public int getRunningTasks() {
+    int total = 0;
+    for (JobSchedulable jobSched : jobScheds) {
+      total += jobSched.getRunningTasks();
+    }
+    return total;
+  }
+
+  /** Pools always report a start time of zero. */
+  @Override
+  public long getStartTime() {
+    return 0;
+  }
+
+  /**
+   * Try to obtain a task from one of the pool's jobs. Nothing is launched
+   * once the pool runs as many tasks as its slot cap allows. Otherwise the
+   * jobs are sorted according to the pool's scheduling mode and asked in
+   * that order; the first task offered is returned.
+   *
+   * @return the task to launch, or null if the pool is at its cap or no job
+   *         offered a task
+   * @throws RuntimeException if the pool's scheduling mode is neither FIFO
+   *         nor FAIR
+   */
+  @Override
+  public Task assignTask(TaskTrackerStatus tts, long currentTime,
+      Collection<JobInProgress> visited) throws IOException {
+    int running = getRunningTasks();
+    int slotCap = poolMgr.getMaxSlots(pool.getName(), taskType);
+    if (running >= slotCap) {
+      return null;
+    }
+
+    Collections.sort(jobScheds, comparatorFor(pool.getSchedulingMode()));
+    for (JobSchedulable jobSched : jobScheds) {
+      Task launched = jobSched.assignTask(tts, currentTime, visited);
+      if (launched != null) {
+        return launched;
+      }
+    }
+    return null;
+  }
+
+  /** Pick the job ordering that implements the given scheduling mode. */
+  private static Comparator<Schedulable> comparatorFor(SchedulingMode mode) {
+    if (mode == SchedulingMode.FIFO) {
+      return new SchedulingAlgorithms.FifoComparator();
+    }
+    if (mode == SchedulingMode.FAIR) {
+      return new SchedulingAlgorithms.FairShareComparator();
+    }
+    throw new RuntimeException("Unsupported pool scheduling mode " + mode);
+  }
+
+  @Override
+  public String getName() {
+    return pool.getName();
+  }
+
+  Pool getPool() {
+    return pool;
+  }
+
+  @Override
+  public TaskType getTaskType() {
+    return taskType;
+  }
+
+  /** The live collection of job schedulables held by this pool (not a copy). */
+  public Collection<JobSchedulable> getJobSchedulables() {
+    return jobScheds;
+  }
+
+  public long getLastTimeAtMinShare() {
+    return lastTimeAtMinShare;
+  }
+
+  public void setLastTimeAtMinShare(long lastTimeAtMinShare) {
+    this.lastTimeAtMinShare = lastTimeAtMinShare;
+  }
+
+  public long getLastTimeAtHalfFairShare() {
+    return lastTimeAtHalfFairShare;
+  }
+
+  public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
+    this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
+  }
+
+  protected String getMetricsContextName() {
+    return "pools";
+  }
+
+  /**
+   * Publish the pool's metrics and then those of each of its jobs. The two
+   * preemption metrics are only written while preemption is enabled.
+   */
+  @Override
+  public void updateMetrics() {
+    super.setMetricValues(metrics);
+
+    if (scheduler.isPreemptionEnabled()) {
+      // The timestamps below are not maintained when preemption is off
+      long lastCheck = scheduler.getLastPreemptionUpdateTime();
+      long sinceMinShare = lastCheck - lastTimeAtMinShare;
+      long sinceHalfFairShare = lastCheck - lastTimeAtHalfFairShare;
+      metrics.setMetric("millisSinceAtMinShare", sinceMinShare);
+      metrics.setMetric("millisSinceAtHalfFairShare", sinceHalfFairShare);
+    }
+    metrics.update();
+
+    for (JobSchedulable jobSched : jobScheds) {
+      jobSched.updateMetrics();
+    }
+  }
+}

Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+
+/**
+ * A Schedulable represents an entity that can launch tasks, such as a job
+ * or a pool. It provides a common interface so that algorithms such as fair
+ * sharing can be applied both within a pool and across pools. There are 
+ * currently two types of Schedulables: JobSchedulables, which represent a
+ * single job, and PoolSchedulables, which allocate among jobs in their pool.
+ * 
+ * Separate sets of Schedulables are used for maps and reduces. Each pool has
+ * both a mapSchedulable and a reduceSchedulable, and so does each job.
+ * 
+ * A Schedulable is responsible for three roles:
+ * 1) It can launch tasks through assignTask().
+ * 2) It provides information about the job/pool to the scheduler, including:
+ *    - Demand (maximum number of tasks required)
+ *    - Number of currently running tasks
+ *    - Minimum share (for pools)
+ *    - Job/pool weight (for fair sharing)
+ *    - Start time and priority (for FIFO)
+ * 3) It can be assigned a fair share, for use with fair scheduling.
+ * 
+ * Schedulable also contains two methods for performing scheduling computations:
+ * - updateDemand() is called periodically to compute the demand of the various
+ *   jobs and pools, which may be expensive (e.g. jobs must iterate through all
+ *   their tasks to count failed tasks, tasks that can be speculated, etc).
+ * - redistributeShare() is called after demands are updated and a Schedulable's
+ *   fair share has been set by its parent to let it distribute its share among
+ *   the other Schedulables within it (e.g. for pools that want to perform fair
+ *   sharing among their jobs).
+ */
+abstract class Schedulable {
+  /** The fair share most recently assigned through setFairShare. */
+  private double fairShare = 0;
+  /** Metrics record created by initMetrics and dropped by cleanupMetrics. */
+  protected MetricsRecord metrics;
+
+  /**
+   * The job or pool name. Besides debugging, it serves as the final,
+   * deterministic tie-breaker when ordering Schedulables.
+   */
+  public abstract String getName();
+
+  /**
+   * @return the type of tasks scheduled by this Schedulable
+   */
+  public abstract TaskType getTaskType();
+
+  /**
+   * The largest number of tasks this Schedulable could use: the tasks it is
+   * running now plus its unlaunched tasks (those not launched yet and those
+   * that need to be speculated).
+   */
+  public abstract int getDemand();
+
+  /** How many tasks the Schedulable is running right now. */
+  public abstract int getRunningTasks();
+
+  /** The minimum share, in slots, assigned to the Schedulable. */
+  public abstract int getMinShare();
+
+  /** The weight of the job or pool for fair sharing. */
+  public abstract double getWeight();
+
+  /** Priority of a job in a FIFO pool; has no meaning for PoolSchedulables. */
+  public abstract JobPriority getPriority();
+
+  /** Start time of a job in a FIFO pool; has no meaning for PoolSchedulables. */
+  public abstract long getStartTime();
+
+  /** Recompute the demand of this Schedulable and of any children it has. */
+  public abstract void updateDemand();
+
+  /**
+   * Split the fair share held by this Schedulable among its children. Used
+   * by pools whose internal scheduler is fair sharing.
+   */
+  public abstract void redistributeShare();
+
+  /**
+   * Ask the Schedulable for a task to run on the given TaskTracker. The
+   * answer is null when there is nothing to launch right now, or when the
+   * Schedulable prefers not to use this tracker (e.g. it is waiting for one
+   * holding local data). A job passed over because it is waiting for a
+   * tracker with local data is expected to be added to the <tt>visited</tt>
+   * collection, so the scheduler can mark it as skipped for this heartbeat.
+   * Delay scheduling (waiting for trackers with local data) is described at
+   * {@link FairScheduler#getAllowedLocalityLevel(JobInProgress, long)}.
+   *
+   * @param tts      TaskTracker the task would be launched on
+   * @param currentTime Cached time (avoids excessive calls to gettimeofday)
+   * @param visited  Collection that must receive every job considered while
+   *                 searching for a job to assign.
+   * @return the Task to launch, or null if none can be launched currently.
+   * @throws IOException Possible if obtainNew(Map|Reduce)Task throws exception.
+   */
+  public abstract Task assignTask(TaskTrackerStatus tts, long currentTime,
+      Collection<JobInProgress> visited) throws IOException;
+
+  /** Record the fair share given to this Schedulable. */
+  public void setFairShare(double fairShare) {
+    this.fairShare = fairShare;
+  }
+
+  /** @return the fair share last given to this Schedulable */
+  public double getFairShare() {
+    return fairShare;
+  }
+
+  /** @return the name of the metrics record used for this kind of Schedulable */
+  protected abstract String getMetricsContextName();
+
+  /**
+   * Create this Schedulable's record in the "fairscheduler" metrics context
+   * and tag it with the Schedulable's name and task type.
+   */
+  protected void initMetrics() {
+    String recordName = getMetricsContextName();
+    MetricsContext fairSchedulerContext =
+        MetricsUtil.getContext("fairscheduler");
+    metrics = MetricsUtil.createRecord(fairSchedulerContext, recordName);
+    metrics.setTag("name", getName());
+    metrics.setTag("taskType", getTaskType().toString());
+  }
+
+  /** Remove the metrics record and drop the reference to it. */
+  void cleanupMetrics() {
+    metrics.remove();
+    metrics = null;
+  }
+
+  /**
+   * Write the values shared by all Schedulables (fair share, min share,
+   * demand, weight and running tasks) into the given record.
+   */
+  protected void setMetricValues(MetricsRecord metrics) {
+    float share = (float) getFairShare();
+    metrics.setMetric("fairShare", share);
+    metrics.setMetric("minShare", getMinShare());
+    metrics.setMetric("demand", getDemand());
+    float weight = (float) getWeight();
+    metrics.setMetric("weight", weight);
+    metrics.setMetric("runningTasks", getRunningTasks());
+  }
+
+  /** Publish the current metric values of this Schedulable. */
+  abstract void updateMetrics();
+
+  /** Compact one-line summary, handy when debugging. */
+  @Override
+  public String toString() {
+    final String layout = "[%s, demand=%d, running=%d, share=%.1f, w=%.1f]";
+    return String.format(layout,
+        getName(), getDemand(), getRunningTasks(), fairShare, getWeight());
+  }
+}

Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Utility class containing scheduling algorithms used in the fair scheduler.
+ */
+class SchedulingAlgorithms {
+  public static final Log LOG = LogFactory.getLog(
+      SchedulingAlgorithms.class.getName());
+
+  /**
+   * Three-way comparison of two longs without subtracting them, so that it
+   * cannot overflow.
+   *
+   * @return -1, 0 or 1 as <tt>a</tt> is less than, equal to or greater
+   *         than <tt>b</tt>
+   */
+  private static int compareLongs(long a, long b) {
+    return a < b ? -1 : (a > b ? 1 : 0);
+  }
+
+  /**
+   * Compare Schedulables in order of priority and then submission time, as in
+   * the default FIFO scheduler in Hadoop.
+   */
+  public static class FifoComparator implements Comparator<Schedulable> {
+    @Override
+    public int compare(Schedulable s1, Schedulable s2) {
+      int res = s1.getPriority().compareTo(s2.getPriority());
+      if (res == 0) {
+        res = compareLongs(s1.getStartTime(), s2.getStartTime());
+      }
+      if (res == 0) {
+        // In the rare case where jobs were submitted at the exact same time,
+        // compare them by name (which will be the JobID) to get a deterministic
+        // ordering, so we don't alternately launch tasks from different jobs.
+        res = s1.getName().compareTo(s2.getName());
+      }
+      return res;
+    }
+  }
+
+  /**
+   * Compare Schedulables via weighted fair sharing. In addition, Schedulables
+   * below their min share get priority over those whose min share is met.
+   *
+   * Schedulables below their min share are compared by how far below it they
+   * are as a ratio. For example, if job A has 8 out of a min share of 10 tasks
+   * and job B has 50 out of a min share of 100, then job B is scheduled next,
+   * because B is at 50% of its min share and A is at 80% of its min share.
+   *
+   * Schedulables above their min share are compared by (runningTasks / weight).
+   * If all weights are equal, slots are given to the job with the fewest tasks;
+   * otherwise, jobs with more weight get proportionally more slots.
+   *
+   * Ratios are ordered with {@link Double#compare(double, double)}, so an
+   * infinite or NaN ratio (which a weight of zero produces) still has a
+   * well-defined position: NaN sorts after every other value.
+   */
+  public static class FairShareComparator implements Comparator<Schedulable> {
+    @Override
+    public int compare(Schedulable s1, Schedulable s2) {
+      // Read the running task counts once; they feed several comparisons.
+      int running1 = s1.getRunningTasks();
+      int running2 = s2.getRunningTasks();
+      // A Schedulable cannot need more than it demands, so cap the min share.
+      int minShare1 = Math.min(s1.getMinShare(), s1.getDemand());
+      int minShare2 = Math.min(s2.getMinShare(), s2.getDemand());
+      boolean s1Needy = running1 < minShare1;
+      boolean s2Needy = running2 < minShare2;
+      int res;
+      if (s1Needy && !s2Needy) {
+        res = -1;
+      } else if (s2Needy && !s1Needy) {
+        res = 1;
+      } else if (s1Needy && s2Needy) {
+        // Both below min share: the one at the lower fraction of it goes first.
+        double minShareRatio1 = running1 / Math.max(minShare1, 1.0);
+        double minShareRatio2 = running2 / Math.max(minShare2, 1.0);
+        res = Double.compare(minShareRatio1, minShareRatio2);
+      } else {
+        // Neither schedulable is needy. Double.compare (rather than the sign
+        // of a difference) keeps the ordering transitive even when a ratio
+        // is NaN or infinite, as Collections.sort requires.
+        double tasksToWeightRatio1 = running1 / s1.getWeight();
+        double tasksToWeightRatio2 = running2 / s2.getWeight();
+        res = Double.compare(tasksToWeightRatio1, tasksToWeightRatio2);
+      }
+      if (res == 0) {
+        // Jobs are tied in fairness ratio. Break the tie by submit time and job
+        // name to get a deterministic ordering, which is useful for unit tests.
+        res = compareLongs(s1.getStartTime(), s2.getStartTime());
+        if (res == 0) {
+          res = s1.getName().compareTo(s2.getName());
+        }
+      }
+      return res;
+    }
+  }
+
+  /**
+   * Number of iterations for the binary search in computeFairShares. This is
+   * equivalent to the number of bits of precision in the output. 25 iterations
+   * gives precision better than 0.1 slots in clusters with one million slots.
+   */
+  private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
+
+  /**
+   * Given a set of Schedulables and a number of slots, compute their weighted
+   * fair shares. The min shares and demands of the Schedulables are assumed to
+   * be set beforehand. We compute the fairest possible allocation of shares
+   * to the Schedulables that respects their min shares and demands.
+   *
+   * To understand what this method does, we must first define what weighted
+   * fair sharing means in the presence of minimum shares and demands. If there
+   * were no minimum shares and every Schedulable had an infinite demand (i.e.
+   * could launch infinitely many tasks), then weighted fair sharing would be
+   * achieved if the ratio of slotsAssigned / weight was equal for each
+   * Schedulable and all slots were assigned. Minimum shares and demands add
+   * two further twists:
+   * - Some Schedulables may not have enough tasks to fill all their share.
+   * - Some Schedulables may have a min share higher than their assigned share.
+   *
+   * To deal with these possibilities, we define an assignment of slots as
+   * being fair if there exists a ratio R such that:
+   * - Schedulables S where S.demand < R * S.weight are assigned share S.demand
+   * - Schedulables S where S.minShare > R * S.weight are given share S.minShare
+   * - All other Schedulables S are assigned share R * S.weight
+   * - The sum of all the shares is totalSlots.
+   *
+   * We call R the weight-to-slots ratio because it converts a Schedulable's
+   * weight to the number of slots it is assigned.
+   *
+   * We compute a fair allocation by finding a suitable weight-to-slot ratio R.
+   * To do this, we use binary search. Given a ratio R, we compute the number
+   * of slots that would be used in total with this ratio (the sum of the shares
+   * computed using the conditions above). If this number of slots is less than
+   * totalSlots, then R is too small and more slots could be assigned. If the
+   * number of slots is more than totalSlots, then R is too large.
+   *
+   * We begin the binary search with a lower bound on R of 0 (which means that
+   * all Schedulables are only given their minShare) and an upper bound computed
+   * to be large enough that too many slots are given (by doubling R until we
+   * either use more than totalSlots slots or we fulfill all jobs' demands).
+   * The helper method slotsUsedWithWeightToSlotRatio computes the total number
+   * of slots used with a given value of R.
+   *
+   * The running time of this algorithm is linear in the number of Schedulables,
+   * because slotsUsedWithWeightToSlotRatio is linear-time and the number of
+   * iterations of binary search is a constant (dependent on desired precision).
+   *
+   * @param schedulables the Schedulables whose fair shares are to be set
+   * @param totalSlots   the number of slots to divide among them
+   */
+  public static void computeFairShares(
+      Collection<? extends Schedulable> schedulables, double totalSlots) {
+    // Find an upper bound on R that we can use in our binary search. We start
+    // at R = 1 and double it until we have either used totalSlots slots or we
+    // have met all Schedulables' demands (if total demand < totalSlots).
+    double totalDemand = 0;
+    for (Schedulable sched: schedulables) {
+      totalDemand += sched.getDemand();
+    }
+    double cap = Math.min(totalDemand, totalSlots);
+    double rMax = 1.0;
+    // NOTE(review): this loop relies on the summed shares eventually reaching
+    // cap as R grows, which appears to assume positive weights; confirm that
+    // a Schedulable with weight zero cannot reach this method.
+    while (slotsUsedWithWeightToSlotRatio(rMax, schedulables) < cap) {
+      rMax *= 2.0;
+    }
+    // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
+    double left = 0;
+    double right = rMax;
+    for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
+      double mid = (left + right) / 2.0;
+      if (slotsUsedWithWeightToSlotRatio(mid, schedulables) < cap) {
+        left = mid;
+      } else {
+        right = mid;
+      }
+    }
+    // Set the fair shares based on the value of R we've converged to
+    for (Schedulable sched: schedulables) {
+      sched.setFairShare(computeShare(sched, right));
+    }
+  }
+
+  /**
+   * Compute the number of slots that would be used given a weight-to-slot
+   * ratio w2sRatio, for use in the computeFairShares algorithm as described
+   * in {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
+   */
+  private static double slotsUsedWithWeightToSlotRatio(double w2sRatio,
+      Collection<? extends Schedulable> schedulables) {
+    double slotsTaken = 0;
+    for (Schedulable sched: schedulables) {
+      double share = computeShare(sched, w2sRatio);
+      slotsTaken += share;
+    }
+    return slotsTaken;
+  }
+
+  /**
+   * Compute the number of slots assigned to a Schedulable given a particular
+   * weight-to-slot ratio w2sRatio, for use in computeFairShares as described
+   * in {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
+   * The weighted share is first raised to the min share and then capped at
+   * the demand, so the demand wins when it is below the min share.
+   */
+  private static double computeShare(Schedulable sched, double w2sRatio) {
+    double share = sched.getWeight() * w2sRatio;
+    share = Math.max(share, sched.getMinShare());
+    share = Math.min(share, sched.getDemand());
+    return share;
+  }
+}

Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * Internal scheduling modes for pools.
+ */
+public enum SchedulingMode {
+  FAIR, FIFO
+}

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java Sun Sep 11 23:57:37 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /**
  * A pluggable object for selecting tasks to run from a {@link JobInProgress} on
@@ -86,7 +87,7 @@ public abstract class TaskSelector imple
    * @throws IOException 
    */
   public abstract Task obtainNewMapTask(TaskTrackerStatus taskTracker,
-      JobInProgress job) throws IOException;
+      JobInProgress job, int localityLevel) throws IOException;
 
   /**
    * Choose a reduce task to run from the given job on the given TaskTracker.

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java Sun Sep 11 23:57:37 2011
@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.mapreduce.TaskType;
 
 /**
  * A pluggable object for altering the weights of jobs in the fair scheduler,

Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Dummy implementation of Schedulable for unit testing.
+ */
+public class FakeSchedulable extends Schedulable {
+  private int demand;
+  private int runningTasks;
+  private int minShare;
+  private double weight;
+  private JobPriority priority;
+  private long startTime;
+  
+  public FakeSchedulable() {
+    this(0, 0, 1, 0, 0, JobPriority.NORMAL, 0);
+  }
+  
+  public FakeSchedulable(int demand) {
+    this(demand, 0, 1, 0, 0, JobPriority.NORMAL, 0);
+  }
+  
+  public FakeSchedulable(int demand, int minShare) {
+    this(demand, minShare, 1, 0, 0, JobPriority.NORMAL, 0);
+  }
+  
+  public FakeSchedulable(int demand, int minShare, double weight) {
+    this(demand, minShare, weight, 0, 0, JobPriority.NORMAL, 0);
+  }
+  
+  public FakeSchedulable(int demand, int minShare, double weight, int fairShare,
+      int runningTasks, JobPriority priority, long startTime) {
+    this.demand = demand;
+    this.minShare = minShare;
+    this.weight = weight;
+    setFairShare(fairShare);
+    this.runningTasks = runningTasks;
+    this.priority = priority;
+    this.startTime = startTime;
+  }
+  
+  @Override
+  public Task assignTask(TaskTrackerStatus tts, long currentTime,
+      Collection<JobInProgress> visited) throws IOException {
+    return null;
+  }
+
+  @Override
+  public int getDemand() {
+    return demand;
+  }
+
+  @Override
+  public String getName() {
+    return "FakeSchedulable" + this.hashCode();
+  }
+
+  @Override
+  public JobPriority getPriority() {
+    return priority;
+  }
+
+  @Override
+  public int getRunningTasks() {
+    return runningTasks;
+  }
+
+  @Override
+  public long getStartTime() {
+    return startTime;
+  }
+  
+  @Override
+  public double getWeight() {
+    return weight;
+  }
+  
+  @Override
+  public int getMinShare() {
+    return minShare;
+  }
+
+  @Override
+  public void redistributeShare() {}
+
+  @Override
+  public void updateDemand() {}
+
+  @Override
+  public TaskType getTaskType() {
+    return TaskType.MAP;
+  }
+
+  @Override
+  protected String getMetricsContextName() {
+    return "fake";
+  }
+
+  @Override
+  void updateMetrics() {
+  }
+}

Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskStatus.State;
+
+import junit.framework.TestCase;
+
+/**
+ * Exercise the canAssignMap and canAssignReduce methods in 
+ * CapBasedLoadManager.
+ */
+public class TestCapBasedLoadManager extends TestCase {
+  
+  /**
+   * Returns a running MapTaskStatus.
+   */
+  private TaskStatus getRunningMapTaskStatus() {
+    TaskStatus ts = new MapTaskStatus();
+    ts.setRunState(State.RUNNING);
+    return ts;
+  }
+
+  /**
+   * Returns a running ReduceTaskStatus.
+   */
+  private TaskStatus getRunningReduceTaskStatus() {
+    TaskStatus ts = new ReduceTaskStatus();
+    ts.setRunState(State.RUNNING);
+    return ts;
+  }
+  
+  /**
+   * Returns a TaskTrackerStatus with the specified statistics. 
+   * @param mapCap        The capacity of map tasks 
+   * @param reduceCap     The capacity of reduce tasks
+   * @param runningMap    The number of running map tasks
+   * @param runningReduce The number of running reduce tasks
+   */
+  private TaskTrackerStatus getTaskTrackerStatus(int mapCap, int reduceCap, 
+      int runningMap, int runningReduce) {
+    List<TaskStatus> ts = new ArrayList<TaskStatus>();
+    for (int i = 0; i < runningMap; i++) {
+      ts.add(getRunningMapTaskStatus());
+    }
+    for (int i = 0; i < runningReduce; i++) {
+      ts.add(getRunningReduceTaskStatus());
+    }
+    TaskTrackerStatus tracker = new TaskTrackerStatus("tracker", 
+        "tracker_host", 1234, ts, 0, mapCap, reduceCap);
+    return tracker;
+  }
+
+  /**
+   * A single test of canAssignMap.
+   */
+  private void oneTestCanAssignMap(float maxDiff, int mapCap, int runningMap,
+      int totalMapSlots, int totalRunnableMap, boolean expected) {
+    
+    CapBasedLoadManager manager = new CapBasedLoadManager();
+    Configuration conf = new Configuration();
+    conf.setFloat("mapred.fairscheduler.load.max.diff", maxDiff);
+    manager.setConf(conf);
+    
+    TaskTrackerStatus ts = getTaskTrackerStatus(mapCap, 1, runningMap, 1);
+    
+    assertEquals( "When maxDiff=" + maxDiff + ", with totalRunnableMap=" 
+        + totalRunnableMap + " and totalMapSlots=" + totalMapSlots
+        + ", a tracker with runningMap=" + runningMap + " and mapCap="
+        + mapCap + " should " + (expected ? "" : "not ")
+        + "be able to take more Maps.",
+        expected,
+        manager.canAssignMap(ts, totalRunnableMap, totalMapSlots)
+        );
+  }
+  
+  
+  /** 
+   * Test canAssignMap method.
+   */
+  public void testCanAssignMap() {
+    oneTestCanAssignMap(0.0f, 5, 0, 50, 1, true);
+    oneTestCanAssignMap(0.0f, 5, 1, 50, 10, false);
+    oneTestCanAssignMap(0.2f, 5, 1, 50, 10, true);
+    oneTestCanAssignMap(0.0f, 5, 1, 50, 11, true);
+    oneTestCanAssignMap(0.0f, 5, 2, 50, 11, false);
+    oneTestCanAssignMap(0.3f, 5, 2, 50, 6, true);
+    oneTestCanAssignMap(1.0f, 5, 5, 50, 50, false);
+  }
+  
+  
+  /**
+   * A single test of canAssignReduce.
+   */
+  private void oneTestCanAssignReduce(float maxDiff, int ReduceCap,
+      int runningReduce, int totalReduceSlots, int totalRunnableReduce,
+      boolean expected) {
+    
+    CapBasedLoadManager manager = new CapBasedLoadManager();
+    Configuration conf = new Configuration();
+    conf.setFloat("mapred.fairscheduler.load.max.diff", maxDiff);
+    manager.setConf(conf);
+    
+    TaskTrackerStatus ts = getTaskTrackerStatus(1, ReduceCap, 1,
+        runningReduce);
+    
+    assertEquals( "When maxDiff=" + maxDiff + ", with totalRunnableReduce=" 
+        + totalRunnableReduce + " and totalReduceSlots=" + totalReduceSlots
+        + ", a tracker with runningReduce=" + runningReduce
+        + " and ReduceCap=" + ReduceCap + " should "
+        + (expected ? "" : "not ") + "be able to take more Reduces.",
+        expected,
+        manager.canAssignReduce(ts, totalRunnableReduce, totalReduceSlots)
+        );
+  }
+    
+  /** 
+   * Test canAssignReduce method.
+   */
+  public void testCanAssignReduce() {
+    oneTestCanAssignReduce(0.0f, 5, 0, 50, 1, true);
+    oneTestCanAssignReduce(0.0f, 5, 1, 50, 10, false);
+    oneTestCanAssignReduce(0.2f, 5, 1, 50, 10, true);
+    oneTestCanAssignReduce(0.0f, 5, 1, 50, 11, true);
+    oneTestCanAssignReduce(0.0f, 5, 2, 50, 11, false);
+    oneTestCanAssignReduce(0.3f, 5, 2, 50, 6, true);
+    oneTestCanAssignReduce(1.0f, 5, 5, 50, 50, false);
+  }
+  
+}

Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+/**
+ * Exercise the computeFairShares method in SchedulingAlgorithms.
+ */
+public class TestComputeFairShares extends TestCase {
+  private List<Schedulable> scheds;
+  
+  @Override
+  protected void setUp() throws Exception {
+    scheds = new ArrayList<Schedulable>();
+  }
+  
+  /** 
+   * Basic test - pools with different demands that are all higher than their
+   * fair share (of 10 slots) should each get their fair share.
+   */
+  public void testEqualSharing() {
+    scheds.add(new FakeSchedulable(100));
+    scheds.add(new FakeSchedulable(50));
+    scheds.add(new FakeSchedulable(30));
+    scheds.add(new FakeSchedulable(20));
+    SchedulingAlgorithms.computeFairShares(scheds, 40);
+    verifyShares(10, 10, 10, 10);
+  }
+  
+  /**
+   * In this test, pool 4 has a smaller demand than the 40 / 4 = 10 slots that
+   * it would be assigned with equal sharing. It should only get the 3 slots
+   * it demands. The other pools must then split the remaining 37 slots, but
+   * pool 3, with 11 slots demanded, is now below its share of 37/3 ~= 12.3,
+   * so it only gets 11 slots. Pools 1 and 2 split the rest and get 13 each. 
+   */
+  public void testLowDemands() {
+    scheds.add(new FakeSchedulable(100));
+    scheds.add(new FakeSchedulable(50));
+    scheds.add(new FakeSchedulable(11));
+    scheds.add(new FakeSchedulable(3));
+    SchedulingAlgorithms.computeFairShares(scheds, 40);
+    verifyShares(13, 13, 11, 3);
+  }
+  
+  /**
+   * In this test, some pools have minimum shares set. Pool 1 has a min share
+   * of 20 so it gets 20 slots. Pool 2 also has a min share of 20, but its
+   * demand is only 10 so it can only get 10 slots. The remaining pools have
+   * 10 slots to split between them. Pool 4 gets 3 slots because its demand is
+   * only 3, and pool 3 gets the remaining 7 slots. Pool 4 also had a min share
+   * of 2 slots but this should not affect the outcome.
+   */
+  public void testMinShares() {
+    scheds.add(new FakeSchedulable(100, 20));
+    scheds.add(new FakeSchedulable(10, 20));
+    scheds.add(new FakeSchedulable(10, 0));
+    scheds.add(new FakeSchedulable(3, 2));
+    SchedulingAlgorithms.computeFairShares(scheds, 40);
+    verifyShares(20, 10, 7, 3);
+  }
+  
+  /**
+   * Basic test for weighted shares with no minimum shares and no low demands.
+   * Each pool should get slots in proportion to its weight.
+   */
+  public void testWeightedSharing() {
+    scheds.add(new FakeSchedulable(100, 0, 2.0));
+    scheds.add(new FakeSchedulable(50,  0, 1.0));
+    scheds.add(new FakeSchedulable(30,  0, 1.0));
+    scheds.add(new FakeSchedulable(20,  0, 0.5));
+    SchedulingAlgorithms.computeFairShares(scheds, 45);
+    verifyShares(20, 10, 10, 5);
+  }
+
+  /**
+   * Weighted sharing test where pools 1 and 2 are now given lower demands than
+   * above. Pool 1 stops at 10 slots, leaving 35. If the remaining pools split
+   * this into a 1:1:0.5 ratio, they would get 14:14:7 slots respectively, but
+   * pool 2's demand is only 11, so it only gets 11. The remaining 2 pools split
+   * the 24 slots left into a 1:0.5 ratio, getting 16 and 8 slots respectively.
+   */
+  public void testWeightedSharingWithLowDemands() {
+    scheds.add(new FakeSchedulable(10, 0, 2.0));
+    scheds.add(new FakeSchedulable(11, 0, 1.0));
+    scheds.add(new FakeSchedulable(30, 0, 1.0));
+    scheds.add(new FakeSchedulable(20, 0, 0.5));
+    SchedulingAlgorithms.computeFairShares(scheds, 45);
+    verifyShares(10, 11, 16, 8);
+  }
+
+  /**
+   * Weighted fair sharing test with min shares. As in the min share test above,
+   * pool 1 has a min share greater than its demand so it only gets its demand.
+   * Pool 4 has a min share of 15 even though its weight is very small, so it
+   * gets 15 slots. The remaining pools share the remaining 20 slots equally,
+   * getting 10 each. Pool 3's min share of 5 slots doesn't affect this.
+   */
+  public void testWeightedSharingWithMinShares() {
+    scheds.add(new FakeSchedulable(10, 20, 2.0));
+    scheds.add(new FakeSchedulable(11, 0, 1.0));
+    scheds.add(new FakeSchedulable(30, 5, 1.0));
+    scheds.add(new FakeSchedulable(20, 15, 0.5));
+    SchedulingAlgorithms.computeFairShares(scheds, 45);
+    verifyShares(10, 10, 10, 15);
+  }
+
+  /**
+   * Test that shares are computed accurately even when there are many more
+   * pools than available slots.
+   */
+  public void testSmallShares() {
+    scheds.add(new FakeSchedulable(10));
+    scheds.add(new FakeSchedulable(5));
+    scheds.add(new FakeSchedulable(3));
+    scheds.add(new FakeSchedulable(2));
+    SchedulingAlgorithms.computeFairShares(scheds, 1);
+    verifyShares(0.25, 0.25, 0.25, 0.25);
+  }
+
+  /**
+   * Test that shares are computed accurately even when the number of slots is
+   * very large.
+   */  
+  public void testLargeShares() {
+    int million = 1000 * 1000;
+    scheds.add(new FakeSchedulable(100 * million));
+    scheds.add(new FakeSchedulable(50 * million));
+    scheds.add(new FakeSchedulable(30 * million));
+    scheds.add(new FakeSchedulable(20 * million));
+    SchedulingAlgorithms.computeFairShares(scheds, 40 * million);
+    verifyShares(10 * million, 10 * million, 10 * million, 10 * million);
+  }
+
+  /**
+   * Test that having a pool with 0 demand doesn't confuse the algorithm.
+   */
+  public void testZeroDemand() {
+    scheds.add(new FakeSchedulable(100));
+    scheds.add(new FakeSchedulable(50));
+    scheds.add(new FakeSchedulable(30));
+    scheds.add(new FakeSchedulable(0));
+    SchedulingAlgorithms.computeFairShares(scheds, 30);
+    verifyShares(10, 10, 10, 0);
+  }
+  
+  /**
+   * Test that being called on an empty list doesn't confuse the algorithm.
+   */
+  public void testEmptyList() {
+    SchedulingAlgorithms.computeFairShares(scheds, 40);
+    verifyShares();
+  }
+  
+  /**
+   * Check that a given list of shares have been assigned to this.scheds.
+   */
+  private void verifyShares(double... shares) {
+    assertEquals(scheds.size(), shares.length);
+    for (int i = 0; i < shares.length; i++) {
+      assertEquals(shares[i], scheds.get(i).getFairShare(), 0.01);
+    }
+  }
+}



Mime
View raw message