hadoop-mapreduce-user mailing list archives

From Ferdy Galema <ferdy.gal...@kalooga.com>
Subject Re: Pipelining Mappers and Reducers
Date Thu, 29 Jul 2010 09:31:51 GMT
Could you elaborate on how you merge your data? If you have independent 
map tasks with single key-value outputs that can be written as soon as 
possible, I'm still curious why you need to reduce at all. Surely there must 
be a way to merge your data 'on read'.
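A minimal sketch of what "merge on read" could look like, in Python rather than Java for brevity (the helper and the part-file names are hypothetical, not a Hadoop API):

```python
def read_merged(part_paths):
    """Yield the bytes of every map output part file in sequence, so a
    consumer sees one logical stream without ever materializing a
    physically merged file."""
    for path in part_paths:
        with open(path, "rb") as f:
            # Stream in fixed-size chunks to keep memory bounded.
            for chunk in iter(lambda: f.read(8192), b""):
                yield chunk

# In HDFS the inputs would be part-00000, part-00001, ... under the
# job's output directory; here any list of local paths works.
```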

About multi-level reducing: if you merge your data in multiple steps, 
there is only a gain if the reducers somehow reduce (no pun 
intended) the amount of data in the pipeline. For example, running 400 
maps and 10 reduces followed by another job with a single reducer will 
not help if that single reducer has to process the same amount of data 
that the previous reducers output. So it completely 
depends on what your reducer actually does.
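For intuition on the round count: with N map outputs and a fan-in of k per reduce round, a multi-round merge needs ceil(log_k N) rounds, and each round still moves all the data unless the reducers shrink it. A toy helper (the function is ours, not part of Hadoop):

```python
import math

def merge_rounds(n_outputs, fan_in):
    """Number of reduce rounds needed to merge n_outputs down to 1
    when each reducer merges at most fan_in inputs."""
    rounds = 0
    while n_outputs > 1:
        n_outputs = math.ceil(n_outputs / fan_in)
        rounds += 1
    return rounds

# 400 outputs merged 20 at a time: 400 -> 20 -> 1, i.e. 2 rounds.
# With fan-in 10 it becomes 400 -> 40 -> 4 -> 1, i.e. 3 rounds.
```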


Shai Erera wrote:
> Yes, I'm aware of the fact that I can run w/ 0 reducers. However, I do 
> need my output to be merged, and unfortunately the merge is not really 
> that simple that I can use 'getmerge'. I think I'll give the Combiner 
> a chance now - hopefully its reduce() method will get called as 
> Mappers finish their work.
> BTW, if I'd like to have multi-level Reducers, such that each level 
> works on less data, do I need to do that using multiple Jobs? I 
> haven't seen any API to do so, unless I chain mappers/reducers and 
> include #levels in the chain. To clarify, if I need to take 400 
> outputs (or many more) and merge them down to 1, currently I need to 
> do that sequentially. If I could do them in several rounds, in chunks 
> of say 10/20, with each round spawning N Mappers/Reducers to do the 
> job, while N gets smaller and smaller, perhaps I'd gain something? 
> Has anyone experimented with something like that? Am I expected to see 
> some improvements?
> Thanks,
> Shai
> On Thu, Jul 29, 2010 at 11:20 AM, Ferdy Galema 
> <ferdy.galema@kalooga.com <mailto:ferdy.galema@kalooga.com>> wrote:
>     Just a quick pointer here:
>     You are aware of the fact that you can configure the number of
>     reduce tasks to be zero? If I read correctly, you mention that the
>     order of your map outputs in the merge does not really matter, as well as
>     having a single value for every key. Perhaps you could eliminate
>     your reduce function altogether?
>     Running a job with zero reduces means that output of the mappers
>     is directly written on the output filesystem (DFS most of the
>     time). Of course this will result in your output being split into
>     multiple parts, namely the number of map tasks. I'm not sure if
>     this suffices, it depends on how your output data will be used.
>     Still, if you absolutely need your data to be merged, you could
>     use "hadoop fs -getmerge ..." to pull a merged copy of the DFS.
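For illustration, `-getmerge` is essentially a concatenation of every file under a directory into one local file. A rough local equivalent in Python (assuming the parts should be concatenated in name order, which matches how part files are typically listed):

```python
import os

def getmerge(src_dir, dst_path):
    """Concatenate every part file in src_dir (sorted by name) into a
    single local file, mimicking what 'hadoop fs -getmerge' does."""
    with open(dst_path, "wb") as out:
        for name in sorted(os.listdir(src_dir)):
            with open(os.path.join(src_dir, name), "rb") as part:
                out.write(part.read())
```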
>     Btw I share your opinion on keeping your Map/Reduce functions
>     single-threaded (thus simple) when possible. The Hadoop framework
>     will be able to run your application concurrently by using
>     multiple tasks per node.
>     Ferdy.
>     Shai Erera wrote:
>>     Without going too much into the detail (so that we keep the
>>     discussion simple) -- I'm processing large volumes of text
>>     documents. Currently I'm training (myself :)) on a 13GB
>>     collection, but after I figure out the recipe for writing the
>>     proper mix of Mappers/Reducers/Jobs, I will move to a TB
>>     collection and then larger. Processing of a document outputs a
>>     processed representation of a document (think of it as a large
>>     sequence of bytes). Each Mapper processes a set of documents and
>>     outputs a processed representation which includes all of them.
>>     The output of all Mappers (all processed docs) need to be merged
>>     into one large document (sequence of bytes). I can live w/ a
>>     *few* large sequences of bytes, but since the data will only grow,
>>     merging those 13GB (1.1M docs) will eventually be done into one
>>     file. Therefore I'd like to focus on merging them down to 1 file.
>>     There is no ordering in the merge - as soon as a Mapper finishes,
>>     its output can be appended to the merged file. Hence the
>>     pipelining I'm looking for. The 'merge' process I've described is
>>     simple, however I think it is sufficient in order to understand
>>     what I'm trying to achieve.
>>     Currently, if I use Hadoop's default settings, the 13GB data is
>>     split into 397 mappers (each receives 33MB to process). I've
>>     played w/ the split size and increased it to 64MB and higher (up
>>     to 512MB). The merge part of the job (Reducer) is done on 397
>>     parts (or the # Mappers). So I'm trying to balance the split size
>>     so that on one hand the Mappers can still finish fast, but on the
>>     other hand the Reducer won't take so long to run.
>>     Problem is, the more data I give a Mapper to handle, the more
>>     work it has to do and the more resources it consumes. I wrote a
>>     multi-threaded Mapper which does the processing by several
>>     concurrent threads, when the split size is very large. However
>>     that has a downside -- I basically assume the threads can get all
>>     the CPU power they need, which may not be true. In my cluster I
>>     have 96 cores and 96 Mappers can run concurrently. If I split the
>>     data to 512MB (25 Mappers), then it means each Mapper can get ~4
>>     cores to use. BUT, in production, there may be other jobs running
>>     on the cluster, so besides my 25 Mappers another 71 tasks may be
>>     running, and therefore each of my Mappers won't get the CPU power
>>     it needs.
>>     Therefore, I'd like to avoid multi-threaded Mappers at all, and
>>     keep it simple - figure out a magic number for the split size so
>>     that each Mapper can do its job in a single thread, and if the
>>     cluster is busy, then less of my Mappers will run.
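The "magic number" reasoning above is plain arithmetic: the number of map tasks is roughly input size divided by split size, which can then be compared against the 96 available slots. A back-of-the-envelope sketch (the helper is ours; the 13 GB / 33 MB figures are rounded, which is why this gives ~404 where the thread saw 397):

```python
def num_mappers(input_bytes, split_bytes):
    """Approximate number of map tasks Hadoop spawns for an input:
    one per split, rounded up."""
    return -(-input_bytes // split_bytes)  # ceiling division

MB = 1024 ** 2
GB = 1024 ** 3

# 13 GB at the ~33 MB default split -> ~404 mappers: many short
# waves on 96 slots. At a 512 MB split -> 26 mappers: a single
# wave, but each task is ~16x heavier.
```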
>>     But all that is sort of my problem (figure out the magic number).
>>     What I'm trying to do is have the Reducer's reduce() called as
>>     soon as Mappers finish their job. I thought that if I set the
>>     Combiner to be the Reducer's class, then when the Combiner will
>>     be fired on the Reducer job (I'll need to figure out that I'm on
>>     the Reducer side, which I can do by parsing the task Id), its
>>     reduce() method will be called and it can start merging the
>>     incoming inputs.
>>     I had thought that the Combiner works like the Reducer - it can
>>     combine several outputs of the same key down to 1, but that if
>>     there are several keys it will be invoked for all, thereby
>>     emitting <key,value> pairs -- just like the Reducer. Are you
>>     saying that won't work? If need be, I can parse the task Id and
>>     have N tasks grouped together into one 'key', but that will add a
>>     (minor) complexity which I'd be happy to avoid.
>>     I hope I gave enough details.
>>     Shai
>>     On Tue, Jul 27, 2010 at 8:47 PM, Gregory Lawrence
>>     <gregl@yahoo-inc.com <mailto:gregl@yahoo-inc.com>> wrote:
>>         Shai,
>>         It’s hard to determine what the best solution would be
>>         without knowing more about your problem. In general, combiner
>>         functions work well but they will be of little value if each
>>         mapper output contains a unique key. This is because combiner
>>         functions only “combine” multiple values associated with the
>>         same key (e.g., counting the number of occurrences of a
>>         word). Another common approach is to use two mapreduce jobs.
>>         The first job would use multiple reducers to do some
>>         processing. Here, you can hopefully shrink the size of the
>>         data by generating, for example, some sufficient statistics
>>         of what ultimately needs to be generated. A second mapreduce
>>         job would take the intermediate output and produce the
>>         desired result.
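Greg's two-job pattern only pays off when the first job shrinks the data, e.g. by emitting sufficient statistics such as partial counts. A pure-Python sketch of that data flow (no Hadoop API, just the shape of the two stages; the names are ours):

```python
from collections import Counter

def first_job(partitions):
    """Stage 1: many reducers, each collapsing one partition of word
    occurrences into small partial counts (sufficient statistics)."""
    return [Counter(words) for words in partitions]

def second_job(partials):
    """Stage 2: a single reducer merges the much smaller partials."""
    total = Counter()
    for partial in partials:
        total.update(partial)
    return total

# The second job sees one small Counter per first-stage reducer
# instead of every raw occurrence.
```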
>>         As for the reducer processing map outputs as they are ready
>>         question, I believe that the copy stage may start before all
>>         mappers are finished. However, the sorting and application of
>>         your reduce function can not proceed until each mapper is
>>         finished.
>>         Could you describe your problem in more detail?
>>         Regards,
>>         Greg Lawrence
>>         On 7/27/10 4:06 AM, "Shai Erera" <serera@gmail.com
>>         <mailto:serera@gmail.com>> wrote:
>>             Hi
>>             I have a scenario for which I'd like to write a MR job in
>>             which Mappers do some work and eventually the output of
>>             all mappers need to be combined by a single Reducer. Each
>>             Mapper outputs <key,value> that is distinct from all
>>             other Mappers, meaning the Reducer.reduce() method always
>>             receives a single element in the values argument of a
>>             specific key. Really - the Mappers are independent of
>>             each others in their output.
>>             What would really be great for me is if I could have the
>>             Reducer start processing the map outputs as they are
>>             ready, and not after all Mappers finish. For example, I'm
>>             processing a very large data set and the MR framework
>>             spawns hundreds of Mappers for the task. The output of
>>             all Mappers though is required to be processed by 1
>>             Reducer. It so happens to be that the Reducer job is very
>>             heavy, compared to the Mappers, and while all Mappers
>>             finish in about 7 minutes (total wall clock time), the
>>             Reducer takes ~30 minutes.
>>             In my cluster I can run 96 Mappers in parallel, so I'm
>>             pretty sure that if I could streamline the outputs of the
>>             Mappers to the Reducer, I could gain some cycles back - I
>>             can easily limit the number of Mappers to say 95 and have
>>             the Reducer constantly doing some job.
>>             I've read about chaining mappers, but to the best of my
>>             understanding the second line of Mappers will only start
>>             after the first ones finished. Am I correct?
>>             Someone also hinted to me that I could write a Combiner
>>             that Hadoop might invoke on the Reducer's side when
>>             Mappers finish, if say the data of the Mappers is very
>>             large and cannot be kept in RAM. I haven't tried it yet,
>>             so if anyone can confirm this will indeed work, I'm
>>             willing to give it a try. The output of the Mappers is
>>             very large, and therefore they already write it directly
>>             to disk. So I'd like to avoid doing this serialization
>>             twice (once when the Mapper works, and the second time
>>             when Hadoop will *flush* the Reducer's buffer - or
>>             whatever the right terminology is).
>>             I apologize if this has been raised before - if it has,
>>             could you please point me at the relevant discussion/issue?
>>             Shai
