Return-Path: Delivered-To: apmail-hadoop-core-dev-archive@www.apache.org Received: (qmail 30752 invoked from network); 10 Sep 2008 13:31:39 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 10 Sep 2008 13:31:39 -0000 Received: (qmail 34624 invoked by uid 500); 10 Sep 2008 13:31:33 -0000 Delivered-To: apmail-hadoop-core-dev-archive@hadoop.apache.org Received: (qmail 34592 invoked by uid 500); 10 Sep 2008 13:31:33 -0000 Mailing-List: contact core-dev-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: core-dev@hadoop.apache.org Delivered-To: mailing list core-dev@hadoop.apache.org Received: (qmail 34581 invoked by uid 99); 10 Sep 2008 13:31:33 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Sep 2008 06:31:33 -0700 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.140] (HELO brutus.apache.org) (140.211.11.140) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Sep 2008 13:30:43 +0000 Received: from brutus (localhost [127.0.0.1]) by brutus.apache.org (Postfix) with ESMTP id 726F6234C1F3 for ; Wed, 10 Sep 2008 06:30:45 -0700 (PDT) Message-ID: <1571327164.1221053445467.JavaMail.jira@brutus> Date: Wed, 10 Sep 2008 06:30:45 -0700 (PDT) From: "Vivek Ratan (JIRA)" To: core-dev@hadoop.apache.org Subject: [jira] Commented: (HADOOP-3445) Implementing core scheduler functionality in Resource Manager (V1) for Hadoop In-Reply-To: <794657095.1211818916167.JavaMail.jira@brutus> MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-Virus-Checked: Checked by ClamAV on apache.org [ https://issues.apache.org/jira/browse/HADOOP-3445?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12629794#action_12629794 ] Vivek Ratan commented on HADOOP-3445: ------------------------------------- bq. You have a "new Integer(1)" that will likely cause a findbugs warning. It should either just use auto-boxing or use Integer.valueOf(1). I didn't get any findbugs warning because of this line. There have been no findbugs warnings introduced by this patch. bq. I think we should have a single configuration parameter that defines the queue capacity, rather than one for maps and one for reduces. You can configure different number of slots for Maps and Reduces per TT. It then stands to reason that the cluster capacity, and hence a queue capacity, can be different for maps and reduces. The ClusterStatus class keeps different values for max map and reduce tasks. bq. 1. The default values for queues should be configurable. Which values are you referring to? They're all configurable, as far as I know, unless I've missed something. bq. Capacities should be floats, not integers. Queue capacities are expressed as percentages of the grid capacities. Why would they be floats? Sanity checks are something we need to add more of. > Implementing core scheduler functionality in Resource Manager (V1) for Hadoop > ----------------------------------------------------------------------------- > > Key: HADOOP-3445 > URL: https://issues.apache.org/jira/browse/HADOOP-3445 > Project: Hadoop Core > Issue Type: New Feature > Reporter: Vivek Ratan > Assignee: Vivek Ratan > Attachments: 3445.1.patch, 3445.10.patch, 3445.2.patch, 3445.3.patch, 3445.4.patch, 3445.5.patch, 3445.6.patch, 3445.7.patch, 3445.8.patch, 3445.9.patch > > > The architecture for the Hadoop Resource Manager (V1) is described in HADOOP-3444. This Jira proposes implementation details on the core scheduling piece - the changes to the JT to handle Orgs, queues, guaranteed capacities, user limits, and ultimately, scheduling a task on a TT. > As per the architecture in HADOOP-3444, the JT contains a new component, Job Queue Manager (JQM), to handle queues of jobs. Each queue represents a queue in an Org (one queue per Org). Job queues are backed up by disk based storage. > We now look at some details. Terminology: > * A queue has *excess capacity* if it does not have enough jobs (queued or running) to take up its guaranteed capacity. Excess capacity needs to be distributed to queues that have none. > * Queues that have given up excess capacity to other queues are called *low queues*, for the sake of this discussion. Queues that are running on additional capacity are called *high queues*. > For each queue, the JT keeps track of the following: > * Guaranteed capacity (GC): the capacity guaranteed to the queue, set up through configuration. The sum of all GCs is equal to the grid capacity. Since we're handling Map and Reduce slots differently, we will have a GC for each, i.e., a CG-M for maps and a GC-R for reducers. The sum of all GC-Ms is equal to the sum of all map slots available in the Grid, and the same for GC-Rs. > * Allocated capacity (AC): the current capacity of the queue. This can be higher or lower than the GC because of excess capacity distribution. The sum of all ACs is equal to the grid capacity. As above, each queue will have a AC-M and AC-R. > * Timer for claiming containers: can just be the # of seconds the queue can wait till it needs its capacity back. There will be separate timers for claiming map and reduce slots (we will likely want to take more time to claim reduce slots, as reducers take longer to run). > * # of containers being used, i.e., the number of running tasks associated with the queue (C-RUN). Each queue will have a C-RUN-M and C-RUN-R. > * Whether any jobs are queued. > * The number of Map and Reduce containers used by each user. > Every once in a while (this can be done periodically, or based on events), the JT looks at redistributing capacity. This can result in excess capacity being given to queues that need them, and capacity being claimed by queues. > *Excess capacity is redistributed as follows*: > * The algorithm below is in terms of tasks, which can be map or reduce tasks. It is the same for both. The JT will run the algorithm to redistribute excess capacity for both Maps and Reduces. > * The JT checks each queue to see if it has excess capacity. A queue has excess capacity if the number of running tasks associated with the queue is less than the allocated capacity of the queue (i.e., if C-RUN < AC) and there are no jobs queued. > ** Note: a tighter definition is if C-RUN plus the number of tasks required by the waiting jobs is less than AC, but we don't need that level of detail. > * If there is at least one queue with excess capacity, the total excess capacity is the sum of excess capacities of each queue. The JT figures out the queues that this capacity can be distributed to. These are queues that need capacity, where C-RUN = AC (i.e., the queue is running at max capacity) and there are queued jobs. > * The JT now figures out how much excess capacity to distribute to each queue that needs it. This can be done in many ways. > ** Distribute capacity in the ratio of each Org's guaranteed capacity. So if queues Q1, Q2, and Q3 have guaranteed capacities of GC1, GC2, and GC3, and if Q3 has N containers of excess capacity, Q1 gets (GC1*N)/(GC1+GC2) additional capacity, while Q2 gets (GC2*N)/(GC1+GC2). > ** You could use some other ratio that uses the number of waiting jobs. The more waiting jobs a queue has, the more its share of excess capacity. > * For each queue that needs capacity, the JT increments its AC with the capacity it is allocated. At the same time, the JT appropriately decrements the AC of queues with excess capacity. > *Excess capacity is reclaimed as follows*: > * The algorithm below is in terms of tasks, which can be map or reduce tasks. It is the same for both. The JT will run the algorithm to reclaim excess capacity for both Maps and Reduces. > * The JT determines which queues are low queues (if AC < GC). If a low queue has a job waiting, then we need to reclaim its resources. Capacity to be reclaimed = GC-AC. > * Capacity is re-claimed from any of the high queues (where AC > GC). > * JT decrements the AC of the high queue from which capacity is to be claimed, and increments the AC of the low queue. The decremented AC of the high queue cannot go below its GC, so the low queue may get its capacity back from more than one queue. > * The JT also starts a timer for the low queue (this can be an actual timer, or just a count, perhaps representing seconds, which can be decremented by the JT periodically). > * If a timer goes off, the JT needs to instruct some high queue to kill some of their tasks. How do we decide which high queues to claim capacity from? > ** The candidates are those high queues which are running more tasks than they should be, i.e., where C-RUN > AC. > ** Among these queues, the JT can pick those that are using the most excess capacity (i.e. queues with a higher value for (C-RUN - AC)/AC ). > * How does a high queue decide which tasks to kill? > ** Ideally, you want to kill tasks that have started recently or made the least progress. You might want to use the same algorithm you use to decide which tasks to speculatively run (though that algorithm needs to be fixed). > ** Note: it is expensive to kill tasks, so we need to focus on getting better at deciding which tasks to kill. > Within a queue, a user's limit can dynamically change depending on how many users have submitted jobs. This needs to be handled in a way similar to how we handle excess capacity between queues. > *When a TT has a free Map slot*: > # TT contacts JT to give it a Map task to run. > # JT figures out which queue to approach first (among all queues that have capacity, i.e., where C-RUN-M < AC-M). This can be done in a few ways: > ** Round-robin, so every queue/Org has the same chance to get a free container. > ** JT can pick the queue with the maximum unused capacity. > # JT needs to pick a job which can use the slot. > ** If it has no running jobs from that queue, it gets one from the JQM. > *** JT asks for the first Job in the selected queue, via the JQM. If the job's user's limit is maxed out, the job is returned to the queue and JT asks for the next job. This continues till the JT finds a suitable job. > *** Or else, JT has a list of users in the queue whose jobs it is running, and it can figure out which of these users are over their limit. It asks the JQM for the first job in the queue whose user is not in a list of maxed-out users it provides. > ** If the JT already has a list of running jobs from the queue, it looks at each (in order of priority) till it finds one whose user's limit has not been exceeded. > # If there is no job in the queue that is eligible to run (the queue may have no queued jobs), the JT picks another queue using the same steps. > # The JT figures out which Map task from the job to run on the free TT using the same algorithm as today (find a locality match using the job's cache, then look for failed tasks or tasks on the rack, etc). > # JT increments C-RUN-M and the number of Map containers used by the job's user. It then returns the task to the TT. > *When a TT has a free Reduce slot*: This is similar to what happens with a free Map slot, except that: > * we can use a different algorithm to decide which Reduce task to run from a give job. I'm not sure what we do today for Reduce tasks (I think we just pick the first one), but if it needs to be improved, that's a separate issue. > * Since there is no preemption of jobs based on priorities, we will not have the situation where a job's Reducers are blocking containers as they're waiting for Maps to run and there are no Map slots to run. > *When a task fails or completes*: JT decrements C-RUN and the # of containers used by the user. -- This message is automatically generated by JIRA. - You can reply to this email to add a comment to the issue online.