hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject svn commit: r804284 [1/4] - in /hadoop/mapreduce/trunk: ./ src/contrib/fairscheduler/designdoc/ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/docs/src/documentation/content...
Date Fri, 14 Aug 2009 16:32:05 GMT
Author: matei
Date: Fri Aug 14 16:32:04 2009
New Revision: 804284

URL: http://svn.apache.org/viewvc?rev=804284&view=rev
Log:
MAPREDUCE-706. Support for FIFO pools in the fair scheduler.


Added:
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/designdoc/
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/designdoc/fsdesigndoc.pdf   (with props)
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/designdoc/fsdesigndoc.tex
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=804284&r1=804283&r2=804284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Aug 14 16:32:04 2009
@@ -22,6 +22,9 @@
 
   NEW FEATURES
 
+    MAPREDUCE-706. Support for FIFO pools in the fair scheduler.
+    (Matei Zaharia)
+
     MAPREDUCE-546. Provide sample fair scheduler config file in conf/ and use
     it by default if no other config file is specified. (Matei Zaharia)
 

Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/designdoc/fsdesigndoc.pdf
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/designdoc/fsdesigndoc.pdf?rev=804284&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/mapreduce/trunk/src/contrib/fairscheduler/designdoc/fsdesigndoc.pdf
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/designdoc/fsdesigndoc.tex
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/designdoc/fsdesigndoc.tex?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/designdoc/fsdesigndoc.tex (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/designdoc/fsdesigndoc.tex Fri Aug 14 16:32:04 2009
@@ -0,0 +1,238 @@
+\documentclass[11pt]{article}
+\usepackage{geometry}
+\geometry{letterpaper}
+
+
+\begin{document}
+
+\title{Hadoop Fair Scheduler Design Document}
+\author{}
+\maketitle
+\tableofcontents
+
+\section{Introduction}
+
+The Hadoop Fair Scheduler started as a simple means to share MapReduce clusters. Over time, it has grown in functionality to support hierarchical scheduling, preemption, and multiple ways of organizing and weighting jobs. This document explains the goals and features of the Fair Scheduler and its internal design.
+
+\section{Fair Scheduler Goals}
+
+The Fair Scheduler was designed with four main goals:
+\begin{enumerate}
+  \item Run small jobs quickly even if they are sharing a cluster with large jobs. Unlike Hadoop's built-in FIFO scheduler, fair scheduling lets small jobs make progress even if a large job is running, without starving the large job.
+  \item Provide guaranteed service levels to ``production" jobs, to let them run alongside experimental jobs in a shared cluster.
+  \item Be simple to administer and configure. The scheduler should do something reasonable ``out of the box," and users should only need to configure it as they discover that they want to use more advanced features.
+  \item Support reconfiguration at runtime, without requiring a cluster restart.
+\end{enumerate}
+
+\section{Scheduler Features}
+
+This section provides a quick overview of the features of the Fair Scheduler. A detailed usage guide is available in the Hadoop documentation in {\tt build/docs/fair\_scheduler.html}.
+
+\subsection{Pools}
+
+The Fair Scheduler groups jobs into ``pools" and performs fair sharing between these pools. Each pool can use either FIFO or fair sharing to schedule jobs internal to the pool. The pool that a job is placed in is determined by a JobConf property, the ``pool name property". By default, this is {\tt user.name}, so that there is one pool per user. However, different properties can be used, e.g.~{\tt group.name} to have one pool per Unix group.
+
+A common trick is to set the pool name property to an unused property name such as {\tt pool.name} and make this default to {\tt user.name}, so that there is one pool per user but it is also possible to place jobs into ``special" pools by setting their {\tt pool.name} directly. The {\tt mapred-site.xml} snippet below shows how to do this:
+
+\begin{verbatim}
+<property>
+  <name>mapred.fairscheduler.poolnameproperty</name>
+  <value>pool.name</value>
+</property>
+
+<property>
+  <name>pool.name</name>
+  <value>${user.name}</value>
+</property>
+\end{verbatim}
+
+\subsection{Minimum Shares}
+
+Normally, active pools (those that contain jobs) will get equal shares of the map and reduce task slots in the cluster. However, it is also possible to set a \emph{minimum share} of map and reduce slots on a given pool, which is a number of slots that it will always get when it is active, even if its fair share would be below this number. This is useful for guaranteeing that production jobs get a certain desired level of service when sharing a cluster with non-production jobs. Minimum shares have three effects:
+\begin{enumerate}
+  \item The pool's fair share will always be at least as large as its minimum share. Slots are taken from the share of other pools to achieve this. The only exception is if the minimum shares of the active pools add up to more than the total number of slots in the cluster; in this case, each pool's share will be scaled down proportionally.
+  \item Pools whose running task count is below their minimum share get assigned slots first when slots are available.
+  \item It is possible to set a \emph{preemption timeout} on the pool after which, if it has not received enough task slots to meet its minimum share, it is allowed to kill tasks in other jobs to meet its share. Minimum shares with preemption timeouts thus act like SLAs.
+\end{enumerate}
+
+Note that when a pool is inactive (contains no jobs), its minimum share is not ``reserved" for it -- the slots are split up among the other pools.
+
+\subsection{Preemption}
+
+As explained above, the scheduler may kill tasks from a job in one pool in order to meet the minimum share of another pool. We call this preemption, although this usage of the word is somewhat strange given the normal definition of preemption as pausing; really it is the \emph{job} that gets preempted, while the task gets killed. The feature explained above is called \emph{min share preemption}. In addition, the scheduler supports \emph{fair share preemption}, to kill tasks when a pool's fair share is not being met. Fair share preemption is much more conservative than min share preemption, because pools without min shares are expected to contain non-production jobs, for which some amount of unfairness is tolerable. In particular, fair share preemption activates if a pool has been below \emph{half} of its fair share for a configurable fair share preemption timeout, which is recommended to be set fairly high (e.g.~10 minutes).
+
+In both types of preemption, the scheduler kills the most recently launched tasks from over-scheduled pools, to minimize the amount of computation wasted by preemption.
+
+\subsection{Running Job Limits}
+
+The fair scheduler can limit the number of concurrently running jobs from each user and from each pool. This is useful for limiting the amount of intermediate data generated on the cluster. The jobs that will run are chosen in order of submit time and priority. Jobs submitted beyond the limit wait for one of the running jobs to finish.
+
+\subsection{Job Priorities}
+
+Within a pool, job priorities can be used to control the scheduling of jobs, whether the pool's internal scheduling mode is FIFO or fair sharing:
+\begin{itemize}
+  \item In FIFO pools, jobs are ordered first by priority and then by submit time, as in Hadoop's default scheduler.
+  \item In fair sharing pools, job priorities are used as weights to control how much share a job gets. The normal priority corresponds to a weight of 1.0, and each level gives 2x more weight. For example, a high-priority job gets a weight of 2.0, and will therefore get 2x the share of a normal-priority job. 
+\end{itemize}
+
+\subsection{Pool Weights}
+
+Pools can be given weights to achieve unequal sharing of the cluster. For example, a pool with weight 2.0 gets 2x the share of a pool with weight 1.0.
+
+\subsection{Delay Scheduling}
+
+The Fair Scheduler contains an algorithm called delay scheduling to improve data locality. Jobs that cannot launch a data-local map task wait for some period of time before they are allowed to launch non-data-local tasks, ensuring that they will run locally if some node in the cluster has the relevant data. Delay scheduling is described in detail in Section \ref{sec:delay-scheduling}.
+
+\subsection{Administration}
+
+The Fair Scheduler includes a web UI for displaying the active pools and jobs and their fair shares, moving jobs between pools, and changing job priorities.
+In addition, the Fair Scheduler's allocation file (specifying min shares and preemption timeouts for the pools) is automatically reloaded if it is modified on disk, to allow runtime reconfiguration.
+
+\section{Implementation}
+
+\subsection{Hadoop Scheduling Background}
+
+Hadoop jobs consist of a number of map and reduce \emph{tasks}. These tasks run in \emph{slots} on the nodes of the cluster. Each node is configured with a number of map slots and reduce slots based on its computational resources (typically one slot per core). The role of the scheduler is to assign tasks to any slots that are free.
+
+All schedulers in Hadoop, including the Fair Scheduler, inherit from the {\tt TaskScheduler} abstract class. This class provides access to a {\tt TaskTrackerManager} -- an interface to the JobTracker -- as well as a {\tt Configuration} instance. It also asks the scheduler to implement three abstract methods: the lifecycle methods {\tt start} and {\tt terminate}, and a method called {\tt assignTasks} to launch tasks on a given TaskTracker.
+Task assignment in Hadoop is reactive. TaskTrackers periodically send heartbeats to the JobTracker with their {\tt TaskTrackerStatus}, which contains a list of running tasks, the number of slots on the node, and other information. The JobTracker then calls {\tt assignTasks} on the scheduler to obtain tasks to launch. These are returned with the heartbeat response.
+
+Apart from reacting to heartbeats through {\tt assignTasks}, schedulers can also be notified when jobs have been submitted to the cluster, killed, or removed by adding listeners to the {\tt TaskTrackerManager}. The Fair Scheduler sets up these listeners in its {\tt start} method. An important role of the listeners is to initialize jobs that are submitted -- until a job is initialized, it cannot launch tasks. The Fair Scheduler currently initializes all jobs right away, but when too many jobs are submitted it may also be desirable to hold off on initializing some of them, in order to limit memory usage on the JobTracker.
+
+Selection of tasks \emph{within} a job is mostly done by the {\tt JobInProgress} class, and not by individual schedulers. {\tt JobInProgress} exposes two methods, {\tt obtainNewMapTask} and {\tt obtainNewReduceTask}, to launch a task of either type. Both methods may either return a {\tt Task} object or {\tt null} if the job does not wish to launch a task. Whether a job wishes to launch a task may change back and forth during its lifetime. Even after all tasks in the job have been started, the job may wish to run another task for speculative execution. In addition, if the node containing a map task failed, the job will wish to re-run it to rebuild its output for use in the reduce tasks. Schedulers may therefore need to poll multiple jobs until they find one with a task to run.
+
+Finally, for map tasks, an important scheduling criterion is data locality: running the task on a node or rack that contains its input data. Normally, {\tt JobInProgress.obtainNewMapTask} returns the ``closest" map task to a given node. However, to give schedulers slightly more control over data locality, there is also a version of {\tt obtainNewMapTask} that allows the scheduler to cap the level of non-locality allowed for the task (e.g.~request a task only on the same node, or {\tt null} if none is available). The Fair Scheduler uses this method with an algorithm called delay scheduling (Section \ref{sec:delay-scheduling}) to optimize data locality.
+
+\subsection{Fair Scheduler Basics}
+
+At a high level, the Fair Scheduler uses hierarchical scheduling to assign tasks. First it selects a pool to assign a task to according to the fair sharing algorithm in Section \ref{sec:fair-sharing-alg}. Then it asks the pool to obtain a task. The pool chooses among its jobs according to its internal scheduling order (FIFO or fair sharing).
+
+In fact, because jobs might not have tasks to launch ({\tt obtainNew(Map|Reduce)Task} can return {\tt null}), the scheduler actually establishes an ordering on jobs and asks them for tasks in turn. Within a pool, jobs are sorted either by priority and start time (for FIFO) or by distance below fair share. If the first job in the ordering does not have a task to launch, the pool asks the second job, then the third, and so on. Pools themselves are sorted by distance below min share and fair share, so if the first pool does not have any jobs that can launch tasks, the second pool is asked, and so on. This makes it straightforward to implement features like delay scheduling (Section \ref{sec:delay-scheduling}) that may cause jobs to ``pass" on a slot.
+
+Apart from the {\tt assignTasks} code path, the Fair Scheduler also has a periodic update thread that calls {\tt update} every few seconds. This thread is responsible for recomputing fair shares to display them on the UI (Section \ref{sec:fair-share-computation}), checking whether jobs need to be preempted (Section \ref{sec:preemption}), and checking whether the allocation file has changed to reload pool allocations (through {\tt PoolManager}).
+
+\subsection{The {\tt Schedulable} Class}
+
+To allow the same fair sharing algorithm to be used both between pools and within a pool, the Fair Scheduler uses an abstract class called {\tt Schedulable} to represent both pools and jobs. Its subclasses for these roles are {\tt PoolSchedulable} and {\tt JobSchedulable}. A {\tt Schedulable} is responsible for three roles:
+\begin{enumerate}
+  \item It can be asked to obtain a task through {\tt assignTask}. This may return {\tt null} if the {\tt Schedulable} has no tasks to launch.
+  \item It can be queried for information about the pool/job to use in scheduling, such as:
+  \begin{itemize}
+    \item Number of running tasks.
+    \item Demand (number of tasks the {\tt Schedulable} \emph{wants} to run; this is equal to number of running tasks + number of unlaunched tasks).
+    \item Min share assigned through config file.
+    \item Weight (for fair sharing).
+    \item Priority and start time (for FIFO scheduling).
+  \end{itemize}
+  \item It can be assigned a fair share through {\tt setFairShare}.
+\end{enumerate}
+
+There are separate {\tt Schedulable}s for map and reduce tasks, to make it possible to use the same algorithm on both types of tasks.
+
+\subsection{Fair Sharing Algorithm}
+\label{sec:fair-sharing-alg}
+
+A simple way to achieve fair sharing is the following: whenever a slot is available, assign it to the pool that has the fewest running tasks. This will ensure that all pools get an equal number of slots, unless a pool's demand is less than its fair share, in which case the extra slots are divided evenly among the other pools. Two features of the Fair Scheduler complicate this algorithm a little:
+\begin{itemize}
+  \item Pool weights mean that some pools should get more slots than others. For example, a pool with weight 2 should get 2x more slots than a pool with weight 1. This is accomplished by changing the scheduling rule to ``assign the slot to the pool whose value of $runningTasks/weight$ is smallest."
+  \item Minimum shares mean that pools below their min share should get slots first. When we sort pools to choose which ones to schedule next, we place pools below their min share ahead of pools above their min share. We order the pools below their min share by how far they are below it as a percentage of the share.
+\end{itemize}
+
+This fair sharing algorithm is implemented in {\tt FairShareComparator} in the {\tt SchedulingAlgorithms} class. The comparator orders jobs by distance below min share and then by $runningTasks/weight$.
+
+\subsection{Preemption}
+\label{sec:preemption}
+
+To determine when to preempt tasks, the Fair Scheduler maintains two values for each {\tt PoolSchedulable}: the last time when the pool was at its min share, and the last time when the pool was at half its fair share. These conditions are checked periodically by the update thread in {\tt FairScheduler.updatePreemptionVariables}, using the methods {\tt isStarvedForMinShare} and {\tt isStarvedForFairShare}. These methods also take into account the demand of the pool, so that a pool is not counted as starving if its demand is below its min/fair share but is otherwise met.
+
+When preempting tasks, the scheduler kills the most recently launched tasks from over-scheduled pools. This minimizes the amount of computation wasted by preemption and ensures that all jobs can eventually finish (it is as if the preempted jobs just never got their last few slots). The tasks are chosen and preempted in {\tt preemptTasks}.
+
+Note that for min share preemption, it is clear when a pool is below its min share because the min share is given as a number of slots, but for fair share preemption, we must be able to compute a pool's fair share to determine when it is being starved. This computation is trickier than dividing the number of slots by the number of pools due to weights, min shares and demands. Section \ref{sec:fair-share-computation} explains how fair shares are computed.
+
+\subsection{Fair Share Computation}
+\label{sec:fair-share-computation}
+
+The scheduling algorithm in Section \ref{sec:fair-sharing-alg} achieves fair shares without actually needing to compute pools' numerical shares beforehand. However, for preemption and for displaying shares in the Web UI, we want to know what a pool's fair share is even if the pool is not currently at its share. That is, we want to know how many slots the pool \emph{would} get if we started with all slots being empty and ran the algorithm in Section \ref{sec:fair-sharing-alg} until we filled them.
+One way to compute these shares would be to simulate starting out with empty slots and calling {\tt assignTasks} repeatedly until they were filled, but this is expensive, because each scheduling decision takes $O(numJobs)$ time and we need to make $O(numSlots)$ decisions.
+
+To compute fair shares efficiently, the Fair Scheduler includes an algorithm based on binary search in {\tt SchedulingAlgorithms.computeFairShares}. This algorithm is based on the following observation. If all slots had been assigned according to weighted fair sharing respecting pools' demands and min shares, then there would exist a ratio $r$ such that:
+\begin{enumerate}
+  \item Pools whose demand $d_i$ is less than $r w_i$ (where $w_i$ is the weight of the pool) are assigned $d_i$ slots.
+  \item Pools whose min share $m_i$ is more than $r w_i$ are assigned $\min(m_i, d_i)$ slots.
+  \item All other pools are assigned $r w_i$ slots.
+  \item The pools' shares sum up to the total number of slots $t$.
+\end{enumerate}
+
+The Fair Scheduler uses binary search to compute the correct $r$. We define a function $f(r)$ as the number of slots that would be used for a given $r$ if conditions 1-3 above were met, and then find a value of $r$ that makes $f(r)=t$. More precisely, $f(r)$ is defined as:
+$$f(r) = \sum_i \min(d_i, \max(r w_i, m_i)).$$
+
+Note that $f(r)$ is non-decreasing in $r$ because every term of the sum is non-decreasing in $r$ (each term stops growing once it reaches $d_i$), so the equation $f(r) = t$ can be solved by binary search. We choose 0 as a lower bound of our binary search because with $r=0$, only min shares are assigned. (An earlier check in {\tt computeFairShares} checks whether the min shares add up to more than the total number of slots, and if so, computes fair shares by scaling down the min shares proportionally and returns.) To compute an upper bound for the binary search, we try $r=1,2,4,8,\dots$ until we find a value large enough that either more than $t$ slots are used or all pools' demands are met (in case the demands added up to less than $t$).
+
+The steps of the algorithm are explained in detail in {\tt SchedulingAlgorithms.java}.
+
+This algorithm runs in time $O(NP)$, where $N$ is the number of jobs/pools and $P$ is the desired number of bits of precision in the computed values (number of iterations of binary search), which we've set to 25. It thus scales linearly in the number of jobs and pools.
+
+\subsection{Running Job Limits}
+
+Running job limits are implemented by marking jobs as not runnable if there are too many jobs submitted by the same user or pool. This is done in {\tt FairScheduler.updateRunnability}. A job that is not runnable declares its demand as 0 and always returns {\tt null} from {\tt assignTasks}.
+
+\subsection{Delay Scheduling}
+\label{sec:delay-scheduling}
+
+In Hadoop, running map tasks on the nodes or racks that contain their input data is critical for performance, because it avoids shipping the data over the network. However, always assigning slots to the first job in order of pool shares and in-pool ordering (the ``head-of-line job") can sometimes lead to poor locality:
+\begin{itemize}
+  \item If the head-of-line job is small, the chance of it having data on the node that a heartbeat was received from is small. Therefore, locality would be poor in a small-job workload if we always assigned slots to the head-of-line job.
+  \item When fair sharing is used, there is a strong bias for a job to be reassigned into a slot that it just finished a task in, because when it finishes the task, the job falls below its fair share. This can mean that jobs have a difficult time running in slots that other jobs have taken and thus achieve poor locality.
+\end{itemize}
+
+To deal with both of these situations, the Fair Scheduler can sacrifice fairness temporarily to improve locality through an algorithm called delay scheduling. If the head-of-line job cannot launch a local task on the TaskTracker that sent a heartbeat, then it is skipped, and other running jobs are looked at in order of pool shares and in-pool scheduling rules to find a job with a local task. However, if the head-of-line job has been skipped for a sufficiently long time, it is allowed to launch rack-local tasks. Then, if it is skipped for a longer time, it is also allowed to launch off-rack tasks. These skip times are called locality delays. Delays of a few seconds are sufficient to drastically increase locality.
+
+The Fair Scheduler allows locality delays to be set through {\tt mapred-site.xml} or to be turned off by setting them to zero. However, by default, it computes the delay automatically based on the heartbeat interval of the cluster. The delay is set to 1.5x the heartbeat interval.
+
+When a job that has been allowed to launch non-local tasks ends up launching a local task again, its ``locality level" resets and it must wait again before launching non-local tasks. This is done so that a job that gets ``unlucky" early in its lifetime does not continue to launch non-local tasks throughout its life.
+
+Delay scheduling is implemented by keeping track of two variables on each job: the locality level of the last map it launched (0 for node-local, 1 for rack-local and 2 for off-rack) and the time it has spent being skipped for a task. These are kept in a {\tt JobInfo} structure associated with each job in {\tt FairScheduler.java}. Whenever a job is asked for tasks, it checks the locality level it is allowed to launch them at through {\tt FairScheduler.getAllowedLocalityLevel}. If it does not launch a task, it is marked as ``visited" on that heartbeat by appending itself to a {\tt visited} job list that is passed around between calls to {\tt assignTasks} on the same heartbeat. Jobs that are visited on a heartbeat but do not launch any tasks during it are considered as skipped for the time interval between this heartbeat and the next. Code at the beginning of {\tt FairScheduler.assignTasks} increments the wait time of each skipped job by the time elapsed since the last heartbeat. Once a job has been skipped for more than the locality delay, {\tt getAllowedLocalityLevel} starts returning a higher locality level so that it is allowed to launch less-local tasks. Whenever the job launches a task, its wait time is reset, but we remember the locality level of the launched task so that the job is allowed to launch more tasks at this level without further waiting.
+
+\subsection{Locking Order}
+
+Fair Scheduler data structures can be touched by several threads. Most commonly, the JobTracker invokes {\tt assignTasks}. This happens inside a block of code where the JobTracker has locked itself already. Therefore, to prevent deadlocks, we always ensure that \emph{if both the FairScheduler and the JobTracker must be locked, the JobTracker is locked first}. Other threads that can lock the FairScheduler include the update thread and the web UI.
+
+\subsection{Unit Tests}
+
+The Fair Scheduler contains extensive unit tests using mock {\tt TaskTrackerManager}, {\tt JobInProgress}, {\tt TaskInProgress}, and {\tt Schedulable} objects. Scheduler tests are in {\tt TestFairScheduler.java}. The {\tt computeFairShares} algorithm is tested separately in {\tt TestComputeFairShares.java}. All tests use accelerated time via a fake {\tt Clock} class.
+
+\pagebreak
+\section{Code Guide}
+
+The following table lists some key source files in the Fair Scheduler:
+
+\begin{center}
+\begin{tabular}{|l|p{0.7\columnwidth}|}
+  \hline
+  {\bf File} & {\bf Contents} 
+  \\ \hline
+  {\tt FairScheduler.java} & Scheduler entry point. Also contains update thread, and logic for preemption, delay scheduling, and running job limits.
+  \\ \hline
+  {\tt Schedulable.java} & Definition of the {\tt Schedulable} class. Extended by {\tt JobSchedulable} and {\tt PoolSchedulable}.
+  \\ \hline
+  {\tt SchedulingAlgorithms.java} & Contains FIFO and fair sharing comparators, as well as the {\tt computeFairShares} algorithm in Section \ref{sec:fair-share-computation}.
+  \\ \hline
+  {\tt PoolManager.java} & Reads pool properties from the allocation file and maintains a collection of {\tt Pool} objects. Pools are created on demand.
+  \\ \hline
+  {\tt Pool.java} & Represents a pool and stores its map and reduce {\tt Schedulables}.
+  \\ \hline
+  {\tt FairSchedulerServlet.java} & Implements the scheduler's web UI.
+  \\ \hline
+  {\tt FairSchedulerEventLog.java} & An easy-to-parse event log for debugging. Must be enabled through {\tt mapred.fairscheduler.eventlog.enabled}.
+  If enabled, logs are placed in {\tt \$HADOOP\_LOG\_DIR/fairscheduler}.
+  \\ \hline
+  {\tt TaskSelector.java} & A pluggable class responsible for picking tasks within a job. Currently, {\tt DefaultTaskSelector} delegates to {\tt JobInProgress}, but this would be a useful place to experiment with new algorithms for speculative execution and locality.
+  \\ \hline
+  {\tt LoadManager.java} & A pluggable class responsible for determining when to launch more tasks on a TaskTracker. Currently, {\tt CapBasedLoadManager} uses slot counts, but this would be a useful place to experiment with scheduling based on machine load.
+  \\ \hline
+  {\tt WeightAdjuster.java} & A pluggable class responsible for setting job weights. An example, {\tt NewJobWeightBooster}, is provided, which increases weight temporarily for new jobs.
+  \\ \hline
+\end{tabular}
+\end{center}
+
+\end{document}
\ No newline at end of file

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java?rev=804284&r1=804283&r2=804284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java Fri Aug 14 16:32:04 2009
@@ -55,12 +55,12 @@
   }
 
   @Override
-  public Task obtainNewMapTask(TaskTrackerStatus taskTracker, JobInProgress job)
-      throws IOException {
+  public Task obtainNewMapTask(TaskTrackerStatus taskTracker, JobInProgress job,
+      int localityLevel) throws IOException {
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
     int numTaskTrackers = clusterStatus.getTaskTrackers();
     return job.obtainNewMapTask(taskTracker, numTaskTrackers,
-        taskTrackerManager.getNumberOfUniqueHosts());
+        taskTrackerManager.getNumberOfUniqueHosts(), localityLevel);
   }
 
   @Override

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=804284&r1=804283&r2=804284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Fri Aug 14 16:32:04 2009
@@ -25,19 +25,17 @@
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * A {@link TaskScheduler} that implements fair sharing.
@@ -60,6 +58,9 @@
   private static final TaskType[] MAP_AND_REDUCE = 
     new TaskType[] {TaskType.MAP, TaskType.REDUCE};
   
+  // Upper bound (ms) on localityDelay when it is auto-computed from the heartbeat interval
+  private static final long MAX_AUTOCOMPUTED_LOCALITY_DELAY = 15000;
+  
   protected PoolManager poolMgr;
   protected LoadManager loadMgr;
   protected TaskSelector taskSelector;
@@ -69,8 +70,12 @@
   protected long lastUpdateTime;           // Time when we last updated infos (NOTE(review): update() no longer sets this; confirm still needed)
   protected boolean initialized;  // Are we initialized?
   protected volatile boolean running; // Are we running?
-  protected boolean useFifo;      // Set if we want to revert to FIFO behavior
   protected boolean assignMultiple; // Simultaneously assign map and reduce?
+  protected int mapAssignCap = -1;    // Max maps to launch per heartbeat (-1 = use free slots)
+  protected int reduceAssignCap = -1; // Max reduces to launch per heartbeat (-1 = use free slots)
+  protected long localityDelay;       // Time (ms) to wait for node and rack locality
+  protected boolean autoComputeLocalityDelay = false; // Compute locality delay
+                                                      // from heartbeat interval (if configured as -1)
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected boolean waitForMapsBeforeLaunchingReduces = true;
   protected boolean preemptionEnabled;
@@ -92,29 +97,17 @@
    */
   static class JobInfo {
     boolean runnable = false;   // Can the job run given user/pool limits?
-    double mapWeight = 0;       // Weight of job in calculation of map share
-    double reduceWeight = 0;    // Weight of job in calculation of reduce share
-    long mapDeficit = 0;        // Time deficit for maps
-    long reduceDeficit = 0;     // Time deficit for reduces
-    int runningMaps = 0;        // Maps running at last update
-    int runningReduces = 0;     // Reduces running at last update
-    int neededMaps;             // Maps needed at last update
-    int neededReduces;          // Reduces needed at last update
-    int minMaps = 0;            // Minimum maps as guaranteed by pool
-    int minReduces = 0;         // Minimum reduces as guaranteed by pool
-    double mapFairShare = 0;    // Fair share of map slots at last update
-    double reduceFairShare = 0; // Fair share of reduce slots at last update
-    // Variables used for preemption
-    long lastTimeAtMapMinShare;      // When was the job last at its min maps?
-    long lastTimeAtReduceMinShare;   // Similar for reduces.
-    long lastTimeAtMapHalfFairShare; // When was the job last at half fair maps?
-    long lastTimeAtReduceHalfFairShare;  // Similar for reduces.
-    
-    public JobInfo(long currentTime) {
-      lastTimeAtMapMinShare = currentTime;
-      lastTimeAtReduceMinShare = currentTime;
-      lastTimeAtMapHalfFairShare = currentTime;
-      lastTimeAtReduceHalfFairShare = currentTime;
+    public JobSchedulable mapSchedulable;    // Schedulable for this job's map tasks
+    public JobSchedulable reduceSchedulable; // Schedulable for this job's reduce tasks
+    // Variables used for delay scheduling
+    LocalityLevel lastMapLocalityLevel; // Locality level of last map launched
+    long timeWaitedForLocalMap; // Time (ms) waiting for a local map since last map launch
+    boolean skippedAtLastHeartbeat;  // Was job skipped at previous assignTasks?
+                                     // (used to update timeWaitedForLocalMap)
+    public JobInfo(JobSchedulable mapSched, JobSchedulable reduceSched) {
+      this.mapSchedulable = mapSched;
+      this.reduceSchedulable = reduceSched;
+      this.lastMapLocalityLevel = LocalityLevel.NODE; // New jobs start out node-local only
     }
   }
   
@@ -154,7 +147,8 @@
         eagerInitListener.start();
         taskTrackerManager.addJobInProgressListener(eagerInitListener);
       }
-      poolMgr = new PoolManager(conf);
+      poolMgr = new PoolManager(this);
+      poolMgr.initialize();
       loadMgr = (LoadManager) ReflectionUtils.newInstance(
           conf.getClass("mapred.fairscheduler.loadmanager", 
               CapBasedLoadManager.class, LoadManager.class), conf);
@@ -180,12 +174,20 @@
           "mapred.fairscheduler.preemption.interval", 15000);
       assignMultiple = conf.getBoolean(
           "mapred.fairscheduler.assignmultiple", true);
+      mapAssignCap = conf.getInt(
+          "mapred.fairscheduler.assignmultiple.maps", -1);
+      reduceAssignCap = conf.getInt(
+          "mapred.fairscheduler.assignmultiple.reduces", -1);
       sizeBasedWeight = conf.getBoolean(
           "mapred.fairscheduler.sizebasedweight", false);
       preemptionEnabled = conf.getBoolean(
           "mapred.fairscheduler.preemption", false);
       onlyLogPreemption = conf.getBoolean(
           "mapred.fairscheduler.preemption.only.log", false);
+      localityDelay = conf.getLong(
+          "mapred.fairscheduler.locality.delay", -1);
+      if (localityDelay == -1)
+        autoComputeLocalityDelay = true; // Compute from heartbeat interval
       initialized = true;
       running = true;
       lastUpdateTime = clock.getTime();
@@ -231,9 +233,10 @@
     public void jobAdded(JobInProgress job) {
       synchronized (FairScheduler.this) {
         eventLog.log("JOB_ADDED", job.getJobID());
-        poolMgr.addJob(job);
-        JobInfo info = new JobInfo(clock.getTime());
+        JobInfo info = new JobInfo(new JobSchedulable(FairScheduler.this, job, TaskType.MAP),
+            new JobSchedulable(FairScheduler.this, job, TaskType.REDUCE));
         infos.put(job, info);
+        poolMgr.addJob(job); // Also adds job into the right PoolSchedulable
         update();
       }
     }
@@ -283,20 +286,18 @@
       return null;
     String trackerName = tracker.getTrackerName();
     eventLog.log("HEARTBEAT", trackerName);
-    
-    // Reload allocations file if it hasn't been loaded in a while
-    poolMgr.reloadAllocsIfNecessary();
+    long currentTime = clock.getTime();
     
     // Compute total runnable maps and reduces, and currently running ones
     int runnableMaps = 0;
     int runningMaps = 0;
     int runnableReduces = 0;
     int runningReduces = 0;
-    for (JobInProgress job: infos.keySet()) {
-      runnableMaps += runnableTasks(job, TaskType.MAP);
-      runningMaps += runningTasks(job, TaskType.MAP);
-      runnableReduces += runnableTasks(job, TaskType.REDUCE);
-      runningReduces += runningTasks(job, TaskType.REDUCE);
+    for (Pool pool: poolMgr.getPools()) {
+      runnableMaps += pool.getMapSchedulable().getDemand();
+      runningMaps += pool.getMapSchedulable().getRunningTasks();
+      runnableReduces += pool.getReduceSchedulable().getDemand();
+      runningReduces += pool.getReduceSchedulable().getRunningTasks();
     }
 
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
@@ -309,108 +310,174 @@
     eventLog.log("RUNNABLE_TASKS", 
         runnableMaps, runningMaps, runnableReduces, runningReduces);
 
-    TaskTrackerStatus trackerStatus = tracker.getStatus();
+    // Update time waited for local maps for jobs skipped on last heartbeat
+    updateLocalityWaitTimes(currentTime);
+    
+    TaskTrackerStatus tts = tracker.getStatus();
 
     // Scan to see whether any job needs to run a map, then a reduce
     ArrayList<Task> tasks = new ArrayList<Task>();
     for (TaskType taskType: MAP_AND_REDUCE) {
-      // Continue if all runnable tasks of this type are already running
-      if (taskType == TaskType.MAP && runningMaps == runnableMaps ||
-          taskType == TaskType.REDUCE && runningReduces == runnableReduces)
-        continue;
-      // Continue if the node can't support another task of the given type
-      boolean canAssign = (taskType == TaskType.MAP) ? 
-          loadMgr.canAssignMap(trackerStatus, runnableMaps, totalMapSlots) :
-          loadMgr.canAssignReduce(trackerStatus, runnableReduces, totalReduceSlots);
-      if (canAssign) {
-        // Figure out the jobs that need this type of task
-        List<JobInProgress> candidates = new ArrayList<JobInProgress>();
-        for (JobInProgress job: infos.keySet()) {
-          if (job.getStatus().getRunState() == JobStatus.RUNNING && 
-              neededTasks(job, taskType) > 0) {
-            candidates.add(job);
-          }
+      // Keep track of which jobs were visited and which had tasks launched,
+      // so that we can later mark skipped jobs for delay scheduling
+      Set<JobInProgress> visited = new HashSet<JobInProgress>();
+      Set<JobInProgress> launched = new HashSet<JobInProgress>();
+      // Compute a maximum number of tasks to assign on this task tracker
+      int cap = maxTasksToAssign(taskType, tts);
+      // Assign up to cap tasks
+      for (int i = 0; i < cap; i++) {
+        // Break if all runnable tasks of this type are already running
+        if (taskType == TaskType.MAP && runningMaps == runnableMaps ||
+            taskType == TaskType.REDUCE && runningReduces == runnableReduces)
+          break;
+        // Break if the node can't support another task of this type
+        boolean canAssign = (taskType == TaskType.MAP) ? 
+            loadMgr.canAssignMap(tts, runnableMaps, totalMapSlots) :
+            loadMgr.canAssignReduce(tts, runnableReduces, totalReduceSlots);
+        if (canAssign) {
+          // Get the map or reduce schedulables and sort them by fair sharing
+          List<PoolSchedulable> scheds = getPoolSchedulables(taskType);
+          Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
+          for (Schedulable sched: scheds) {
+            eventLog.log("INFO", "Checking for " + taskType + 
+                " task in " + sched.getName());
+            Task task = sched.assignTask(tts, currentTime, visited);
+            if (task != null) {
+              JobInProgress job = taskTrackerManager.getJob(task.getJobID());
+              eventLog.log("ASSIGN", trackerName, taskType,
+                  job.getJobID(), task.getTaskID());
+              launched.add(job);
+              // Update running task counts, and the job's locality level
+              if (taskType == TaskType.MAP) {
+                runningMaps++;
+                updateLastMapLocalityLevel(job, task, tts);
+              } else {
+                runningReduces++;
+              }
+              // Add task to the list of assignments
+              tasks.add(task);
+              break;
+            } // end if(task != null)
+          } // end for(Schedulable sched: scheds)
+        } else {
+          eventLog.log("INFO", "Can't assign another " + taskType +
+              " to " + trackerName);
+          break;
         }
-        // Sort jobs by deficit (for Fair Sharing) or submit time (for FIFO)
-        Comparator<JobInProgress> comparator = useFifo ?
-            new FifoJobComparator() : new DeficitComparator(taskType);
-        Collections.sort(candidates, comparator);
-        for (JobInProgress job: candidates) {
-          eventLog.log("INFO", 
-              "Checking for " + taskType + " task in " + job.getJobID());
-          Task task;
-          if (taskType == TaskType.MAP) {
-            task = taskSelector.obtainNewMapTask(trackerStatus, job);
-          } else {
-            task = taskSelector.obtainNewReduceTask(trackerStatus, job);
-          }
-          if (task != null) {
-            eventLog.log("ASSIGN", trackerName, taskType,
-                job.getJobID(), task.getTaskID());
-            // Update the JobInfo for this job so we account for the launched
-            // tasks during this update interval and don't try to launch more
-            // tasks than the job needed on future heartbeats
-            JobInfo info = infos.get(job);
-            if (taskType == TaskType.MAP) {
-              info.runningMaps++;
-              info.neededMaps--;
-            } else {
-              info.runningReduces++;
-              info.neededReduces--;
-            }
-            // Add task to the list of assignments
-            tasks.add(task);
-            // If not allowed to assign multiple tasks per heartbeat, return
-            if (!assignMultiple)
-              return tasks;
-            break;
+      } // end for(i = 0; i < cap; i++)
+      // If we were assigning maps, mark any jobs that were visited but
+      // did not launch a task as skipped on this heartbeat
+      if (taskType == TaskType.MAP) {
+        for (JobInProgress job: visited) {
+          if (!launched.contains(job)) {
+            infos.get(job).skippedAtLastHeartbeat = true;
           }
         }
-      } else {
-        eventLog.log("INFO", 
-            "Can't assign another " + taskType + " to " + trackerName);
       }
-    }
+      // Return if assignMultiple was disabled and we found a task
+      if (!assignMultiple && tasks.size() > 0)
+        return tasks;
+    } // end for(TaskType taskType: MAP_AND_REDUCE)
     
     // If no tasks were found, return null
     return tasks.isEmpty() ? null : tasks;
   }
 
   /**
-   * Compare jobs by deficit for a given task type, putting jobs whose current
-   * allocation is less than their minimum share always ahead of others. This is
-   * the default job comparator used for Fair Sharing.
-   */
-  private class DeficitComparator implements Comparator<JobInProgress> {
-    private final TaskType taskType;
+   * Get the maximum number of tasks of the given type to assign on one heartbeat.
+   * The scheduler may launch fewer than this many tasks if the LoadManager
+   * says not to launch more, but it will never launch more than this number.
+   */
+  private int maxTasksToAssign(TaskType type, TaskTrackerStatus tts) {
+    if (!assignMultiple)
+      return 1;
+    int cap = (type == TaskType.MAP) ? mapAssignCap : reduceAssignCap;
+    if (cap == -1) // No configured cap; use the tracker's currently available slots
+      return (type == TaskType.MAP) ?
+          tts.getAvailableMapSlots(): tts.getAvailableReduceSlots();
+    else
+      return cap;
+  }
 
-    private DeficitComparator(TaskType taskType) {
-      this.taskType = taskType;
+  /**
+   * Update locality wait times for jobs skipped at the previous heartbeat (of any tracker).
+   */
+  private void updateLocalityWaitTimes(long currentTime) {
+    long timeSinceLastHeartbeat = 
+      (lastHeartbeatTime == 0 ? 0 : currentTime - lastHeartbeatTime); // 0 if no earlier heartbeat recorded
+    lastHeartbeatTime = currentTime;
+    for (JobInfo info: infos.values()) {
+      if (info.skippedAtLastHeartbeat) {
+        info.timeWaitedForLocalMap += timeSinceLastHeartbeat;
+        info.skippedAtLastHeartbeat = false;
+      }
     }
+  }
 
-    public int compare(JobInProgress j1, JobInProgress j2) {
-      // Put needy jobs ahead of non-needy jobs (where needy means must receive
-      // new tasks to meet slot minimum), comparing among jobs of the same type
-      // by deficit so as to put jobs with higher deficit ahead.
-      JobInfo j1Info = infos.get(j1);
-      JobInfo j2Info = infos.get(j2);
-      double deficitDif;
-      boolean j1Needy, j2Needy;
-      if (taskType == TaskType.MAP) {
-        j1Needy = j1.runningMaps() < Math.floor(j1Info.minMaps);
-        j2Needy = j2.runningMaps() < Math.floor(j2Info.minMaps);
-        deficitDif = j2Info.mapDeficit - j1Info.mapDeficit;
-      } else {
-        j1Needy = j1.runningReduces() < Math.floor(j1Info.minReduces);
-        j2Needy = j2.runningReduces() < Math.floor(j2Info.minReduces);
-        deficitDif = j2Info.reduceDeficit - j1Info.reduceDeficit;
-      }
-      if (j1Needy && !j2Needy)
-        return -1;
-      else if (j2Needy && !j1Needy)
-        return 1;
-      else // Both needy or both non-needy; compare by deficit
-        return (int) Math.signum(deficitDif);
+  /**
+   * Update a job's locality level and locality wait variables given that
+   * it has just launched a map task on a given task tracker.
+   */
+  private void updateLastMapLocalityLevel(JobInProgress job,
+      Task mapTaskLaunched, TaskTrackerStatus tracker) {
+    JobInfo info = infos.get(job);
+    LocalityLevel localityLevel = LocalityLevel.fromTask(
+        job, mapTaskLaunched, tracker);
+    info.lastMapLocalityLevel = localityLevel;
+    info.timeWaitedForLocalMap = 0; // A launch restarts the locality wait
+    eventLog.log("ASSIGNED_LOC_LEVEL", job.getJobID(), localityLevel);
+  }
+
+  /**
+   * Get the maximum locality level at which a given job is allowed to
+   * launch tasks, based on how long it has been waiting for local tasks.
+   * This is used to implement the "delay scheduling" feature of the Fair
+   * Scheduler for optimizing data locality.
+   * If some of the job's maps have no locality information (nonLocalMaps is non-empty), this
+   * method returns LocalityLevel.ANY, allowing tasks at any level.
+   * Otherwise, the job can only launch tasks at its current locality level
+   * or lower, unless it has waited at least localityDelay milliseconds
+   * (in which case it can go one level beyond) or 2 * localityDelay millis
+   * (in which case it can go to any level). Jobs in starved pools also get ANY.
+   */
+  protected LocalityLevel getAllowedLocalityLevel(JobInProgress job,
+      long currentTime) {
+    JobInfo info = infos.get(job);
+    if (info == null) { // Job not in infos (shouldn't happen)
+      LOG.error("getAllowedLocalityLevel called on job " + job
+          + ", which does not have a JobInfo in infos");
+      return LocalityLevel.ANY;
+    }
+    if (job.nonLocalMaps.size() > 0) { // Job doesn't have locality information
+      return LocalityLevel.ANY;
+    }
+    // Skip delay if the pool is starving (NOTE(review): timestamps seem updated only if preemptionEnabled)
+    Pool pool = poolMgr.getPool(job);
+    PoolSchedulable sched = pool.getMapSchedulable();
+    long minShareTimeout = poolMgr.getMinSharePreemptionTimeout(pool.getName());
+    long fairShareTimeout = poolMgr.getFairSharePreemptionTimeout();
+    if (currentTime - sched.getLastTimeAtMinShare() > minShareTimeout ||
+        currentTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
+      eventLog.log("INFO", "No delay scheduling for "
+          + job.getJobID() + " because it is being starved");
+      return LocalityLevel.ANY;
+    }
+    // In the common case, compute locality level based on time waited
+    switch(info.lastMapLocalityLevel) {
+    case NODE: // Last task launched was node-local
+      if (info.timeWaitedForLocalMap >= 2 * localityDelay)
+        return LocalityLevel.ANY;
+      else if (info.timeWaitedForLocalMap >= localityDelay)
+        return LocalityLevel.RACK;
+      else
+        return LocalityLevel.NODE;
+    case RACK: // Last task launched was rack-local
+      if (info.timeWaitedForLocalMap >= localityDelay)
+        return LocalityLevel.ANY;
+      else
+        return LocalityLevel.RACK;
+    default: // Last task was non-local; can launch anywhere
+      return LocalityLevel.ANY;
     }
   }
   
@@ -420,11 +487,24 @@
    * and needed tasks of each type. 
    */
   protected void update() {
-    //Making more granual locking so that clusterStatus can be fetched from Jobtracker.
+    // Fetch clusterStatus before acquiring the scheduler lock, so the
+    // JobTracker call is made without holding the scheduler's lock.
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
-    // Got clusterStatus hence acquiring scheduler lock now
-    // Remove non-running jobs
-    synchronized(this){
+    
+    // Recompute locality delay from JobTracker heartbeat interval if enabled.
+    // This will also lock the JT, so do it outside of a fair scheduler lock.
+    if (autoComputeLocalityDelay) {
+      JobTracker jobTracker = (JobTracker) taskTrackerManager; // NOTE(review): ClassCastException for non-JobTracker managers
+      localityDelay = Math.min(MAX_AUTOCOMPUTED_LOCALITY_DELAY, // NOTE(review): non-volatile write outside the lock
+          (long) (1.5 * jobTracker.getNextHeartbeatInterval()));
+    }
+    
+    // Got clusterStatus hence acquiring scheduler lock now.
+    synchronized (this) {
+      // Reload allocations file if it hasn't been loaded in a while
+      poolMgr.reloadAllocsIfNecessary();
+      
+      // Remove any jobs that have stopped running
       List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
       for (JobInProgress job: infos.keySet()) { 
         int runState = job.getStatus().getRunState();
@@ -437,29 +517,40 @@
         infos.remove(job);
         poolMgr.removeJob(job);
       }
-      // Update running jobs with deficits since last update, and compute new
-      // slot allocations, weight, shares and task counts
-      long now = clock.getTime();
-      long timeDelta = now - lastUpdateTime;
-      updateDeficits(timeDelta);
-      updateRunnability();
-      updateTaskCounts();
-      updateWeights();
-      updateMinSlots();
-      updateFairShares(clusterStatus);
+      
+      updateRunnability(); // Set job runnability based on user/pool limits 
+      
+      // Update demands of jobs and pools
+      for (Pool pool: poolMgr.getPools()) {
+        pool.getMapSchedulable().updateDemand();
+        pool.getReduceSchedulable().updateDemand();
+      }
+      
+      // Compute fair shares across pools from the updated demands and cluster slot totals
+      List<PoolSchedulable> mapScheds = getPoolSchedulables(TaskType.MAP);
+      List<PoolSchedulable> reduceScheds = getPoolSchedulables(TaskType.REDUCE);
+      SchedulingAlgorithms.computeFairShares(
+          mapScheds, clusterStatus.getMaxMapTasks());
+      SchedulingAlgorithms.computeFairShares(
+          reduceScheds, clusterStatus.getMaxReduceTasks());
+      
+      // Use the computed shares to assign shares within each pool
+      for (Pool pool: poolMgr.getPools()) {
+        pool.getMapSchedulable().redistributeShare();
+        pool.getReduceSchedulable().redistributeShare();
+      }
+      
       if (preemptionEnabled)
         updatePreemptionVariables();
-      lastUpdateTime = now;
     }
   }
   
-  private void updateDeficits(long timeDelta) {
-    for (JobInfo info: infos.values()) {
-      info.mapDeficit +=
-        (info.mapFairShare - info.runningMaps) * timeDelta;
-      info.reduceDeficit +=
-        (info.reduceFairShare - info.runningReduces) * timeDelta;
+  public List<PoolSchedulable> getPoolSchedulables(TaskType type) {
+    List<PoolSchedulable> scheds = new ArrayList<PoolSchedulable>(); // New list per call; callers may sort it
+    for (Pool pool: poolMgr.getPools()) {
+      scheds.add(pool.getSchedulable(type)); // One schedulable of the requested type per pool
     }
+    return scheds;
   }
   
   private void updateRunnability() {
@@ -490,286 +581,19 @@
     }
   }
 
-  private void updateTaskCounts() {
-    for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
-      JobInProgress job = entry.getKey();
-      JobInfo info = entry.getValue();
-      if (job.getStatus().getRunState() != JobStatus.RUNNING) {
-        // Job is still in PREP state and tasks aren't initialized; skip it.
-        continue;
-      }
-      // Count maps
-      int totalMaps = job.numMapTasks;
-      int finishedMaps = 0;
-      int runningMaps = 0;
-      for (TaskInProgress tip: job.getMapTasks()) {
-        if (tip.isComplete()) {
-          finishedMaps += 1;
-        } else if (tip.isRunning()) {
-          runningMaps += tip.getActiveTasks().size();
-        }
-      }
-      info.runningMaps = runningMaps;
-      info.neededMaps = (totalMaps - runningMaps - finishedMaps
-          + taskSelector.neededSpeculativeMaps(job));
-      // Count reduces
-      int totalReduces = job.numReduceTasks;
-      int finishedReduces = 0;
-      int runningReduces = 0;
-      for (TaskInProgress tip: job.getReduceTasks()) {
-        if (tip.isComplete()) {
-          finishedReduces += 1;
-        } else if (tip.isRunning()) {
-          runningReduces += tip.getActiveTasks().size();
-        }
-      }
-      info.runningReduces = runningReduces;
-      if (enoughMapsFinishedToRunReduces(finishedMaps, totalMaps)) {
-        info.neededReduces = (totalReduces - runningReduces - finishedReduces 
-            + taskSelector.neededSpeculativeReduces(job));
-      } else {
-        info.neededReduces = 0;
-      }
-      // If the job was marked as not runnable due to its user or pool having
-      // too many active jobs, set the neededMaps/neededReduces to 0. We still
-      // count runningMaps/runningReduces however so we can give it a deficit.
-      if (!info.runnable) {
-        info.neededMaps = 0;
-        info.neededReduces = 0;
-      }
-    }
-  }
-
-  /**
-   * Has a job finished enough maps to allow launching its reduces?
-   */
-  protected boolean enoughMapsFinishedToRunReduces(
-      int finishedMaps, int totalMaps) {
-    if (waitForMapsBeforeLaunchingReduces) {
-      return finishedMaps >= Math.max(1, totalMaps * 0.05);
-    } else {
-      return true;
-    }
-  }
-
-  private void updateWeights() {
-    // First, calculate raw weights for each job
-    for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
-      JobInProgress job = entry.getKey();
-      JobInfo info = entry.getValue();
-      info.mapWeight = calculateRawWeight(job, TaskType.MAP);
-      info.reduceWeight = calculateRawWeight(job, TaskType.REDUCE);
-    }
-    // Now calculate job weight sums for each pool
-    Map<String, Double> mapWeightSums = new HashMap<String, Double>();
-    Map<String, Double> reduceWeightSums = new HashMap<String, Double>();
-    for (Pool pool: poolMgr.getPools()) {
-      double mapWeightSum = 0;
-      double reduceWeightSum = 0;
-      for (JobInProgress job: pool.getJobs()) {
-        if (isRunnable(job)) {
-          if (runnableTasks(job, TaskType.MAP) > 0) {
-            mapWeightSum += infos.get(job).mapWeight;
-          }
-          if (runnableTasks(job, TaskType.REDUCE) > 0) {
-            reduceWeightSum += infos.get(job).reduceWeight;
-          }
-        }
-      }
-      mapWeightSums.put(pool.getName(), mapWeightSum);
-      reduceWeightSums.put(pool.getName(), reduceWeightSum);
-    }
-    // And normalize the weights based on pool sums and pool weights
-    // to share fairly across pools (proportional to their weights)
-    for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
-      JobInProgress job = entry.getKey();
-      JobInfo info = entry.getValue();
-      String pool = poolMgr.getPoolName(job);
-      double poolWeight = poolMgr.getPoolWeight(pool);
-      double mapWeightSum = mapWeightSums.get(pool);
-      double reduceWeightSum = reduceWeightSums.get(pool);
-      if (mapWeightSum == 0)
-        info.mapWeight = 0;
-      else
-        info.mapWeight *= (poolWeight / mapWeightSum); 
-      if (reduceWeightSum == 0)
-        info.reduceWeight = 0;
-      else
-        info.reduceWeight *= (poolWeight / reduceWeightSum); 
-    }
-  }
-  
-  private void updateMinSlots() {
-    // Clear old minSlots
-    for (JobInfo info: infos.values()) {
-      info.minMaps = 0;
-      info.minReduces = 0;
-    }
-    // For each pool, distribute its task allocation among jobs in it that need
-    // slots. This is a little tricky since some jobs in the pool might not be
-    // able to use all the slots, e.g. they might have only a few tasks left.
-    // To deal with this, we repeatedly split up the available task slots
-    // between the jobs left, give each job min(its alloc, # of slots it needs),
-    // and redistribute any slots that are left over between jobs that still
-    // need slots on the next pass. If, in total, the jobs in our pool don't
-    // need all its allocation, we leave the leftover slots for general use.
-    PoolManager poolMgr = getPoolManager();
-    for (Pool pool: poolMgr.getPools()) {
-      for (final TaskType type: MAP_AND_REDUCE) {
-        Set<JobInProgress> jobs = new HashSet<JobInProgress>(pool.getJobs());
-        int slotsLeft = poolMgr.getAllocation(pool.getName(), type);
-        // Keep assigning slots until none are left
-        while (slotsLeft > 0) {
-          // Figure out total weight of jobs that still need slots
-          double totalWeight = 0;
-          for (Iterator<JobInProgress> it = jobs.iterator(); it.hasNext();) {
-            JobInProgress job = it.next();
-            if (isRunnable(job) &&
-                runnableTasks(job, type) > minTasks(job, type)) {
-              totalWeight += weight(job, type);
-            } else {
-              it.remove();
-            }
-          }
-          if (totalWeight == 0) // No jobs that can use more slots are left 
-            break;
-          // Assign slots to jobs, using the floor of their weight divided by
-          // total weight. This ensures that all jobs get some chance to take
-          // a slot. Then, if no slots were assigned this way, we do another
-          // pass where we use ceil, in case some slots were still left over.
-          int oldSlots = slotsLeft; // Copy slotsLeft so we can modify it
-          for (JobInProgress job: jobs) {
-            double weight = weight(job, type);
-            int share = (int) Math.floor(oldSlots * weight / totalWeight);
-            slotsLeft = giveMinSlots(job, type, slotsLeft, share);
-          }
-          if (slotsLeft == oldSlots) {
-            // No tasks were assigned; do another pass using ceil, giving the
-            // extra slots to jobs in order of weight then deficit
-            List<JobInProgress> sortedJobs = new ArrayList<JobInProgress>(jobs);
-            Collections.sort(sortedJobs, new Comparator<JobInProgress>() {
-              public int compare(JobInProgress j1, JobInProgress j2) {
-                double dif = weight(j2, type) - weight(j1, type);
-                if (dif == 0) // Weights are equal, compare by deficit 
-                  dif = deficit(j2, type) - deficit(j1, type);
-                return (int) Math.signum(dif);
-              }
-            });
-            for (JobInProgress job: sortedJobs) {
-              double weight = weight(job, type);
-              int share = (int) Math.ceil(oldSlots * weight / totalWeight);
-              slotsLeft = giveMinSlots(job, type, slotsLeft, share);
-            }
-            if (slotsLeft > 0) {
-              LOG.warn("Had slotsLeft = " + slotsLeft + " after the final "
-                  + "loop in updateMinSlots. This probably means some fair "
-                  + "scheduler weights are being set to NaN or Infinity.");
-            }
-            break;
-          }
-        }
-      }
-    }
-  }
-
-  /**
-   * Give up to <code>tasksToGive</code> min slots to a job (potentially fewer
-   * if either the job needs fewer slots or there aren't enough slots left).
-   * Returns the number of slots left over.
-   */
-  private int giveMinSlots(JobInProgress job, TaskType type,
-      int slotsLeft, int slotsToGive) {
-    int runnable = runnableTasks(job, type);
-    int curMin = minTasks(job, type);
-    slotsToGive = Math.min(Math.min(slotsLeft, runnable - curMin), slotsToGive);
-    slotsLeft -= slotsToGive;
-    JobInfo info = infos.get(job);
-    if (type == TaskType.MAP)
-      info.minMaps += slotsToGive;
-    else
-      info.minReduces += slotsToGive;
-    return slotsLeft;
-  }
-
-  private void updateFairShares(ClusterStatus clusterStatus) {
-    // Clear old fairShares
-    for (JobInfo info: infos.values()) {
-      info.mapFairShare = 0;
-      info.reduceFairShare = 0;
-    }
-    // Assign new shares, based on weight and minimum share. This is done
-    // as follows. First, we split up the available slots between all
-    // jobs according to weight. Then if there are any jobs whose minSlots is
-    // larger than their fair allocation, we give them their minSlots and
-    // remove them from the list, and start again with the amount of slots
-    // left over. This continues until all jobs' minSlots are less than their
-    // fair allocation, and at this point we know that we've met everyone's
-    // guarantee and we've split the excess capacity fairly among jobs left.
-    for (TaskType type: MAP_AND_REDUCE) {
-      // Select only jobs that still need this type of task
-      HashSet<JobInfo> jobsLeft = new HashSet<JobInfo>();
-      for (Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
-        JobInProgress job = entry.getKey();
-        JobInfo info = entry.getValue();
-        if (isRunnable(job) && runnableTasks(job, type) > 0) {
-          jobsLeft.add(info);
-        }
-      }
-      double slotsLeft = getTotalSlots(type, clusterStatus);
-      while (!jobsLeft.isEmpty()) {
-        double totalWeight = 0;
-        for (JobInfo info: jobsLeft) {
-          double weight = (type == TaskType.MAP ?
-              info.mapWeight : info.reduceWeight);
-          totalWeight += weight;
-        }
-        boolean recomputeSlots = false;
-        double oldSlots = slotsLeft; // Copy slotsLeft so we can modify it
-        for (Iterator<JobInfo> iter = jobsLeft.iterator(); iter.hasNext();) {
-          JobInfo info = iter.next();
-          double minSlots = (type == TaskType.MAP ?
-              info.minMaps : info.minReduces);
-          double weight = (type == TaskType.MAP ?
-              info.mapWeight : info.reduceWeight);
-          double fairShare = weight / totalWeight * oldSlots;
-          if (minSlots > fairShare) {
-            // Job needs more slots than its fair share; give it its minSlots,
-            // remove it from the list, and set recomputeSlots = true to 
-            // remember that we must loop again to redistribute unassigned slots
-            if (type == TaskType.MAP)
-              info.mapFairShare = minSlots;
-            else
-              info.reduceFairShare = minSlots;
-            slotsLeft -= minSlots;
-            iter.remove();
-            recomputeSlots = true;
-          }
-        }
-        if (!recomputeSlots) {
-          // All minimums are met. Give each job its fair share of excess slots.
-          for (JobInfo info: jobsLeft) {
-            double weight = (type == TaskType.MAP ?
-                info.mapWeight : info.reduceWeight);
-            double fairShare = weight / totalWeight * oldSlots;
-            if (type == TaskType.MAP)
-              info.mapFairShare = fairShare;
-            else
-              info.reduceFairShare = fairShare;
-          }
-          break;
-        }
-      }
-    }
-  }
-
-  private double calculateRawWeight(JobInProgress job, TaskType taskType) {
+  public double getJobWeight(JobInProgress job, TaskType taskType) {
     if (!isRunnable(job)) {
-      return 0;
+      // Job won't launch tasks, but don't return 0 to avoid division errors
+      return 1.0;
     } else {
       double weight = 1.0;
       if (sizeBasedWeight) {
         // Set weight based on runnable tasks
-        weight = Math.log1p(runnableTasks(job, taskType)) / Math.log(2);
+        JobInfo info = infos.get(job);
+        int runnableTasks = (taskType == TaskType.MAP) ?
+            info.mapSchedulable.getDemand() : 
+            info.reduceSchedulable.getDemand();
+        weight = Math.log1p(runnableTasks) / Math.log(2);
       }
       weight *= getPriorityFactor(job.getPriority());
       if (weightAdjuster != null) {
@@ -800,69 +624,57 @@
   }
 
   /**
-   * Update the preemption JobInfo fields for all jobs, i.e. the times since
-   * each job last was at its guaranteed share and at > 1/2 of its fair share
+   * Update the preemption fields for all PoolSchedulables, i.e. the times since
+   * each pool last was at its min share and at half or more of its fair share
    * for each type of task.
    */
   private void updatePreemptionVariables() {
     long now = clock.getTime();
-    for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
-      JobInProgress job = entry.getKey();
-      JobInfo info = entry.getValue();
-      if (job.getStatus().getRunState() != JobStatus.RUNNING) {
-        // Job is still in PREP state and tasks aren't initialized. Count it as
-        // both at min and fair share since we shouldn't start any timeouts now.
-        info.lastTimeAtMapMinShare = now;
-        info.lastTimeAtReduceMinShare = now;
-        info.lastTimeAtMapHalfFairShare = now;
-        info.lastTimeAtReduceHalfFairShare = now;
-      } else {
-        if (!isStarvedForMinShare(job, TaskType.MAP))
-          info.lastTimeAtMapMinShare = now;
-        if (!isStarvedForMinShare(job, TaskType.REDUCE))
-          info.lastTimeAtReduceMinShare = now;
-        if (!isStarvedForFairShare(job, TaskType.MAP))
-          info.lastTimeAtMapHalfFairShare = now;
-        if (!isStarvedForFairShare(job, TaskType.REDUCE))
-          info.lastTimeAtReduceHalfFairShare = now;
-      }
-      eventLog.log("PREEMPT_VARS", job.getJobID(),
-          now - info.lastTimeAtMapMinShare,
-          now - info.lastTimeAtMapHalfFairShare);
+    for (TaskType type: MAP_AND_REDUCE) {
+      for (PoolSchedulable sched: getPoolSchedulables(type)) {
+        if (!isStarvedForMinShare(sched)) {
+          sched.setLastTimeAtMinShare(now);
+        }
+        if (!isStarvedForFairShare(sched)) {
+          sched.setLastTimeAtHalfFairShare(now);
+        }
+        eventLog.log("PREEMPT_VARS", sched.getName(), type,
+            now - sched.getLastTimeAtMinShare(),
+            now - sched.getLastTimeAtHalfFairShare());
+      }
     }
   }
 
   /**
-   * Is a job below its min share for the given task type?
+   * Is this pool schedulable running fewer than min(minShare, demand) tasks?
    */
-  boolean isStarvedForMinShare(JobInProgress job, TaskType taskType) {
-    return runningTasks(job, taskType) < minTasks(job, taskType);
+  boolean isStarvedForMinShare(PoolSchedulable sched) {
+    int desiredShare = Math.min(sched.getMinShare(), sched.getDemand());
+    return (sched.getRunningTasks() < desiredShare);
   }
   
   /**
-   * Is a job being starved for fair share for the given task type?
-   * This is defined as being below half its fair share *and* having a
-   * positive deficit.
+   * Is this pool schedulable being starved for fair share? This is defined
+   * as running fewer than floor(min(fairShare / 2, demand)) tasks.
    */
-  boolean isStarvedForFairShare(JobInProgress job, TaskType type) {
+  boolean isStarvedForFairShare(PoolSchedulable sched) {
     int desiredFairShare = (int) Math.floor(Math.min(
-        fairTasks(job, type) / 2, neededTasks(job, type)));
-    return (runningTasks(job, type) < desiredFairShare &&
-            deficit(job, type) > 0);
+        sched.getFairShare() / 2, sched.getDemand()));
+    return (sched.getRunningTasks() < desiredFairShare);
   }
-  
+
   /**
-   * Check for jobs that need tasks preempted, either because they have been
-   * below their guaranteed share for their pool's preemptionTimeout or they
+   * Check for pools that need tasks preempted, either because they have been
+   * below their guaranteed share for minSharePreemptionTimeout or they
    * have been below half their fair share for the fairSharePreemptionTimeout.
-   * If such jobs exist, compute how many tasks of each type need to be
-   * preempted and then select the right ones using selectTasksToPreempt.
+   * If such pools exist, compute how many tasks of each type need to be
+   * preempted and then select the right ones using preemptTasks.
    * 
    * This method computes and logs the number of tasks we want to preempt even
    * if preemption is disabled, for debugging purposes.
    */
   protected void preemptTasksIfNecessary() {
-    if (!preemptionEnabled || useFifo)
+    if (!preemptionEnabled)
       return;
     
     long curTime = clock.getTime();
@@ -874,31 +686,16 @@
     // because we might need to call some JobTracker methods (killTask).
     synchronized (taskTrackerManager) {
       synchronized (this) {
-        List<JobInProgress> jobs = new ArrayList<JobInProgress>(infos.keySet());
         for (TaskType type: MAP_AND_REDUCE) {
+          List<PoolSchedulable> scheds = getPoolSchedulables(type);
           int tasksToPreempt = 0;
-          for (JobInProgress job: jobs) {
-            tasksToPreempt += tasksToPreempt(job, type, curTime);
+          for (PoolSchedulable sched: scheds) {
+            tasksToPreempt += tasksToPreempt(sched, curTime);
           }
           if (tasksToPreempt > 0) {
             eventLog.log("SHOULD_PREEMPT", type, tasksToPreempt);
             if (!onlyLogPreemption) {
-              // Actually preempt the tasks. The policy for this is to pick
-              // tasks from jobs that are above their min share and have very 
-              // negative deficits (meaning they've been over-scheduled). 
-              // However, we also want to minimize the amount of computation
-              // wasted by preemption, so prefer tasks that started recently.
-              // We go through all jobs in order of deficit (highest first), 
-              // and for each job, we preempt tasks in order of start time 
-              // until we hit either minTasks or fairTasks tasks left (so as
-              // not to create a new starved job).
-              Collections.sort(jobs, new DeficitComparator(type));
-              for (int i = jobs.size() - 1; i >= 0; i--) {
-                JobInProgress job = jobs.get(i);
-                int tasksPreempted = preemptTasks(job, type, tasksToPreempt);
-                tasksToPreempt -= tasksPreempted;
-                if (tasksToPreempt == 0) break;
-              }
+              preemptTasks(scheds, tasksToPreempt);
             }
           }
         }
@@ -907,44 +704,98 @@
   }
 
   /**
-   * Count how many tasks of a given type the job needs to preempt, if any.
-   * If the job has been below its min share for at least its pool's preemption
+   * Preempt a given number of tasks from a list of PoolSchedulables. 
+   * The policy for this is to pick tasks from pools that are over their fair 
+   * share, but make sure that no pool is placed below its fair share in the 
+   * process. Furthermore, we want to minimize the amount of computation
+   * wasted by preemption, so out of the tasks in over-scheduled pools, we
+   * prefer to preempt tasks that started most recently.
+   */
+  private void preemptTasks(List<PoolSchedulable> scheds, int tasksToPreempt) {
+    if (scheds.isEmpty() || tasksToPreempt == 0)
+      return;
+    
+    TaskType taskType = scheds.get(0).getTaskType();
+    
+    // Collect running tasks of our type from over-scheduled pools
+    List<TaskStatus> runningTasks = new ArrayList<TaskStatus>();
+    for (PoolSchedulable sched: scheds) {
+      if (sched.getRunningTasks() > sched.getFairShare())
+      for (JobSchedulable js: sched.getJobSchedulables()) {
+        runningTasks.addAll(getRunningTasks(js.getJob(), taskType));
+      }
+    }
+    
+    // Sort tasks into reverse order of start time
+    Collections.sort(runningTasks, new Comparator<TaskStatus>() {
+      public int compare(TaskStatus t1, TaskStatus t2) {
+        if (t1.getStartTime() < t2.getStartTime())
+          return 1;
+        else if (t1.getStartTime() == t2.getStartTime())
+          return 0;
+        else
+          return -1;
+      }
+    });
+    
+    // Maintain a count of tasks left in each pool; this is a bit
+    // faster than calling runningTasks() on the pool repeatedly
+    // because the latter must scan through jobs in the pool
+    HashMap<Pool, Integer> tasksLeft = new HashMap<Pool, Integer>(); 
+    for (Pool p: poolMgr.getPools()) {
+      tasksLeft.put(p, p.getSchedulable(taskType).getRunningTasks());
+    }
+    
+    // Scan down the sorted list of task statuses until we've killed enough
+    // tasks, making sure we don't kill too many from any pool
+    for (TaskStatus status: runningTasks) {
+      JobID jobID = status.getTaskID().getJobID();
+      JobInProgress job = taskTrackerManager.getJob(jobID);
+      Pool pool = poolMgr.getPool(job);
+      PoolSchedulable sched = pool.getSchedulable(taskType);
+      if (tasksLeft.get(pool) > sched.getFairShare()) {
+        eventLog.log("PREEMPT", status.getTaskID(),
+            status.getTaskTracker());
+        try {
+          taskTrackerManager.killTask(status.getTaskID(), false);
+          tasksToPreempt--;
+          if (tasksToPreempt == 0)
+            break;
+        } catch (IOException e) {
+          LOG.error("Failed to kill task " + status.getTaskID(), e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Count how many tasks of a given type the pool needs to preempt, if any.
+   * If the pool has been below its min share for at least its preemption
    * timeout, it should preempt the difference between its current share and
    * this min share. If it has been below half its fair share for at least the
    * fairSharePreemptionTimeout, it should preempt enough tasks to get up to
-   * its full fair share. If both situations hold, we preempt the max of the
+   * its full fair share. If both conditions hold, we preempt the max of the
    * two amounts (this shouldn't happen unless someone sets the timeouts to
    * be identical for some reason).
    */
-  protected int tasksToPreempt(JobInProgress job, TaskType type, long curTime) {
-    JobInfo info = infos.get(job);
-    if (info == null) return 0;
-    String pool = poolMgr.getPoolName(job);
+  protected int tasksToPreempt(PoolSchedulable sched, long curTime) {
+    String pool = sched.getName();
     long minShareTimeout = poolMgr.getMinSharePreemptionTimeout(pool);
     long fairShareTimeout = poolMgr.getFairSharePreemptionTimeout();
     int tasksDueToMinShare = 0;
     int tasksDueToFairShare = 0;
-    if (type == TaskType.MAP) {
-      if (curTime - info.lastTimeAtMapMinShare > minShareTimeout) {
-        tasksDueToMinShare = info.minMaps - info.runningMaps;
-      }
-      if (curTime - info.lastTimeAtMapHalfFairShare > fairShareTimeout) {
-        double fairShare = Math.min(info.mapFairShare, info.neededMaps);
-        tasksDueToFairShare = (int) (fairShare - info.runningMaps);
-      }
-    } else { // type == TaskType.REDUCE
-      if (curTime - info.lastTimeAtReduceMinShare > minShareTimeout) {
-        tasksDueToMinShare = info.minReduces - info.runningReduces;
-      }
-      if (curTime - info.lastTimeAtReduceHalfFairShare > fairShareTimeout) {
-        double fairShare = Math.min(info.reduceFairShare, info.neededReduces);
-        tasksDueToFairShare = (int) (fairShare - info.runningReduces);
-      }
+    if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
+      int target = Math.min(sched.getMinShare(), sched.getDemand());
+      tasksDueToMinShare = target - sched.getRunningTasks();
+    }
+    if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
+      int target = (int) Math.min(sched.getFairShare(), sched.getDemand());
+      tasksDueToFairShare = target - sched.getRunningTasks();
     }
     int tasksToPreempt = Math.max(tasksDueToMinShare, tasksDueToFairShare);
     if (tasksToPreempt > 0) {
       String message = "Should preempt " + tasksToPreempt + " " 
-          + type + " tasks for " + job.getJobID() 
+          + sched.getTaskType() + " tasks for pool " + sched.getName() 
           + ": tasksDueToMinShare = " + tasksDueToMinShare
           + ", tasksDueToFairShare = " + tasksDueToFairShare;
       eventLog.log("INFO", message);
@@ -953,24 +804,7 @@
     return tasksToPreempt;
   }
 
-  /**
-   * Preempt up to maxToPreempt tasks of the given type from the given job,
-   * without having it go below its min share or below half its fair share.
-   * Selects the tasks so as to preempt the least recently launched one first,
-   * thus minimizing wasted compute time. Returns the number of tasks preempted.
-   */
-  private int preemptTasks(JobInProgress job, TaskType type, int maxToPreempt) {
-    // Figure out how many tasks to preempt. NOTE: We use the runningTasks, etc
-    // values in JobInfo rather than re-counting them, but this should be safe
-    // because we are being called only inside update(), which has a lock on
-    // the JobTracker, so all the values are fresh.
-    int desiredFairShare = (int) Math.floor(Math.min(
-        fairTasks(job, type) / 2, neededTasks(job, type)));
-    int tasksToLeave = Math.max(minTasks(job, type), desiredFairShare);
-    int tasksToPreempt = Math.min(
-        maxToPreempt, runningTasks(job, type) - tasksToLeave);
-    if (tasksToPreempt == 0)
-      return 0;
+  private List<TaskStatus> getRunningTasks(JobInProgress job, TaskType type) {
     // Create a list of all running TaskInProgress'es in the job
     List<TaskInProgress> tips = new ArrayList<TaskInProgress>();
     if (type == TaskType.MAP) {
@@ -993,83 +827,7 @@
         statuses.add(tip.getTaskStatus(id));
       }
     }
-    // Sort the statuses in order of start time, with the latest launched first
-    Collections.sort(statuses, new Comparator<TaskStatus>() {
-      public int compare(TaskStatus t1, TaskStatus t2) {
-        return (int) Math.signum(t2.getStartTime() - t1.getStartTime());
-      }
-    });
-    // Preempt the tasks in order of start time until we've done enough
-    int numKilled = 0;
-    for (int i = 0; i < tasksToPreempt; i++) {
-      if (i > statuses.size() - tasksToLeave) {
-        // Sanity check in case we computed maxToPreempt incorrectly due to
-        // stale data in JobInfos. Shouldn't happen if we are called from update.
-        LOG.error("Stale task counts in the JobInfos in preemptTasks - "
-            + "probaly due to calling preemptTasks() from outside update(). ");
-        break;
-      }
-      TaskStatus status = statuses.get(i);
-      eventLog.log("PREEMPT", status.getTaskID(), status.getTaskTracker());
-      try {
-        taskTrackerManager.killTask(status.getTaskID(), false);
-        numKilled++;
-      } catch (IOException e) {
-        LOG.error("Failed to kill task " + status.getTaskID(), e);
-      }
-    }
-    return numKilled;
-  }
-
-  public synchronized boolean getUseFifo() {
-    return useFifo;
-  }
-  
-  public synchronized void setUseFifo(boolean useFifo) {
-    this.useFifo = useFifo;
-  }
-  
-  // Getter methods for reading JobInfo values based on TaskType, safely
-  // returning 0's for jobs with no JobInfo present.
-
-  protected int neededTasks(JobInProgress job, TaskType taskType) {
-    JobInfo info = infos.get(job);
-    if (info == null) return 0;
-    return taskType == TaskType.MAP ? info.neededMaps : info.neededReduces;
-  }
-  
-  protected int runningTasks(JobInProgress job, TaskType taskType) {
-    JobInfo info = infos.get(job);
-    if (info == null) return 0;
-    return taskType == TaskType.MAP ? info.runningMaps : info.runningReduces;
-  }
-
-  protected int runnableTasks(JobInProgress job, TaskType type) {
-    return neededTasks(job, type) + runningTasks(job, type);
-  }
-
-  protected int minTasks(JobInProgress job, TaskType type) {
-    JobInfo info = infos.get(job);
-    if (info == null) return 0;
-    return (type == TaskType.MAP) ? info.minMaps : info.minReduces;
-  }
-
-  protected double fairTasks(JobInProgress job, TaskType type) {
-    JobInfo info = infos.get(job);
-    if (info == null) return 0;
-    return (type == TaskType.MAP) ? info.mapFairShare : info.reduceFairShare;
-  }
-
-  protected double weight(JobInProgress job, TaskType taskType) {
-    JobInfo info = infos.get(job);
-    if (info == null) return 0;
-    return (taskType == TaskType.MAP ? info.mapWeight : info.reduceWeight);
-  }
-
-  protected double deficit(JobInProgress job, TaskType taskType) {
-    JobInfo info = infos.get(job);
-    if (info == null) return 0;
-    return taskType == TaskType.MAP ? info.mapDeficit : info.reduceDeficit;
+    return statuses;
   }
 
   protected boolean isRunnable(JobInProgress job) {
@@ -1111,13 +869,14 @@
       for (JobInProgress job: jobs) {
         JobProfile profile = job.getProfile();
         JobInfo info = infos.get(job);
-        eventLog.log("JOB",
+        // TODO: Fix
+        /*eventLog.log("JOB",
             profile.getJobID(), profile.name, profile.user,
             job.getPriority(), poolMgr.getPoolName(job),
             job.numMapTasks, info.runningMaps, info.neededMaps, 
             info.mapFairShare, info.mapWeight, info.mapDeficit,
             job.numReduceTasks, info.runningReduces, info.neededReduces, 
-            info.reduceFairShare, info.reduceWeight, info.reduceDeficit);
+            info.reduceFairShare, info.reduceWeight, info.reduceDeficit);*/
       }
       // List pools in alphabetical order
       List<Pool> pools = new ArrayList<Pool>(poolMgr.getPools());
@@ -1135,8 +894,9 @@
         for (JobInProgress job: pool.getJobs()) {
           JobInfo info = infos.get(job);
           if (info != null) {
-            runningMaps += info.runningMaps;
-            runningReduces += info.runningReduces;
+            // TODO: Fix
+            //runningMaps += info.runningMaps;
+            //runningReduces += info.runningReduces;
           }
         }
         String name = pool.getName();
@@ -1149,4 +909,16 @@
       eventLog.log("END_DUMP");
     }
   }
+
+  /** Returns the Clock this scheduler reads the current time from. */
+  public Clock getClock() { return this.clock; }
+
+  /** Returns the event log this scheduler records scheduling events to. */
+  public FairSchedulerEventLog getEventLog() { return this.eventLog; }
+
+  /**
+   * Returns the JobInfo this scheduler tracks for the given job, or null if
+   * it has no entry for that job.
+   */
+  public JobInfo getJobInfo(JobInProgress job) { return this.infos.get(job); }
 }



Mime
View raw message