hadoop-common-user mailing list archives

From Vuk Ercegovac <verc...@us.ibm.com>
Subject Re: Poly-reduce?
Date Sat, 27 Oct 2007 00:21:58 GMT
We found Map/Reduce job composition to be useful when intermediate result
sizes were comparable to or larger than the size of their input.

We implemented a prototype where intermediate Map/Reduce steps are composed
into a single task. Pictorially, we transform a sequence of Map and Reduce
jobs as follows:
    M -> R, M -> R, M -> R      // default behavior
    M -> R|M -> R|M -> R        // with composition

Here, “R|M” denotes the composition of a Reduce (R) with a subsequent Map
(M), allowing the output of R to be directly consumed by M. In our
prototype, the programmer provides the same Map and Reduce classes as
before, as well as the sequence of Map/Reduce jobs to run. Then, under the
covers, the system takes care of composing the intermediate Map and Reduce
steps.
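
As a rough illustration of what “R|M” means, here is a minimal in-memory
sketch in Python. It is not the prototype's code and does not use any Hadoop
API: the helper names (run_map, shuffle, run_reduce, compose_reduce_map) and
the two toy jobs are assumptions made up for this example. The first job
counts words and the second counts how many words share each count; the
middle step runs the first Reduce and the second Map as one function, so the
intermediate records are never materialized between them.

```python
from collections import defaultdict
from itertools import chain


def run_map(map_fn, records):
    """Apply map_fn to each (key, value) record and flatten the emitted pairs."""
    return list(chain.from_iterable(map_fn(k, v) for k, v in records))


def shuffle(pairs):
    """Group values by key; a stand-in for the sort/shuffle between M and R."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())


def run_reduce(reduce_fn, grouped):
    """Apply reduce_fn to each (key, values) group and flatten the output."""
    return list(chain.from_iterable(reduce_fn(k, vs) for k, vs in grouped))


def compose_reduce_map(reduce_fn, map_fn):
    """Build an R|M step: every record the reducer emits goes straight to map_fn."""
    def reduce_then_map(key, values):
        for out_key, out_value in reduce_fn(key, values):
            yield from map_fn(out_key, out_value)
    return reduce_then_map


# Hypothetical job 1: word count.
def word_map(_offset, line):
    for word in line.split():
        yield word, 1


def sum_reduce(key, values):
    yield key, sum(values)


# Hypothetical job 2: how many words occur with each count.
def count_map(_word, count):
    yield count, 1


lines = [(0, "a b a"), (1, "b c")]

# M -> R|M -> R
grouped_words = shuffle(run_map(word_map, lines))
grouped_counts = shuffle(
    run_reduce(compose_reduce_map(sum_reduce, count_map), grouped_words)
)
histogram = run_reduce(sum_reduce, grouped_counts)
print(histogram)  # [(1, 1), (2, 2)]
```

The Map and Reduce functions are the same ones a two-job version would use;
only the wiring of the middle step changes.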

To illustrate how this can improve performance, denote a read from and a
write to HDFS as “rh” and “wh”, respectively, and denote a read from and a
write to the local file system as “rl” and “wl”. Finally, denote a network
transfer by “n”. Then, in terms of disk and network I/O, the two cases
above can be described in more detail as:
    M(rh, wl, n) -> R(wl, rl, wh), M(rh, wl, n) -> R(wl, rl, wh)    // default behavior
    M(rh, wl, n) -> R|M(wl, rl, wl, n) -> R(wl, rl, wh)             // with composition
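
The notation above can be written down as data to check which operations
touch HDFS. The following Python sketch is only a bookkeeping aid under the
I/O model stated in this message; the function names are hypothetical and
nothing here measures a real cluster.

```python
# I/O operations per task type, copied from the notation in the text.
MAP = ("rh", "wl", "n")
REDUCE = ("wl", "rl", "wh")
REDUCE_MAP = ("wl", "rl", "wl", "n")  # composed R|M task


def default_plan(num_jobs):
    """Each job is a separate Map followed by a separate Reduce."""
    return [task for _ in range(num_jobs) for task in (("M", MAP), ("R", REDUCE))]


def composed_plan(num_jobs):
    """First Map, then one R|M per job boundary, then the final Reduce."""
    return [("M", MAP)] + [("R|M", REDUCE_MAP)] * (num_jobs - 1) + [("R", REDUCE)]


def hdfs_ops(plan):
    """Count the HDFS reads and writes ("rh" and "wh") in a plan."""
    return sum(op in ("rh", "wh") for _, ops in plan for op in ops)


def describe(plan):
    """Render a plan in the M(...) -> R(...) notation."""
    return " -> ".join(f"{name}({', '.join(ops)})" for name, ops in plan)


print(describe(composed_plan(2)))
print(hdfs_ops(default_plan(2)), hdfs_ops(composed_plan(2)))  # 4 2
```

With two jobs the default plan performs four HDFS operations and the
composed plan two, that is, one HDFS read and one HDFS write fewer per R|M
task.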

Comparing the two cases, there are two key points to notice: 1) Each
composed R|M task eliminates an HDFS read and write, 2) If a node with an
R|M task fails before its output is consumed, the whole task must be
re-evaluated, since data is not written to HDFS. This re-evaluation can
potentially cascade back to a Map task from the first step but it does not
necessarily require that the whole job be re-evaluated.

To compare the two cases analytically, assume for the sake of simplicity
that the size of the input data is equal to the size of intermediate and
output data. Let a “pass” over the data denote reading or writing the data
to disk or transferring the data over the network. Then with N Map/Reduce
jobs, we have:
    (N jobs * 2 tasks/job * 3 passes/task) = 6*N passes    // default
    3 + 4*(N-1) + 3 passes                                 // with composition

Comparing the two cases, we have 6 + 6*(N-1) passes versus 6 + 4*(N-1)
passes, which is a savings of 2 of the 6 passes, or 33%, for each job after
the first with composition.
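
To see how the overall savings depends on N, the two pass counts can be
evaluated directly. This Python sketch only restates the arithmetic above
under the same equal-data-size assumption; the function names are made up
for illustration.

```python
def default_passes(num_jobs):
    """N jobs * 2 tasks/job * 3 passes/task."""
    return num_jobs * 2 * 3


def composed_passes(num_jobs):
    """3 for the first Map, 4 per R|M task, 3 for the final Reduce."""
    return 3 + 4 * (num_jobs - 1) + 3


def savings(num_jobs):
    """Fraction of passes avoided by composition."""
    return 1 - composed_passes(num_jobs) / default_passes(num_jobs)


for n in (1, 2, 3, 5, 10):
    print(
        f"N={n:2d}  default={default_passes(n):3d}  "
        f"composed={composed_passes(n):3d}  savings={savings(n):.1%}"
    )
```

Under this model a single job saves nothing, three jobs save 4 of 18 passes
(about 22%), and the overall fraction approaches one third as N grows.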

With our prototype running on a 10-node cluster, we achieved a 15 to 20%
speedup on a workload with 3- to 5-way joins using Map/Reduce composition.
Note that we used a replication factor of 1 when writing to HDFS in both
cases. While we have not yet explored scheduling and other optimization
issues, knowing that multiple jobs are related may lead to further
improvements in performance.

Our prototype did add complexity to the JobTracker, since now a sequence of
Map/Reduce jobs must be configured and tracked. This makes job.xml more
complicated. Also, as noted earlier, an intermediate R|M step writes its
output to the local file system instead of HDFS. So if its node fails, the
output on that node is lost. But if the output of R|M steps is written to
HDFS instead, then we can still avoid 2 passes over the data, while
benefiting from the fault tolerance that HDFS provides to avoid cascading
re-evaluation. We did not implement this, however.

To summarize, Map/Reduce composition can improve the performance of a
sequence of Map/Reduce jobs by eliminating disk I/O for intermediate
results. However, it comes with increased system complexity and possibly
more re-evaluations in the face of node failures. We would be interested in
getting more feedback on whether people think some of the ideas regarding
Map/Reduce composition are worth considering in Hadoop.



Milind Bhandarkar wrote on 08/29/2007 11:10 (Subject: Re: Poly-reduce?):

I agree with Owen and Doug. As long as the intermediate outputs (i.e., data
in between phases) are stored on tasktrackers' local disks, prone to
failure, having more than two phases will be counterproductive. If
intermediate data storage were on a fault-tolerant DFS, one would see more
benefits of chaining an arbitrary sequence of phases. (But then the reasoning
in the original email for having multiple phases, i.e., not having to upload
data to DFS, would no longer be valid.)

- milind

On 8/24/07 9:53 AM, "Doug Cutting" <cutting@apache.org> wrote:

> Ted Dunning wrote:
>> It isn't hard to implement these programs as multiple fully fledged
>> map-reduces, but it appears to me that many of them would be better
>> expressed as something more like a map-reduce-reduce program.
>> [ ... ]
>> Expressed conventionally, this would have to write all of the user sessions
>> to HDFS and a second map phase would generate the pairs for counting.  The
>> opportunity for efficiency would come from the ability to avoid writing
>> intermediate results to the distributed data store.
>> Has anybody looked at whether this would help and whether it would be
>> to do?
> It would make the job tracker more complicated, and might not help job
> execution time that much.
> Consider implementing this as multiple map reduce steps, but using a
> replication level of one for intermediate data.  That would mostly have
> the performance characteristics you want.  But if a node died, things
> could not intelligently automatically re-create just the missing data.
> Instead the application would have to re-run the entire job, or subsets
> of it, in order to re-create the un-replicated data.
> Under poly-reduce, if a node failed, all tasks that were incomplete on
> that node would need to be restarted.  But first, their input data would
> need to be located.  If you saved all intermediate data in the course of
> a job (which would be expensive) then the inputs that need re-creation
> would mostly just be those that were created on the failed node.  But
> this failure would generally cascade all the way back to the initial map
> stage.  So a single machine failure in the last phase could double the
> run time of the job, with most of the cluster idle.
> If, instead, you used normal mapreduce, with intermediate data
> replicated in the filesystem, a single machine failure in the last phase
> would only require re-running tasks from the last job.
> Perhaps, when chaining mapreduces, one should use a lower replication
> level for intermediate data, like two.  Additionally, one might wish to
> relax the one-replica-off-rack criterion for such files, so that
> replication is faster, and since whole-rack failures are rare.  This
> might give good chained performance, but keep machine failures from
> knocking tasks back to the start of the chain.  Currently it's not
> possible to disable the one-replica-off-rack preference, but that might
> be a reasonable feature request.
> Doug

Milind Bhandarkar
