hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1169585 [1/5] - in /hadoop/common/branches/branch-0.20-security: ./ conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/ src/co...
Date Sun, 11 Sep 2011 23:57:38 GMT
Author: acmurthy
Date: Sun Sep 11 23:57:37 2011
New Revision: 1169585

URL: http://svn.apache.org/viewvc?rev=1169585&view=rev
MAPREDUCE-2981. Backport FairScheduler from trunk. Contributed by Matei Zaharia.


Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1169585&r1=1169584&r2=1169585&view=diff
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Sun Sep 11 23:57:37 2011
@@ -202,6 +202,9 @@ Release - unreleased
     HADOOP-7599. Script improvements to setup a secure Hadoop cluster 
     (Eric Yang via ddas)
+    MAPREDUCE-2981. Backport FairScheduler from trunk. (Matei Zaharia via
+    acmurthy) 
 Release - 2011-8-25

Added: hadoop/common/branches/branch-0.20-security/conf/fair-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/conf/fair-scheduler.xml.template?rev=1169585&view=auto
--- hadoop/common/branches/branch-0.20-security/conf/fair-scheduler.xml.template (added)
+++ hadoop/common/branches/branch-0.20-security/conf/fair-scheduler.xml.template Sun Sep 11
23:57:37 2011
@@ -0,0 +1,12 @@
+<?xml version="1.0"?>
+  This file contains pool and user allocations for the Fair Scheduler.
+  Its format is explained in the Fair Scheduler documentation at
+  http://hadoop.apache.org/common/docs/r0.20.205.0/fair_scheduler.html.
+  The documentation also includes a sample config file.

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1169585&r1=1169584&r2=1169585&view=diff
--- hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
+++ hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
Sun Sep 11 23:57:37 2011
@@ -509,8 +509,9 @@ class CapacityTaskScheduler extends Task
       // First, try to get a 'local' task
-      Task t = 
-        job.obtainNewLocalMapTask(taskTracker, numTaskTrackers, numUniqueHosts);
+      Task t = job.obtainNewNodeOrRackLocalMapTask(taskTracker,
+                                                   numTaskTrackers,
+                                                   numUniqueHosts);
       if (t != null) {
         return TaskLookupResult.getTaskFoundResult(t, job); 

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1169585&r1=1169584&r2=1169585&view=diff
--- hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
+++ hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Sun Sep 11 23:57:37 2011
@@ -195,8 +195,8 @@ public class TestCapacityScheduler exten
-    public Task obtainNewLocalMapTask(final TaskTrackerStatus tts, int clusterSize,
-        int ignored) throws IOException {
+    public Task obtainNewNodeOrRackLocalMapTask(final TaskTrackerStatus tts,
+        int clusterSize, int ignored) throws IOException {
       return obtainNewMapTask(tts, clusterSize, ignored);
@@ -553,6 +553,12 @@ public class TestCapacityScheduler exten
       return statuses;
+    @Override
+    public boolean killTask(TaskAttemptID taskid, boolean shouldFail)
+      throws IOException {
+      return false;
+    }
     public void addJobInProgressListener(JobInProgressListener listener) {

Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.tex
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.tex?rev=1169585&view=auto
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.tex
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.tex
Sun Sep 11 23:57:37 2011
@@ -0,0 +1,253 @@
+% Licensed to the Apache Software Foundation (ASF) under one
+% or more contributor license agreements.  See the NOTICE file
+% distributed with this work for additional information
+% regarding copyright ownership.  The ASF licenses this file
+% to you under the Apache License, Version 2.0 (the
+% "License"); you may not use this file except in compliance
+% with the License.  You may obtain a copy of the License at
+%     http://www.apache.org/licenses/LICENSE-2.0
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS,
+% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+% See the License for the specific language governing permissions and
+% limitations under the License.
+\title{Hadoop Fair Scheduler Design Document}
+The Hadoop Fair Scheduler started as a simple means to share MapReduce clusters. Over time,
it has grown in functionality to support hierarchical scheduling, preemption, and multiple
ways of organizing and weighing jobs. This document explains the goals and features of the
Fair Scheduler and its internal design.
+\section{Fair Scheduler Goals}
+The Fair Scheduler was designed with four main goals:
+  \item Run small jobs quickly even if they are sharing a cluster with large jobs. Unlike
Hadoop's built-in FIFO scheduler, fair scheduling lets small jobs make progress even if a
large job is running, without starving the large job.
+  \item Provide guaranteed service levels to ``production" jobs, to let them run alongside
experimental jobs in a shared cluster.
+  \item Be simple to administer and configure. The scheduler should do something reasonable
``out of the box," and users should only need to configure it as they discover that they want
to use more advanced features.
+  \item Support reconfiguration at runtime, without requiring a cluster restart.
+\section{Scheduler Features}
+This section provides a quick overview of the features of the Fair Scheduler. A detailed
usage guide is available in the Hadoop documentation in {\tt build/docs/fair\_scheduler.html}.
+The Fair Scheduler groups jobs into ``pools" and performs fair sharing between these pools.
Each pool can use either FIFO or fair sharing to schedule jobs internal to the pool. The pool
that a job is placed in is determined by a JobConf property, the ``pool name property". By
default, this is {\tt user.name}, so that there is one pool per user. However, different properties
can be used, e.g.~{\tt group.name} to have one pool per Unix group.
+A common trick is to set the pool name property to an unused property name such as {\tt pool.name}
and make this default to {\tt user.name}, so that there is one pool per user but it is also
possible to place jobs into ``special" pools by setting their {\tt pool.name} directly. The
{\tt mapred-site.xml} snippet below shows how to do this:
+  <name>mapred.fairscheduler.poolnameproperty</name>
+  <value>pool.name</value>
+  <name>pool.name</name>
+  <value>${user.name}</value>
+\subsection{Minimum Shares}
+Normally, active pools (those that contain jobs) will get equal shares of the map and reduce
task slots in the cluster. However, it is also possible to set a \emph{minimum share} of map
and reduce slots on a given pool, which is a number of slots that it will always get when
it is active, even if its fair share would be below this number. This is useful for guaranteeing
that production jobs get a certain desired level of service when sharing a cluster with non-production
jobs. Minimum shares have three effects:
+  \item The pool's fair share will always be at least as large as its minimum share. Slots
are taken from the share of other pools to achieve this. The only exception is if the minimum
shares of the active pools add up to more than the total number of slots in the cluster; in
this case, each pool's share will be scaled down proportionally.
+  \item Pools whose running task count is below their minimum share get assigned slots first
when slots are available.
+  \item It is possible to set a \emph{preemption timeout} on the pool after which, if it
has not received enough task slots to meet its minimum share, it is allowed to kill tasks
in other jobs to meet its share. Minimum shares with preemption timeouts thus act like SLAs.
+Note that when a pool is inactive (contains no jobs), its minimum share is not ``reserved"
for it -- the slots are split up among the other pools.
+As explained above, the scheduler may kill tasks from a job in one pool in order to meet
the minimum share of another pool. We call this preemption, although this usage of the word
is somewhat strange given the normal definition of preemption as pausing; really it is the
\emph{job} that gets preempted, while the task gets killed. The feature explained above is
called \emph{min share preemption}. In addition, the scheduler supports \emph{fair share preemption},
to kill tasks when a pool's fair share is not being met. Fair share preemption is much more
conservative than min share preemption, because pools without min shares are expected to be
non-production jobs where some amount of unfairness is tolerable. In particular, fair share
preemption activates if a pool has been below \emph{half} of its fair share for a configurable
fair share preemption timeout, which is recommended to be set fairly high (e.g. 10 minutes).
+In both types of preemption, the scheduler kills the most recently launched tasks from over-scheduled
pools, to minimize the amount of computation wasted by preemption.
+\subsection{Running Job Limits}
+The fair scheduler can limit the number of concurrently running jobs from each user and from
each pool. This is useful for limiting the amount of intermediate data generated on the cluster.
The jobs that will run are chosen in order of submit time and priority. Jobs submitted beyond
the limit wait for one of the running jobs to finish.
+\subsection{Job Priorities}
+Within a pool, job priorities can be used to control the scheduling of jobs, whether the
pool's internal scheduling mode is FIFO or fair sharing:
+  \item In FIFO pools, jobs are ordered first by priority and then by submit time, as in
Hadoop's default scheduler.
+  \item In fair sharing pools, job priorities are used as weights to control how much share
a job gets. The normal priority corresponds to a weight of 1.0, and each level gives 2x more
weight. For example, a high-priority job gets a weight of 2.0, and will therefore get 2x the
share of a normal-priority job. 
+\subsection{Pool Weights}
+Pools can be given weights to achieve unequal sharing of the cluster. For example, a pool
with weight 2.0 gets 2x the share of a pool with weight 1.0.
+\subsection{Delay Scheduling}
+The Fair Scheduler contains an algorithm called delay scheduling to improve data locality.
Jobs that cannot launch a data-local map task wait for some period of time before they are
allowed to launch non-data-local tasks, ensuring that they will run locally if some node in
the cluster has the relevant data. Delay scheduling is described in detail in Section \ref{sec:delay-scheduling}.
+The Fair Scheduler includes a web UI for displaying the active pools and jobs and their fair
shares, moving jobs between pools, and changing job priorities.
+In addition, the Fair Scheduler's allocation file (specifying min shares and preemption timeouts
for the pools) is automatically reloaded if it is modified on disk, to allow runtime reconfiguration.
+\subsection{Hadoop Scheduling Background}
+Hadoop jobs consist of a number of map and reduce \emph{tasks}. These tasks run in \emph{slots}
on the nodes of the cluster. Each node is configured with a number of map slots and reduce
slots based on its computational resources (typically one slot per core). The role of the
scheduler is to assign tasks to any slots that are free.
+All schedulers in Hadoop, including the Fair Scheduler, inherit from the {\tt TaskScheduler}
abstract class. This class provides access to a {\tt TaskTrackerManager} -- an interface to
the JobTracker -- as well as a {\tt Configuration} instance. It also asks the scheduler to
implement three abstract methods: the lifecycle methods {\tt start} and {\tt terminate}, and
a method called {\tt assignTasks} to launch tasks on a given TaskTracker.
+Task assignment in Hadoop is reactive. TaskTrackers periodically send heartbeats to the JobTracker
with their {\tt TaskTrackerStatus}, which contains a list of running tasks, the number of
slots on the node, and other information. The JobTracker then calls {\tt assignTasks} on the
scheduler to obtain tasks to launch. These are returned with the heartbeat response.
+Apart from reacting to heartbeats through {\tt assignTasks}, schedulers can also be notified
when jobs have been submitted to the cluster, killed, or removed by adding listeners to the
{\tt TaskTrackerManager}. The Fair Scheduler sets up these listeners in its {\tt start} method.
An important role of the listeners is to initialize jobs that are submitted -- until a job
is initialized, it cannot launch tasks. The Fair Scheduler currently initializes all jobs
right away, but it may also be desirable to hold off initializing jobs if too many are submitted
to limit memory usage on the JobTracker.
+Selection of tasks \emph{within} a job is mostly done by the {\tt JobInProgress} class, and
not by individual schedulers. {\tt JobInProgress} exposes two methods, {\tt obtainNewMapTask}
and {\tt obtainNewReduceTask}, to launch a task of either type. Both methods may either return
a {\tt Task} object or {\tt null} if the job does not wish to launch a task. Whether a job
wishes to launch a task may change back and forth during its lifetime. Even after all tasks
in the job have been started, the job may wish to run another task for speculative execution.
In addition, if the node containing a map task failed, the job will wish to re-run it to rebuild
its output for use in the reduce tasks. Schedulers may therefore need to poll multiple jobs
until they find one with a task to run.
+Finally, for map tasks, an important scheduling criterion is data locality: running the task
on a node or rack that contains its input data. Normally, {\tt JobInProgress.obtainNewMapTask}
returns the ``closest" map task to a given node. However, to give schedulers slightly more
control over data locality, there is also a version of {\tt obtainNewMapTask} that allows the
scheduler to cap the level of non-locality allowed for the task (e.g.~request a task only
on the same node, or {\tt null} if none is available). The Fair Scheduler uses this method
with an algorithm called delay scheduling (Section \ref{sec:delay-scheduling}) to optimize
data locality.
+\subsection{Fair Scheduler Basics}
+At a high level, the Fair Scheduler uses hierarchical scheduling to assign tasks. First it
selects a pool to assign a task to according to the fair sharing algorithm in Section \ref{sec:fair-sharing-alg}.
Then it asks the pool to obtain a task. The pool chooses among its jobs according to its internal
scheduling order (FIFO or fair sharing).
+In fact, because jobs might not have tasks to launch ({\tt obtainNew(Map|Reduce)Task} can
return null), the scheduler actually establishes an ordering on jobs and asks them for tasks
in turn. Within a pool, jobs are sorted either by priority and start time (for FIFO) or by
distance below fair share. If the first job in the ordering does not have a task to launch,
the pool will ask the second job, then the third, and so on. Pools themselves are sorted by distance below
min share and fair share, so if the first pool does not have any jobs that can launch tasks,
the second pool is asked, etc. This makes it straightforward to implement features like delay
scheduling (Section \ref{sec:delay-scheduling}) that may cause jobs to ``pass" on a slot.
+Apart from the assign tasks code path, the Fair Scheduler also has a periodic update thread
that calls {\tt update} every few seconds. This thread is responsible for recomputing fair
shares to display them on the UI (Section \ref{sec:fair-share-computation}), checking whether
jobs need to be preempted (Section \ref{sec:preemption}), and checking whether the allocations
file has changed to reload pool allocations (through {\tt PoolManager}).
+\subsection{The {\tt Schedulable} Class}
+To allow the same fair sharing algorithm to be used both between pools and within a pool,
the Fair Scheduler uses an abstract class called {\tt Schedulable} to represent both pools
and jobs. Its subclasses for these roles are {\tt PoolSchedulable} and {\tt JobSchedulable}.
A {\tt Schedulable} is responsible for three roles:
+  \item It can be asked to obtain a task through {\tt assignTask}. This may return {\tt null}
if the {\tt Schedulable} has no tasks to launch.
+  \item It can be queried for information about the pool/job to use in scheduling, such as:
+  \begin{itemize}
+    \item Number of running tasks.
+    \item Demand (number of tasks the {\tt Schedulable} \emph{wants} to run; this is equal
to number of running tasks + number of unlaunched tasks).
+    \item Min share assigned through config file.
+    \item Weight (for fair sharing).
+    \item Priority and start time (for FIFO scheduling).
+  \end{itemize}
+  \item It can be assigned a fair share through {\tt setFairShare}.
+There are separate {\tt Schedulable}s for map and reduce tasks, to make it possible to use
the same algorithm on both types of tasks.
+\subsection{Fair Sharing Algorithm}
+A simple way to achieve fair sharing is the following: whenever a slot is available, assign
it to the pool that has the fewest running tasks. This will ensure that all pools get an equal
number of slots, unless a pool's demand is less than its fair share, in which case the extra
slots are divided evenly among the other pools. Two features of the Fair Scheduler complicate
this algorithm a little:
+  \item Pool weights mean that some pools should get more slots than others. For example,
a pool with weight 2 should get 2x more slots than a pool with weight 1. This is accomplished
by changing the scheduling rule to ``assign the slot to the pool whose value of $runningTasks/weight$
is smallest."
+  \item Minimum shares mean that pools below their min share should get slots first. When
we sort pools to choose which ones to schedule next, we place pools below their min share
ahead of pools above their min share. We order the pools below their min share by how far
they are below it as a percentage of the share.
+This fair sharing algorithm is implemented in {\tt FairShareComparator} in the {\tt SchedulingAlgorithms}
class. The comparator orders jobs by distance below min share and then by $runningTasks/weight$.
+To determine when to preempt tasks, the Fair Scheduler maintains two values for each {\tt
PoolSchedulable}: the last time when the pool was at its min share, and the last time when
the pool was at half its fair share. These conditions are checked periodically by the update
thread in {\tt FairScheduler.updatePreemptionVariables}, using the methods {\tt isStarvedForMinShare}
and {\tt isStarvedForFairShare}. These methods also take into account the demand of the pool,
so that a pool is not counted as starving if its demand is below its min/fair share but is
otherwise met.
+When preempting tasks, the scheduler kills the most recently launched tasks from over-scheduled
pools. This minimizes the amount of computation wasted by preemption and ensures that all
jobs can eventually finish (it is as if the preempted jobs just never got their last few slots).
The tasks are chosen and preempted in {\tt preemptTasks}.
+Note that for min share preemption, it is clear when a pool is below its min share because
the min share is given as a number of slots, but for fair share preemption, we must be able
to compute a pool's fair share to determine when it is being starved. This computation is
trickier than dividing the number of slots by the number of pools due to weights, min shares
and demands. Section \ref{sec:fair-share-computation} explains how fair shares are computed.
+\subsection{Fair Share Computation}
+The scheduling algorithm in Section \ref{sec:fair-sharing-alg} achieves fair shares without
actually needing to compute pools' numerical shares beforehand. However, for preemption and
for displaying shares in the Web UI, we want to know what a pool's fair share is even if the
pool is not currently at its share. That is, we want to know how many slots the pool \emph{would}
get if we started with all slots being empty and ran the algorithm in Section \ref{sec:fair-sharing-alg}
until we filled them.
+One way to compute these shares would be to simulate starting out with empty slots and calling
{\tt assignTasks} repeatedly until they filled, but this is expensive, because each scheduling
decision takes $O(numJobs)$ time and we need to make $O(numSlots)$ decisions.
+To compute fair shares efficiently, the Fair Scheduler includes an algorithm based on binary
search in {\tt SchedulingAlgorithms.computeFairShares}. This algorithm is based on the following
observation. If all slots had been assigned according to weighted fair sharing respecting
pools' demands and min shares, then there would exist a ratio $r$ such that:
+  \item Pools whose demand $d_i$ is less than $r w_i$ (where $w_i$ is the weight of the pool)
are assigned $d_i$ slots.
+  \item Pools whose min share $m_i$ is more than $r w_i$ are assigned $\min(m_i, d_i)$ slots.
+  \item All other pools are assigned $r w_i$ slots.
+  \item The pools' shares sum up to the total number of slots $t$.
+The Fair Scheduler uses binary search to compute the correct $r$. We define a function $f(r)$
as the number of slots that would be used for a given $r$ if conditions 1-3 above were met,
and then find a value of $r$ that makes $f(r)=t$. More precisely, $f(r)$ is defined as:
+$$f(r) = \sum_i{\min(d_i, \max(r w_i, m_i)).}$$
+Note that $f(r)$ is non-decreasing in $r$ because every term of the sum is non-decreasing, so the
equation $f(r) = t$ can be solved by binary search. We choose 0 as a lower bound of our binary
search because with $r=0$, only min shares are assigned. (An earlier check in {\tt computeFairShares}
checks whether the min shares add up to more than the total number of slots, and if so, computes
fair shares by scaling down the min shares proportionally and returns.) To compute an upper
bound for the binary search, we try $r=1,2,4,8,\dots$ until we find a value large enough that
either more than $t$ slots are used or all pools' demands are met (in case the demands added
up to less than $t$).
+The steps of the algorithm are explained in detail in {\tt SchedulingAlgorithms.java}.
+This algorithm runs in time $O(NP)$, where $N$ is the number of jobs/pools and $P$ is the
desired number of bits of precision in the computed values (number of iterations of binary
search), which we've set to 25. It thus scales linearly in the number of jobs and pools.
+\subsection{Running Job Limits}
+Running job limits are implemented by marking jobs as not runnable if there are too many
jobs submitted by the same user or pool. This is done in {\tt FairScheduler.updateRunnability}.
A job that is not runnable declares its demand as 0 and always returns {\tt null} from {\tt assignTask}.
+\subsection{Delay Scheduling}
+In Hadoop, running map tasks on the nodes or racks that contain their input data is critical
for performance, because it avoids shipping the data over the network. However, always assigning
slots to the first job in order of pool shares and in-pool ordering (the ``head-of-line job")
can sometimes lead to poor locality:
+  \item If the head-of-line job is small, the chance of it having data on the node that a
heartbeat was received from is small. Therefore, locality would be poor in a small-job workload
if we always assigned slots to the head-of-line job.
+  \item When fair sharing is used, there is a strong bias for a job to be reassigned into
a slot that it just finished a task in, because when it finishes the task, the job falls below
its fair share. This can mean that jobs have a difficult time running in slots that other
jobs have taken and thus achieve poor locality.
+To deal with both of these situations, the Fair Scheduler can sacrifice fairness temporarily
to improve locality through an algorithm called delay scheduling. If the head-of-line job
cannot launch a local task on the TaskTracker that sent a heartbeat, then it is skipped, and
other running jobs are looked at in order of pool shares and in-pool scheduling rules to find
a job with a local task. However, if the head-of-line job has been skipped for a sufficiently
long time, it is allowed to launch rack-local tasks. Then, if it is skipped for a longer time,
it is also allowed to launch off-rack tasks. These skip times are called locality delays.
Delays of a few seconds are sufficient to drastically increase locality.
+The Fair Scheduler allows locality delays to be set through {\tt mapred-site.xml} or to be
turned off by setting them to zero. However, by default, it computes the delay automatically
based on the heartbeat interval of the cluster. The delay is set to 1.5x the heartbeat interval.
+When a job that has been allowed to launch non-local tasks ends up launching a local task
again, its ``locality level" resets and it must wait again before launching non-local tasks.
This is done so that a job that gets ``unlucky" early in its lifetime does not continue to
launch non-local tasks throughout its life.
+Delay scheduling is implemented by keeping track of two variables on each job: the locality
level of the last map it launched (0 for node-local, 1 for rack-local and 2 for off-rack)
and the time it has spent being skipped for a task. These are kept in a {\tt JobInfo} structure
associated with each job in {\tt FairScheduler.java}. Whenever a job is asked for tasks, it
checks the locality level it is allowed to launch them at through {\tt FairScheduler.getAllowedLocalityLevel}.
If it does not launch a task, it is marked as ``visited" on that heartbeat by appending itself
to a {\tt visited} job list that is passed around between calls to {\tt assignTasks} on the
same heartbeat. Jobs that are visited on a heartbeat but do not launch any tasks during it
are considered as skipped for the time interval between this heartbeat and the next. Code
at the beginning of {\tt FairScheduler.assignTasks} increments the wait time of each skipped
job by the time elapsed since the last heartbeat. Once a job has been skipped for more than
the locality delay, {\tt getAllowedLocalityLevel}
starts returning higher locality levels so that it is allowed to launch less-local tasks. Whenever
the job launches a task, its wait time is reset, but we remember the locality level of the
launched task so that the job is allowed to launch more tasks at this level without further delay.
+\subsection{Locking Order}
+Fair Scheduler data structures can be touched by several threads. Most commonly, the JobTracker
invokes {\tt assignTasks}. This happens inside a block of code where the JobTracker has locked
itself already. Therefore, to prevent deadlocks, we always ensure that \emph{if both the FairScheduler
and the JobTracker must be locked, the JobTracker is locked first}. Other threads that can
lock the FairScheduler include the update thread and the web UI.
+\subsection{Unit Tests}
+The Fair Scheduler contains extensive unit tests using mock {\tt TaskTrackerManager}, {\tt
JobInProgress}, {\tt TaskInProgress}, and {\tt Schedulable} objects. Scheduler tests are in
{\tt TestFairScheduler.java}. The {\tt computeFairShares} algorithm is tested separately in
{\tt TestComputeFairShares.java}. All tests use accelerated time via a fake {\tt Clock} class.
+\section{Code Guide}
+The following table lists some key source files in the Fair Scheduler:
+  \hline
+  {\bf File} & {\bf Contents} 
+  \\ \hline
+  {\tt FairScheduler.java} & Scheduler entry point. Also contains update thread, and
logic for preemption, delay scheduling, and running job limits.
+  \\ \hline
+  {\tt Schedulable.java} & Definition of the {\tt Schedulable} class. Extended by {\tt
JobSchedulable} and {\tt PoolSchedulable}.
+  \\ \hline
+  {\tt SchedulingAlgorithms.java} & Contains FIFO and fair sharing comparators, as well
as the {\tt computeFairShares} algorithm in Section \ref{sec:fair-share-computation}.
+  \\ \hline
+  {\tt PoolManager.java} & Reads pool properties from the allocation file and maintains
a collection of {\tt Pool} objects. Pools are created on demand.
+  \\ \hline
+  {\tt Pool.java} & Represents a pool and stores its map and reduce {\tt Schedulables}.
+  \\ \hline
+  {\tt FairSchedulerServlet.java} & Implements the scheduler's web UI.
+  \\ \hline
+  {\tt FairSchedulerEventLog.java} & An easy-to-parse event log for debugging. Must be
enabled through {\tt mapred.fairscheduler.eventlog.enabled}.
+  If enabled, logs are placed in {\tt \$HADOOP\_LOG\_DIR/fairscheduler}.
+  \\ \hline
+  {\tt TaskSelector.java} & A pluggable class responsible for picking tasks within a
job. Currently, {\tt DefaultTaskSelector} delegates to {\tt JobInProgress}, but this would
be a useful place to experiment with new algorithms for speculative execution and locality.
+  \\ \hline
+  {\tt LoadManager.java} & A pluggable class responsible for determining when to launch
more tasks on a TaskTracker. Currently, {\tt CapBasedLoadManager} uses slot counts, but this
would be a useful place to experiment with scheduling based on machine load.
+  \\ \hline
+  {\tt WeightAdjuster.java} & A pluggable class responsible for setting job weights.
An example, {\tt NewJobWeightBooster}, is provided, which increases weight temporarily for
new jobs.
+  \\ \hline

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/ivy.xml?rev=1169585&r1=1169584&r2=1169585&view=diff
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/ivy.xml (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/ivy.xml Sun Sep
11 23:57:37 2011
@@ -26,6 +26,14 @@
+    <dependency org="commons-collections"
+      name="commons-collections"
+      rev="${commons-collections.version}"
+      conf="common->default"/>
+    <dependency org="commons-cli"
+      name="commons-cli"
+      rev="${commons-cli.version}"
+      conf="common->default"/>
     <dependency org="log4j"

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java?rev=1169585&r1=1169584&r2=1169585&view=diff
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
Sun Sep 11 23:57:37 2011
@@ -18,12 +18,23 @@
 package org.apache.hadoop.mapred;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.conf.Configuration;
  * A {@link LoadManager} for use by the {@link FairScheduler} that allocates
  * tasks evenly across nodes up to their per-node maximum, using the default
  * load management algorithm in Hadoop.
 public class CapBasedLoadManager extends LoadManager {
+  float maxDiff = 0.0f;
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+    maxDiff = conf.getFloat("mapred.fairscheduler.load.max.diff", 0.0f);
+  }
    * Determine how many tasks of a given type we want to run on a TaskTracker. 
    * This cap is chosen based on how many tasks of that type are outstanding in
@@ -32,7 +43,7 @@ public class CapBasedLoadManager extends
    * machines sent out heartbeats earliest.
   int getCap(int totalRunnableTasks, int localMaxTasks, int totalSlots) {
-    double load = ((double)totalRunnableTasks) / totalSlots;
+    double load = maxDiff + ((double)totalRunnableTasks) / totalSlots;
     return (int) Math.ceil(localMaxTasks * Math.min(1.0, load));
@@ -49,4 +60,10 @@ public class CapBasedLoadManager extends
     return tracker.countReduceTasks() < getCap(totalRunnableReduces,
         tracker.getMaxReduceSlots(), totalReduceSlots);
+  @Override
+  public boolean canLaunchTask(TaskTrackerStatus tracker,
+      JobInProgress job,  TaskType type) {
+    return true;
+  }

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java?rev=1169585&r1=1169584&r2=1169585&view=diff
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java
Sun Sep 11 23:57:37 2011
@@ -56,12 +56,21 @@ public class DefaultTaskSelector extends
-  public Task obtainNewMapTask(TaskTrackerStatus taskTracker, JobInProgress job)
-      throws IOException {
+  public Task obtainNewMapTask(TaskTrackerStatus taskTracker, JobInProgress job,
+      int localityLevel) throws IOException {
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
     int numTaskTrackers = clusterStatus.getTaskTrackers();
-    return job.obtainNewMapTask(taskTracker, numTaskTrackers,
-        taskTrackerManager.getNumberOfUniqueHosts());
+    switch (localityLevel) {
+      case 1:
+        return job.obtainNewNodeLocalMapTask(taskTracker, numTaskTrackers,
+          taskTrackerManager.getNumberOfUniqueHosts());
+      case 2:
+        return job.obtainNewNodeOrRackLocalMapTask(taskTracker, numTaskTrackers,
+          taskTrackerManager.getNumberOfUniqueHosts());
+      default:
+        return job.obtainNewMapTask(taskTracker, numTaskTrackers,
+          taskTrackerManager.getNumberOfUniqueHosts());
+    }

View raw message