hadoop-common-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ricky Ho <...@adobe.com>
Subject More Hadoop Design Question
Date Thu, 06 Nov 2008 20:30:12 GMT
Bryan and Owen, thanks for your prompted response and comment.

Hmmm, sounds like the combiner is invoked after the map() process completed for the file split.
 In other words, the combiner will not started before all the map() within that file split
is completed (for the same reason that the combiner result won't be accurate otherwise). 
Is this correct ?

That means, before the combiner function starts, all the intermediate map() output result
will be kept in memory ?  Any comment on the memory footprint consumption ?

Regarding your comment on the final "reduce" result will be incorrect if we let the map tasks
and reduce tasks running concurrently.  I think a sufficient condition is just to make sure
the reduce task will not COMPLETE before all the map tasks has completed.  We don't need to
make sure the reduce task will not START before all maps tasks has completed.  This can be
achieved easily by letting the iterator.next() call within the reduce() method blocked.

Look at a typical reduce method ...

class MyReducer implements Reducer {
        void reduce(Text key, Iterator values, OutputCollector output, ...) {
                Object accum = new Result();
                while (values.hasNext()) {
                        Object value = values.next().get();
                        accum = accumulate(accum, value);
                output.collect(key, accum);

Think about the reduce() method is called much earlier, while the map tasks still working.
 When reaching the line "values.next()", the reduce task will be blocked if there is no map
result available (ie: when the map tasks doesn't feed him fast enough).  The final result
is still accurate because the reduce() method will not return before all the map tasks has
completed.  This way we can have the reduce() task overlapping with map() task to shorten
the overall job duration.  Right ?

There is another potential issue in the reduce() API, can you explain why do we need to expose
the OutputCollector to the reduce() method ?  For example, is it possible that the "key" in
the output.collect() be a different key from the reduce method parameter ?  What happen if
two reduce method (start with different keys) writing their output on the same key ?

You may notice that in my earlier proposed blocked call in values.next(), having a lot of
blocking threads (one thread per key) is not a good idea.  Instead of blocking within the
reduce() method, it is much better to have Hadoop framework to invoke the reduce method after
some (but not all) map results are available.

However, this requires some change of the current Reducer interface.  Currently the reduce()
method is called once per key.  We want that to be called once per map result (within the
same key).  What I mean is the following interface ...

public interface Reducer {
        Writable init();
        Writable reduce(Object key, Writable value, Writable accum);

Now implement a WordCountReducer will look like the following ...

public class WordCountReducer implements Reducer {
        public Writable init() {
                return new IntWritable(0);

        public Writable reduce(Object key, IntWritable value, IntWritable accum) {
                return new IntWritable(accum.get() + value.get());

Summarizing the difference of this interface (from the original one)

1) The reduce() method called per map output, rather than called per key.  Therefore, the
Hadoop framework doesn't need to wait for the whole map process to complete before starting
the reduce, performance will be improved by overlapping the map and reduce execution.

2) OutputCollector is not exposed to the reduce() method.  Hadoop framework will write out
the final reduced result using the same key.

Does it make sense ?


-----Original Message-----
From: Bryan Duxbury [mailto:bryan@rapleaf.com]
Sent: Thursday, November 06, 2008 10:36 AM
To: core-user@hadoop.apache.org
Subject: Re: Hadoop Design Question

Comments inline.

On Nov 6, 2008, at 9:29 AM, Ricky Ho wrote:

> Hi,
> While exploring how Hadoop fits in our usage scenarios, there are 4
> recurring issues keep popping up.  I don't know if they are real
> issues or just our misunderstanding of Hadoop.  Can any expert shed
> some light here ?
> Disk I/O overhead
> ==================
> - The output of a Map task is written to a local disk and then
> later on upload to the Reduce task.  While this enable a simple
> recovery strategy when the map task failed, it incur additional
> disk I/O overhead.
> - For example, in our popular Hadoop example of calculating the
> approximation of "Pi", there isn't any input data.  The map tasks
> in this example, should just directly feed its output to the reduce
> task.  So I am wondering if there is an option to bypassing the
> step of writing the map result to the local disk.

In most data-intensive map/reduce jobs, you have to spill your map
output to disk at some point because you will run out of memory
otherwise. Additionally, Pi calculation is a really bad example,
because you could always start "reducing" any pairs together
arbitrarily. This is because pi calculation is commutative and
associative. We have a special construct for situations like that
called a "combiner", which is basically a map-side reducer.

> Pipelining between Map & Reduce phases is not possible
> =======================================================
> - In the current setting, it sounds like no reduce task will be
> started before all map tasks have completed.  In case if there are
> a few slow running map tasks, the whole job will be delayed.
> - The overall job execution can be shortened if the reduce tasks
> can starts its processing as soon as some map results are available
> rather than waiting for all the map tasks to complete.

You can't start reducing until all map tasks are complete because
until all map tasks complete, you can't do an accurate sort of all
intermediate key/value pairs. That is, if you just started reducing
the results of a single map task immediately, you might have other
values for some keys that come from different map tasks, and your
reduce would be inaccurate. In theory if you know that each map task
produces keys only in a certain range, you could start reducing
immediately after the map task finishes, but that seems like an
unlikely case.

> Pipelining between jobs
> ========================
> - In many cases, we've found the parallel computation doesn't
> involve just one single map/reduce job, but multiple inter-
> dependent map/reduce jobs then work together in some coordinating
> fashion.
> - Again, I haven't seen any mechanism available for 2 MapReduce
> jobs to directly interact with each other.  Job1 must write its
> output to HDFS for Job2 to pickup. On the other hand, once the
> "map" phase of a Job2 has started, all its input HDFS files has to
> be freezed (in other words, Job1 cannot append more records into
> the HDFS files)
> - Therefore it is impossible for the reduce phase of Job1 to stream
> its output data to a file while the map phase of Job2 start reading
> the same file.  Job2 can only start after ALL REDUCE TASKS of Job1
> is completed, which makes pipelining between jobs impossible.

Certainly, many transformations take more than one map/reduce job.
However, very few could actually be pipelined such that the output of
one fed directly into another without an intermediate stop in a file.
If the first job does any grouping or sorting, then the reduce is
necessary and it will have to write out to a file before anything
else can go on. If the second job also does grouping or sorting, then
you definitely need two jobs. If the second job doesn't do grouping
or sorting, then it can probably be collapsed into either the map or
reduce of the first job.

> No parallelism of reduce task with one key
> ===========================================
> - Parallelism only happens in the map phase, as well as reduce
> phase (on different keys).  But there is no parallelism within a
> reduce process of a particular key
> - This means the partitioning function has to be chosen carefully
> to make sure the workload of the reduce processes is balanced.
> (maybe not a big deal)
> - Is there any thoughts of running a pool of reduce tasks on the
> same key and have they combine their results later ?

I think you will find very few situations where you have only one key
on reduce. If you do, it's probably a scenario where you can use a
combiner and eliminate the problem. Basically all map/reduce jobs
I've worked on have a large number of keys going into the reduce phase.

> Rgds, Ricky

View raw message