hadoop-common-dev mailing list archives

From "Owen O'Malley (JIRA)" <j...@apache.org>
Subject [jira] Commented: (HADOOP-3445) Implementing core scheduler functionality in Resource Manager (V1) for Hadoop
Date Sat, 30 Aug 2008 00:05:45 GMT

    [ https://issues.apache.org/jira/browse/HADOOP-3445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12627166#action_12627166 ]

Owen O'Malley commented on HADOOP-3445:

You can use the new method TaskInProgress.getActiveTasks instead of

TaskInProgress.killAnyTask isn't appropriate to add to the
framework. I think the semantics should be rethought rather than
killing the first attempt in a task, which will likely kill the
non-speculative task attempt instead of the speculative one. It
would be much better to use the new getActiveTasks to find
the task attempt that is furthest behind.
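A minimal sketch of that suggestion follows. The class and method names here are stand-ins (the real TaskInProgress.getActiveTasks signature may differ); the point is only the selection rule: among the active attempts, kill the one that has made the least progress.

```java
import java.util.*;

// Hypothetical sketch: model the active attempts of a TaskInProgress as a
// map from attempt id to progress in [0, 1], and pick the attempt that is
// furthest behind as the kill candidate. This avoids blindly killing the
// first (usually non-speculative) attempt.
public class KillCandidate {
    public static String furthestBehind(Map<String, Double> activeTasks) {
        String worst = null;
        double worstProgress = Double.MAX_VALUE;
        for (Map.Entry<String, Double> e : activeTasks.entrySet()) {
            if (e.getValue() < worstProgress) {
                worstProgress = e.getValue();
                worst = e.getKey();
            }
        }
        return worst; // null if there are no active attempts
    }
}
```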

The default configuration for this scheduler should *not* be in the
framework class ResourceManagerConf. It should be in a class in

I don't understand the meaning of "getFirstQueue" in
ResourceManagerConf. Do you mean the default queue? This seems not to
match HADOOP-3698.

JobInProgress.killTasks doesn't belong in JobInProgress. It is
making scheduling decisions and belongs in the capacity
scheduler. If you need access to the runningMapCache, make a
getRunningMapCache() that is package private. It isn't a great API,
but it keeps the scheduling decisions that are specific to this
scheduler in an appropriate place. This scheduler should pick which
task attempt to kill and call TaskInProgress.killTask(attemptid,

That said, I think the approach is wrong. The queues should kill from
the most recently launched job and kill the task attempt that has made
the least progress. That minimizes the cost of the killing actions.

The myDaddy field should be renamed to parent. We don't need to be
gender-specific or to avoid the standard computer science terms.

Lots of the comments are block comments with "/////////////" that
should be javadoc, so that the tools understand them. In Hadoop, we
refrain from using long strings of separators in comments.

ReclaimedResource.countdown would be better expressed as the time of
the deadline, so that it doesn't need to be decremented and isn't
sensitive to the update cycle times.
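A sketch of that change, with illustrative names rather than the actual patch's fields: store an absolute deadline instead of a counter, so expiry becomes a pure function of the clock.

```java
// Sketch of the suggested change: ReclaimedResource records the deadline
// rather than a countdown that must be decremented every update cycle.
// Class and method names are illustrative, not the actual patch.
public class ReclaimedResource {
    private final long deadlineMillis; // when the capacity must be reclaimed

    public ReclaimedResource(long nowMillis, long graceMillis) {
        this.deadlineMillis = nowMillis + graceMillis;
    }

    // No decrementing needed; the result does not depend on how often
    // the scheduler's update loop happens to run.
    public boolean isExpired(long nowMillis) {
        return nowMillis >= deadlineMillis;
    }
}
```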

> Implementing core scheduler functionality in Resource Manager (V1) for Hadoop
> -----------------------------------------------------------------------------
>                 Key: HADOOP-3445
>                 URL: https://issues.apache.org/jira/browse/HADOOP-3445
>             Project: Hadoop Core
>          Issue Type: New Feature
>            Reporter: Vivek Ratan
>            Assignee: Vivek Ratan
>         Attachments: 3445.1.patch, 3445.2.patch, 3445.3.patch, 3445.4.patch
> The architecture for the Hadoop Resource Manager (V1) is described in HADOOP-3444. This
Jira proposes implementation details on the core scheduling piece - the changes to the JT
to handle Orgs, queues, guaranteed capacities, user limits, and ultimately, scheduling a task
on a TT. 
> As per the architecture in HADOOP-3444, the JT contains a new component, Job Queue Manager
(JQM), to handle queues of jobs. Each queue represents a queue in an Org (one queue per Org).
Job queues are backed up by disk based storage. 
> We now look at some details. Terminology: 
> * A queue has *excess capacity* if it does not have enough jobs (queued or running) to
take up its guaranteed capacity. Excess capacity needs to be distributed to queues that have jobs waiting.
> * Queues that have given up excess capacity to other queues are called *low queues*,
for the sake of this discussion. Queues that are running on additional capacity are called
*high queues*.
> For each queue, the JT keeps track of the following: 
> * Guaranteed capacity (GC): the capacity guaranteed to the queue, set up through configuration.
The sum of all GCs is equal to the grid capacity. Since we're handling Map and Reduce slots
differently, we will have a GC for each, i.e., a GC-M for maps and a GC-R for reducers. The
sum of all GC-Ms is equal to the sum of all map slots available in the Grid, and the same
for GC-Rs. 
> * Allocated capacity (AC): the current capacity of the queue. This can be higher or lower
than the GC because of excess capacity distribution. The sum of all ACs is equal to the grid
capacity. As above, each queue will have an AC-M and an AC-R.
> * Timer for claiming containers: can just be the # of seconds the queue can wait till
it needs its capacity back. There will be separate timers for claiming map and reduce slots
(we will likely want to take more time to claim reduce slots, as reducers take longer to run).

> * # of containers being used, i.e., the number of running tasks associated with the queue
(C-RUN).  Each queue will have a C-RUN-M and C-RUN-R. 
> * Whether any jobs are queued. 
> * The number of Map and Reduce containers used by each user. 
> Every once in a while (this can be done periodically, or based on events), the JT looks
at redistributing capacity. This can result in excess capacity being given to queues that
need them, and capacity being claimed by queues. 
> *Excess capacity is redistributed as follows*:
>    * The algorithm below is in terms of tasks, which can be map or reduce tasks. It is
the same for both. The JT will run the algorithm to redistribute excess capacity for both
Maps and Reduces. 
>    * The JT checks each queue to see if it has excess capacity. A queue has excess capacity
if the number of running tasks associated with the queue is less than the allocated capacity
of the queue (i.e., if C-RUN < AC) and there are no jobs queued. 
>       ** Note: a tighter definition is if C-RUN plus the number of tasks required by
the waiting jobs is less than AC, but we don't need that level of detail. 
>    * If there is at least one queue with excess capacity, the total excess capacity is
the sum of excess capacities of each queue. The JT figures out the queues that this capacity
can be distributed to. These are queues that need capacity, where C-RUN = AC (i.e., the queue
is running at max capacity) and there are queued jobs. 
>    * The JT now figures out how much excess capacity to distribute to each queue that
needs it. This can be done in many ways. 
>       ** Distribute capacity in the ratio of each Org's guaranteed capacity. So if queues
Q1, Q2, and Q3 have guaranteed capacities of GC1, GC2, and GC3, and if Q3 has N containers
of excess capacity, Q1 gets (GC1*N)/(GC1+GC2) additional capacity, while Q2 gets (GC2*N)/(GC1+GC2).

>       ** You could use some other ratio that uses the number of waiting jobs. The more
waiting jobs a queue has, the more its share of excess capacity.
>    * For each queue that needs capacity, the JT increments its AC with the capacity it
is allocated. At the same time, the JT appropriately decrements the AC of queues with excess capacity.
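The redistribution step quoted above can be sketched as follows, for one task type (maps or reduces). The Queue class and its fields are stand-ins for the JT's bookkeeping (GC, AC, C-RUN, queued jobs), and the sketch uses the guaranteed-capacity ratio from the example; integer division leaves any remainder undistributed.

```java
import java.util.*;

// Illustrative sketch of the excess-capacity redistribution pass.
public class Redistribute {
    public static class Queue {
        final String name;
        final int gc;        // guaranteed capacity (GC)
        int ac;              // allocated capacity (AC)
        final int running;   // running tasks (C-RUN)
        final boolean hasQueuedJobs;
        Queue(String name, int gc, int ac, int running, boolean hasQueuedJobs) {
            this.name = name; this.gc = gc; this.ac = ac;
            this.running = running; this.hasQueuedJobs = hasQueuedJobs;
        }
    }

    public static void run(List<Queue> queues) {
        int excess = 0;
        List<Queue> needy = new ArrayList<>();
        for (Queue q : queues) {
            if (q.running < q.ac && !q.hasQueuedJobs) {
                // queue has excess capacity: give it up, down to C-RUN
                excess += q.ac - q.running;
                q.ac = q.running;
            } else if (q.running == q.ac && q.hasQueuedJobs) {
                // running at max capacity with jobs waiting: needs capacity
                needy.add(q);
            }
        }
        // distribute the excess in the ratio of guaranteed capacities
        int totalGc = 0;
        for (Queue q : needy) totalGc += q.gc;
        for (Queue q : needy) {
            if (totalGc > 0) q.ac += (q.gc * excess) / totalGc;
        }
    }
}
```

With the quoted example (Q1, Q2 needy with GCs 10 and 20, Q3 holding 6 excess slots), Q1 gains 10*6/30 = 2 slots and Q2 gains 20*6/30 = 4.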
> *Excess capacity is reclaimed as follows*: 
> * The algorithm below is in terms of tasks, which can be map or reduce tasks. It is the
same for both. The JT will run the algorithm to reclaim excess capacity for both Maps and Reduces.
> * The JT determines which queues are low queues (if AC < GC). If a low queue has a
job waiting, then we need to reclaim its resources. Capacity to be reclaimed = GC-AC. 
> * Capacity is re-claimed from any of the high queues (where AC > GC). 
> * JT decrements the AC of the high queue from which capacity is to be claimed, and increments
the AC of the low queue. The decremented AC of the high queue cannot go below its GC, so the
low queue may get its capacity back from more than one queue. 
> * The JT also starts a timer for the low queue (this can be an actual timer, or just
a count, perhaps representing seconds, which can be decremented by the JT periodically). 
> * If a timer goes off, the JT needs to instruct some high queue to kill some of their
tasks. How do we decide which high queues to claim capacity from? 
> ** The candidates are those high queues which are running more tasks than they should
be, i.e., where C-RUN > AC. 
> ** Among these queues, the JT can pick those that are using the most excess capacity
(i.e. queues with a higher value for (C-RUN - AC)/AC ). 
> * How does a high queue decide which tasks to kill? 
> ** Ideally, you want to kill tasks that have started recently or made the least progress.
You might want to use the same algorithm you use to decide which tasks to speculatively run
(though that algorithm needs to be fixed). 
> ** Note: it is expensive to kill tasks, so we need to focus on getting better at deciding
which tasks to kill. 
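The reclaim step can be sketched in the same style (again with stand-in names): a low queue with a waiting job claims capacity back from high queues, and a high queue's AC is never driven below its own GC, so the capacity may come from more than one queue.

```java
import java.util.*;

// Illustrative sketch of the capacity-reclaim pass.
public class Reclaim {
    public static class Queue {
        final int gc;              // guaranteed capacity
        int ac;                    // allocated capacity
        final boolean hasWaitingJob;
        Queue(int gc, int ac, boolean hasWaitingJob) {
            this.gc = gc; this.ac = ac; this.hasWaitingJob = hasWaitingJob;
        }
    }

    public static void run(List<Queue> queues) {
        for (Queue low : queues) {
            // low queue (AC < GC) with a waiting job reclaims GC - AC slots
            if (low.ac < low.gc && low.hasWaitingJob) {
                int needed = low.gc - low.ac;
                for (Queue high : queues) {
                    if (needed == 0) break;
                    if (high.ac > high.gc) {
                        // never push a high queue's AC below its GC
                        int take = Math.min(needed, high.ac - high.gc);
                        high.ac -= take;
                        low.ac += take;
                        needed -= take;
                    }
                }
                // the JT would also start the reclaim timer here; on expiry
                // it asks a high queue with C-RUN > AC to kill tasks
            }
        }
    }
}
```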
> Within a queue, a user's limit can dynamically change depending on how many users have
submitted jobs. This needs to be handled in a way similar to how we handle excess capacity
between queues. 
> *When a TT has a free Map slot*:
> # TT contacts JT to give it a Map task to run. 
> # JT figures out which queue to approach first (among all queues that have capacity,
i.e., where C-RUN-M < AC-M). This can be done in a few ways:
>       ** Round-robin, so every queue/Org has the same chance to get a free container.

>       ** JT can pick the queue with the maximum unused capacity. 
> # JT needs to pick a job which can use the slot. 
>       ** If it has no running jobs from that queue, it gets one from the JQM. 
>          *** JT asks for the first Job in the selected queue, via the JQM. If the job's
user's limit is maxed out, the job is returned to the queue and JT asks for the next job.
This continues till the JT finds a suitable job. 
>          *** Or else, JT has a list of users in the queue whose jobs it is running, and
it can figure out which of these users are over their limit. It asks the JQM for the first
job in the queue whose user is not in a list of maxed-out users it provides. 
>       ** If the JT already has a list of running jobs from the queue, it looks at each
(in order of priority) till it finds one whose user's limit has not been exceeded.
>    # If there is no job in the queue that is eligible to run (the queue may have no queued
jobs), the JT picks another queue using the same steps. 
>    # The JT figures out which Map task from the job to run on the free TT using the same
algorithm as today (find a  locality match using the job's cache, then look for failed tasks
or tasks on the rack, etc). 
>    # JT increments C-RUN-M and the number of Map containers used by the job's user. It
then returns the task to the TT. 
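The free-Map-slot path above can be sketched as follows. This version picks the queue with the most unused map capacity (one of the two options listed) and then the first running job, in priority order, whose user is under the per-user limit; all class and field names are stand-ins for the JT's real state.

```java
import java.util.*;

// Illustrative sketch of queue and job selection for a free Map slot.
public class MapSlotScheduler {
    public static class Job {
        final String user;
        Job(String user) { this.user = user; }
    }
    public static class Queue {
        int acMaps;          // AC-M
        int runningMaps;     // C-RUN-M
        final List<Job> runningJobs = new ArrayList<>(); // in priority order
        final Map<String, Integer> mapsPerUser = new HashMap<>();
    }

    public static Job pickJob(List<Queue> queues, int perUserLimit) {
        // pick the queue with the most unused capacity (C-RUN-M < AC-M)
        Queue best = null;
        for (Queue q : queues) {
            if (q.runningMaps < q.acMaps &&
                (best == null ||
                 q.acMaps - q.runningMaps > best.acMaps - best.runningMaps)) {
                best = q;
            }
        }
        if (best == null) return null;
        // first job, in priority order, whose user is under the limit
        for (Job j : best.runningJobs) {
            if (best.mapsPerUser.getOrDefault(j.user, 0) < perUserLimit) {
                return j;
            }
        }
        return null; // in the full design the JT would try the next queue
    }
}
```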
> *When a TT has a free Reduce slot*: This is similar to what happens with a free Map slot,
except that: 
>    * we can use a different algorithm to decide which Reduce task to run from a given
job. I'm not sure what we do today for Reduce tasks (I think we just pick the first one),
but if it needs to be improved, that's a separate issue.
>    * Since there is no preemption of jobs based on priorities, we will not have the situation
where a job's Reducers are blocking containers as they're waiting for Maps to run and there
are no Map slots to run. 
> *When a task fails or completes*: JT decrements C-RUN and the # of containers used by
the user. 

This message is automatically generated by JIRA.
You can reply to this email to add a comment to the issue online.
