hadoop-common-dev mailing list archives

From "Pete Wyckoff (JIRA)" <j...@apache.org>
Subject [jira] Commented: (HADOOP-2510) Map-Reduce 2.0
Date Tue, 20 May 2008 20:26:55 GMT

    [ https://issues.apache.org/jira/browse/HADOOP-2510?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12598451#action_12598451 ]

Pete Wyckoff commented on HADOOP-2510:
--------------------------------------

Hadoop scheduling woes:

IMHO: A big part of the problem is that the Map/Reduce framework and speculative execution
were never intended to run long tasks. With short tasks, speculative execution works stunningly
well (even without heartbeats, since who cares if a task fails; the original designers of
speculative execution never used heartbeats or cared if a machine failed) AND any task can
be pre-empted, and no task blocks other tasks or holds lots of disk space on the task tracker
(i.e. a long-running reduce holding map slots).

I assume I'm preaching to the choir, but perhaps not about how we can make reduce tasks small:

1. Sorting and shuffling are cheaper with "bigger" reduces? Yes, but the tradeoff is not
worth it (in most? cases), especially for reduces that take an hour or more. Does anyone know
the actual cost, on a real cluster, of reduce size versus sort/shuffle time?

2. One or two reduces get the majority of the keys? (a) Make the number of reduces a prime,
since the reduce is picked by hashing the key modulo the number of reduces; (b) improve the
hashCode implementation, as it now uses String.hashCode, which is weak.

3. I want one file as my output? First, I don't understand why (nor have I met anyone who
does :)), but even so, there are a million ways around this, e.g., a cat command that reads
each part file and concatenates them in order. Or, if these are big files, augment HDFS to
handle differently sized blocks in one file and then create a primitive to fold N files into
one (in sorted order :)).

4. My algorithm requires all the keys to go to one reduce? Excellent. Then your reduce is too
long to use speculative execution, and for its entire duration you're holding the map slots
and the map output data on the task trackers. I would propose that a better model for this is
to have __0__ reduces (and replication 1 for the output files) and then run the reduce (rack
aware) using something like Torque; and since it's long running, run 2 copies, as that's
really the only way to mask a slow machine or a failure for such a task.

(a) If tasks are (relatively) short, speculative execution works! And (b), best of all for
this JIRA, any task can be pre-empted. So one can be very aggressive about scheduling in
a multi-user environment, as there's near-zero cost to pre-empting any task to run a
higher-priority job.

Designing a scheduling algorithm that handles long-running tasks is really, really tough,
and why bother when these types of tasks don't really fit the framework?

My 2 cents.



> Map-Reduce 2.0
> --------------
>
>                 Key: HADOOP-2510
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2510
>             Project: Hadoop Core
>          Issue Type: Improvement
>          Components: mapred
>            Reporter: Arun C Murthy
>
> We, at Yahoo!, have been using Hadoop-On-Demand as the resource provisioning/scheduling
mechanism. 
> With HoD, the user uses a self-service system to ask for a set of nodes. HoD allocates
these from a global pool and also provisions a private Map-Reduce cluster for the user. She
then runs her jobs and shuts the cluster down via HoD when done. All user-private clusters
use the same humongous, static HDFS (e.g. a 2k-node HDFS).
> More details about HoD are available here: HADOOP-1301.
> ----
> h3. Motivation
> The current deployment (Hadoop + HoD) has a couple of implications:
>  * _Non-optimal Cluster Utilization_
>    1. Job-private Map-Reduce clusters imply that the user cluster could potentially be
*idle* for at least a while before it is detected and shut down.
>    2. Elastic Jobs: Map-Reduce jobs typically have lots of maps and a much smaller number
of reduces, with maps being light and quick and reduces being I/O-heavy and longer-running.
Users typically size clusters by the number of maps (i.e. input size), which leads to the
scenario where all the maps are done (leaving idle nodes in the cluster) while the few reduces
are still chugging along. Right now, we cannot shrink HoD'ed Map-Reduce clusters, which would
alleviate this issue.
>  * _Impact on data-locality_
> With the current setup of a static, large HDFS and much smaller (5/10/20/50-node) clusters,
there is a good chance of losing one of Map-Reduce's primary features: the ability to execute
tasks on the DataNodes where the input splits are located. In fact, we have seen data-local
tasks drop to 20-25 percent in the GridMix benchmarks, from the 95-98 percent we see on
the randomwriter+sort runs that are part of the hadoopqa benchmarks (admittedly a synthetic
benchmark, but still). That said, HADOOP-1985 (rack-aware Map-Reduce) helps significantly here.
> ----
> Primarily, the notion of *job-level scheduling* leading to private clusters, as opposed
to *task-level scheduling*, is a good peg on which to hang the majority of the blame.
> Keeping the above factors in mind, here are some thoughts on how to re-structure Hadoop
Map-Reduce to solve some of these issues.
> ----
> h3. State of the Art
> A large, static Hadoop Map-Reduce cluster (forget HoD for a bit) does provide task-level
scheduling; however, as it exists today, its scalability to tens of thousands of user jobs
per week is in question.
> Let's review its current architecture and main components:
>  * JobTracker: It does both *task-scheduling* and *task-monitoring* (TaskTrackers send
task statuses via periodic heartbeats), which means it is fairly heavily loaded. It is also a
_single point of failure_ in the Map-Reduce framework, i.e. its failure causes all the jobs in
the system to fail. This makes a static, large Map-Reduce cluster fairly fragile, and the
JobTracker a definite suspect. HoD solves this by having per-job clusters, albeit with the
above drawbacks.
>  * TaskTracker: The slave in the system, which executes one task at a time under direction
from the JobTracker.
>  * JobClient: The per-job client which just submits the job and polls the JobTracker
for status. 
> ----
> h3. Proposal - Map-Reduce 2.0 
> The primary idea is to move to task-level scheduling and static Map-Reduce clusters (so
as to maintain the same storage cluster and compute cluster paradigm) as a way to directly
tackle the two main issues illustrated above. Clearly, we will have to get around the existing
problems, especially w.r.t. scalability and reliability.
> The proposal is to re-work Hadoop Map-Reduce to make it suitable for a large, static
cluster. 
> Here is an overview of what its main components would look like:
>  * JobTracker: Turn the JobTracker into a pure task-scheduler, a global one. Let's call
this the *JobScheduler* henceforth. (Data-locality-aware) Maui/Moab are clear candidates
for the scheduler, in which case the JobScheduler is just a thin wrapper around them.

>  * TaskTracker: These stay as before, with some minor changes as illustrated later
in the piece.
>  * JobClient: Fatten up the JobClient by putting a lot more intelligence into it. Enhance
it to talk to the JobTracker to ask for available TaskTrackers and then contact them to schedule
and monitor the tasks. So we'll have lots of per-job clients talking to the JobScheduler and
the relevant TaskTrackers for their respective jobs, a big change from today. Let's call this
the *JobManager* henceforth.
> A broad sketch of how things would work: 
> h4. Deployment
> There is a single, static, large Map-Reduce cluster, and no per-job clusters.
> Essentially there is one global JobScheduler with thousands of independent TaskTrackers,
each running on one node.
> As mentioned previously, the JobScheduler is a pure task-scheduler. When contacted by
per-job JobManagers querying for TaskTrackers to run their tasks on, the JobScheduler takes
into account the job priority, data placement (HDFS blocks) and current load/capacity of
the TaskTrackers, and gives the JobManager free slots for the task(s) in question, if available.
> Each TaskTracker periodically updates the master JobScheduler with information about
its currently running tasks and available free slots. It waits for the per-job JobManagers
to contact it to use free slots (which abide by the JobScheduler's directives) and to query
the status of currently running tasks (of course, each JobManager knows exactly which
TaskTrackers it needs to talk to).
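The slot-granting step of the JobScheduler can be sketched as follows, assuming a hypothetical in-memory view built from TaskTracker heartbeats. Requests are served in priority order, and each request is filled first from TaskTrackers that host the job's HDFS blocks, then from any TaskTracker with spare capacity. `SlotScheduler`, `Request`, `heartbeat` and `grant` are made-up names for illustration; a real scheduler (for instance a thin wrapper around Maui/Moab, as suggested above) would also weigh current load, fairness and rack locality.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class SlotScheduler {
    // Hypothetical view of one TaskTracker, refreshed by its periodic heartbeat.
    static final class Tracker {
        final String host;
        int freeSlots;

        Tracker(String host, int freeSlots) {
            this.host = host;
            this.freeSlots = freeSlots;
        }
    }

    // Hypothetical slot request sent by a per-job JobManager.
    static final class Request {
        final String jobId;
        final int priority;           // higher value = served first
        final Set<String> blockHosts; // hosts holding HDFS blocks of the job's pending splits
        final int slotsWanted;

        Request(String jobId, int priority, Set<String> blockHosts, int slotsWanted) {
            this.jobId = jobId;
            this.priority = priority;
            this.blockHosts = blockHosts;
            this.slotsWanted = slotsWanted;
        }
    }

    private final Map<String, Tracker> trackers = new LinkedHashMap<>();

    // A heartbeat replaces whatever the scheduler previously knew about that host.
    void heartbeat(String host, int freeSlots) {
        trackers.put(host, new Tracker(host, freeSlots));
    }

    // Returns jobId -> granted hosts (one entry per slot); a request may get fewer slots than it asked for.
    Map<String, List<String>> grant(List<Request> requests) {
        List<Request> ordered = new ArrayList<>(requests);
        ordered.sort(Comparator.comparingInt((Request r) -> r.priority).reversed());
        Map<String, List<String>> grants = new LinkedHashMap<>();
        for (Request r : ordered) {
            List<String> hosts = new ArrayList<>();
            // Pass 0 uses only data-local trackers; pass 1 falls back to any tracker with capacity.
            for (int pass = 0; pass < 2 && hosts.size() < r.slotsWanted; pass++) {
                boolean wantLocal = (pass == 0);
                for (Tracker t : trackers.values()) {
                    if (r.blockHosts.contains(t.host) != wantLocal) continue;
                    while (t.freeSlots > 0 && hosts.size() < r.slotsWanted) {
                        t.freeSlots--;
                        hosts.add(t.host);
                    }
                }
            }
            grants.put(r.jobId, hosts);
        }
        return grants;
    }
}
```

`grant` decrements the cached free-slot counts, so a second call before the next heartbeat sees the depleted view; treating heartbeats as the source of truth keeps the scheduler itself stateless beyond that cache, which fits the "very lightweight" goal, but it is a simplification.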
> The key differentiator is that the JobScheduler no longer does the heavy lifting of
monitoring tasks, and hence jobs (as the current JobTracker does), which is why it should be
very lightweight. (Thus it is even conceivable to have a hot backup of the JobScheduler;
a topic for another discussion.)
> h4. Job Execution
> Here is what the job-execution workflow looks like:
>     * The user submits a job.
>     * The JobClient, as today, validates inputs, computes the input splits, etc.
>     * Rather than submit the job to the JobTracker, which then runs it, the JobClient
now dons the role of the JobManager as described above (of course, they could be two
independent processes working in conjunction with each other...). The JobManager proactively
works with the JobScheduler and the TaskTrackers to execute the job. While there are more tasks
to run for the still-running job, it contacts the JobScheduler to get 'n' free slots and
schedules m tasks (m <= n) on the given TaskTrackers (slots). The JobManager also monitors the
tasks by contacting the relevant TaskTrackers (it knows which of the TaskTrackers are running
its tasks).
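A minimal sketch of the JobManager loop described above, assuming two hypothetical interfaces: `SlotSource` stands in for the JobScheduler's slot-query RPC and `TaskTrackerClient` for the new JobManager-to-TaskTracker RPCs. Each round asks for at most as many slots as there are pending tasks, launches m <= n tasks on the granted slots, then polls only the TaskTrackers running this job's tasks. Task failures, retries and backoff between rounds are deliberately left out; `maxRounds` only guards the sketch against spinning forever when no slots are granted.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the JobScheduler RPC: returns up to n TaskTracker hosts with a free slot.
interface SlotSource {
    List<String> requestSlots(String jobId, int n);
}

// Hypothetical stand-in for the new JobManager -> TaskTracker RPCs.
interface TaskTrackerClient {
    void launch(String host, String taskId);
    boolean isDone(String host, String taskId);
}

class JobManager {
    private final String jobId;
    private final Deque<String> pending;                               // tasks not yet launched
    private final Map<String, String> running = new LinkedHashMap<>(); // taskId -> host
    private final List<String> completed = new ArrayList<>();

    JobManager(String jobId, List<String> taskIds) {
        this.jobId = jobId;
        this.pending = new ArrayDeque<>(taskIds);
    }

    // Runs the job to completion and returns the number of scheduling rounds it took.
    int run(SlotSource scheduler, TaskTrackerClient trackers, int batch, int maxRounds) {
        if (batch < 1) throw new IllegalArgumentException("batch must be >= 1");
        int rounds = 0;
        while (!pending.isEmpty() || !running.isEmpty()) {
            if (++rounds > maxRounds) {
                throw new IllegalStateException("job " + jobId + " did not finish in " + maxRounds + " rounds");
            }
            if (!pending.isEmpty()) {
                // Ask for n slots; the JobScheduler may grant fewer (or none).
                int n = Math.min(batch, pending.size());
                List<String> slots = scheduler.requestSlots(jobId, n);
                // Launch m <= n tasks, at most one per granted slot.
                int m = Math.min(n, slots.size());
                for (int i = 0; i < m; i++) {
                    String task = pending.poll();
                    String host = slots.get(i);
                    trackers.launch(host, task);
                    running.put(task, host);
                }
            }
            // Monitor: contact only the TaskTrackers that run this job's tasks.
            Iterator<Map.Entry<String, String>> it = running.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<String, String> e = it.next();
                if (trackers.isDone(e.getValue(), e.getKey())) {
                    completed.add(e.getKey());
                    it.remove();
                }
            }
        }
        return rounds;
    }

    List<String> completed() {
        return completed;
    }
}
```

Because the JobManager holds all per-job state (`pending`, `running`, `completed`), losing it affects only this job, which is the isolation property the proposal is after; the JobScheduler only ever sees slot requests.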
> h4. Brownie Points
>  * With Map-Reduce v2.0, we get the reliability/scalability of the current (Map-Reduce +
HoD) architecture.
>  * We get elastic jobs for free, since there is no concept of private clusters, and
JobManagers do not need to hold on to the map nodes once the maps are done.
>  * We do get data-locality across all jobs, big or small, since, unlike today, there are no
off-limits DataNodes (i.e. DataNodes outside the private cluster) for a Map-Reduce cluster.
>  * From an architectural standpoint, each component in the system (sans the global scheduler)
is nicely independent of, and impervious to, the others:
>   ** A JobManager is responsible for one and only one job; the loss of a JobManager affects
only that job.
>   ** A TaskTracker manages only one node; its loss affects only that node in the cluster.

>   ** No user-code runs in the JobScheduler since it's a pure scheduler.
>  * We can run all of the user-code (input/output formats, split calculation, task-output
promotion etc.) from the JobManager since it is, by definition, the user-client. 
> h4. Points to Ponder
>  * Given that the JobScheduler is very lightweight, could we have a hot backup for
HA?
>  * Discuss the notion of a rack-level aggregator of TaskTracker statuses, i.e. rather
than have every TaskTracker update the JobScheduler, could a rack-level aggregator achieve
the same?
>  * We could have the notion of a JobManager being the proxy process running inside the
cluster for the JobClient (the job-submitting program running outside the colo, e.g. on the
user's dev box)... in fact, we can think of the JobManager as *another kind of task*, which
needs to be scheduled to run at a TaskTracker.
>  * Task isolation via separate VMs (VMware/Xen) rather than just separate JVMs?
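The rack-level aggregator idea can be sketched as a small per-rack component that keeps the latest free-slot report from each TaskTracker in its rack and hands the JobScheduler one combined snapshot, so the JobScheduler handles one message per rack rather than one per TaskTracker. `RackAggregator`, `report`, `totalFreeSlots` and `snapshot` are hypothetical names; how often snapshots are forwarded, and what happens when an aggregator itself fails, are open questions this sketch does not address.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

class RackAggregator {
    private final String rack;
    // Latest free-slot count reported by each TaskTracker host in this rack.
    private final Map<String, Integer> freeSlotsByHost = new TreeMap<>();

    RackAggregator(String rack) {
        this.rack = rack;
    }

    // Called by a TaskTracker in place of a direct heartbeat to the JobScheduler;
    // a newer report from the same host replaces the older one.
    synchronized void report(String host, int freeSlots) {
        if (freeSlots < 0) throw new IllegalArgumentException("freeSlots must be >= 0");
        freeSlotsByHost.put(host, freeSlots);
    }

    // Total free slots across the rack, as the JobScheduler would see it.
    synchronized int totalFreeSlots() {
        int total = 0;
        for (int slots : freeSlotsByHost.values()) total += slots;
        return total;
    }

    // One immutable snapshot per forwarding interval: host -> free slots.
    synchronized Map<String, Integer> snapshot() {
        return Collections.unmodifiableMap(new TreeMap<>(freeSlotsByHost));
    }

    String rack() {
        return rack;
    }
}
```

Keeping per-host detail in the snapshot (rather than only the rack total) preserves what the JobScheduler needs for data-local placement, at the cost of a larger message per rack.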
> h4. How do we get to Map-Reduce 2.0?
> At the risk of sounding hopelessly optimistic, we probably do not have to work too much
to get here.
>  * Clearly the main changes come in the JobTracker/JobClient where we _move_ the pieces
which monitor the job's tasks' progress into the JobScheduler/JobManager.
>  * We also need to enhance the JobClient (as the JobManager) to get it to talk to the
JobTracker (JobScheduler) to query for the empty slots, which might not be available!
>  * Then we need to add RPCs to get the JobClient (JobManager) to talk to the given TaskTrackers
to get them to run the tasks, thus reversing the direction of the current RPCs needed to start
a task (currently the TaskTracker asks the JobTracker for tasks to run); we also need new RPCs
for the JobClient (JobManager) to talk to the TaskTracker to query its tasks' statuses.
>  * We leave the current heartbeat mechanism from the TaskTracker to the JobTracker (JobScheduler)
as-is, sans the task-statuses. 
> h4. Glossary
>  * JobScheduler - The global, task-scheduler which is today's JobTracker minus the code
for tracking/monitoring jobs and their tasks. A pure scheduler.
>  * JobManager - The per-job manager, which is wholly responsible for working with the
JobScheduler and TaskTrackers to schedule its tasks and track their progress until job completion
(success/failure). Simplistically, it is the current JobClient plus the enhancements that enable
it to talk to the JobScheduler and TaskTrackers for running/monitoring the tasks.
> ----
> h3. Tickets for the Gravy-Train ride
> Eric has started a discussion at HADOOP-2491 about generalizing Hadoop to support non-MR
tasks, a topic which has surfaced a few times on our lists.
> He notes:
> {quote}
> Our primary goal in going this way would be to get better utilization out of map-reduce
clusters and support a richer scheduling model. The ability to support alternative job frameworks
would just be gravy!
> Putting this in as a place holder. Hope to get folks talking about this to post some
more detail.
> {quote}
> This is the start of the path to the promised gravy-land. *smile*
> We believe Map-Reduce 2.0 is a good start toward moving most (if not all) of the
Map-Reduce-specific code into the user-clients (i.e. the JobManager) and taking a shot at
generalizing the JobTracker (as the JobScheduler) and the TaskTracker to handle more generic
tasks via different (smarter/dumber) user-clients.
> ----
> Thoughts?

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

