hadoop-mapreduce-user mailing list archives

From Ferdy Galema <ferdy.gal...@kalooga.com>
Subject Re: Pipelining Mappers and Reducers
Date Thu, 29 Jul 2010 12:04:22 GMT
Very well. Could you keep us informed on how your instant merging plans 
work out? We're actually running a similar indexing process.

It would be very interesting to be able to start merging Lucene indexes as
soon as the first mappers have finished, instead of waiting until ALL
mappers have finished.
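
As a rough illustration of that idea (this is neither Hadoop nor Lucene code), here is a minimal plain-Java sketch that merges each partial result as soon as its task completes, rather than after all tasks are done. The class `PipelinedMerge`, the `runAndMerge` helper and the string lists standing in for sub-indexes are all assumptions made for the example; a real job would have to merge through Lucene's own API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for "mappers produce sub-indexes, one merger consumes them".
public class PipelinedMerge {

    /** Runs the tasks in parallel and merges every result in completion order. */
    public static List<String> runAndMerge(List<Callable<List<String>>> mapTasks, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CompletionService<List<String>> finished = new ExecutorCompletionService<>(pool);
        try {
            for (Callable<List<String>> task : mapTasks) {
                finished.submit(task);
            }
            List<String> merged = new ArrayList<>();
            for (int i = 0; i < mapTasks.size(); i++) {
                // take() blocks only until *some* task has finished, so merging
                // overlaps with the tasks that are still running.
                List<String> partial = finished.take().get();
                merged.addAll(partial); // placeholder for a real index merge
            }
            return merged;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a task", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a simulated map task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Callable<List<String>>> tasks = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            final int id = i;
            tasks.add(() -> {
                Thread.sleep(10L * (8 - id)); // simulated, uneven map work
                return Arrays.asList("doc-" + id + "-a", "doc-" + id + "-b");
            });
        }
        List<String> merged = runAndMerge(tasks, 4);
        System.out.println(merged.size() + " documents merged");
    }
}
```

The single consumer loop keeps the merge itself single-threaded, which matches the wish elsewhere in this thread to keep the per-task code simple.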

Shai Erera wrote:
> Well, the current scale does not warrant sharding up the index. 13GB 
> of data, ~8-10GB index is something a single machine (not even a 
> strong one) can handle pretty well. And as the data will grow, so will 
> the number of Mappers, and the number of sub-indexes. So at some point 
> I will need to merge indexes. Sharding is one of my strategies, but 
> for the TB scale, not what I'm training on now :).
> Thanks,
> Shai
> On Thu, Jul 29, 2010 at 1:23 PM, Ferdy Galema 
> <ferdy.galema@kalooga.com> wrote:
>     You're right that merging cannot be done by simply appending.
>     Have you thought about actually taking advantage of the fact
>     that your final index will be split into several segments?
>     Especially if you plan to increase the scale of the input data,
>     you may eventually want some sort of scaling on the search side
>     as well, for example by sharding your index over several machines
>     (there are frameworks that will help you do this, e.g. Katta -
>     distributed Lucene). You can even do this on a single machine with
>     a ParallelMultiSearcher, preferably if the machine has several
>     disks to spread I/O.
>     Of course, in this case you still have to make sure that the number
>     of segments won't be too high.
>     Shai Erera wrote:
>>     Specifically at the moment, a Mapper output is an index (Lucene),
>>     and the Reducer's job is to take all indexes and merge them down
>>     to 1. Searching on an index w/ hundreds of segments is
>>     inefficient. Basically, if you think of the Reducer as holding
>>     the 'final' index (to be passed on in the larger pipeline I
>>     have), then as soon as a Mapper finishes, I'd like to merge its
>>     index w/ the Reducer's. Not all merges will actually be expensive
>>     - only once every several such indexes will they truly be merged
>>     together.
>>     I cannot use simple merge techniques - must use Lucene's API to
>>     merge the indexes. And the merge is not a simple 'append to
>>     current index' either. So if I could get hold of some object
>>     (code) which gets executed when a Mapper's output is ready for the
>>     Reducer, then I can have my merge code executed there. From what
>>     I understand so far, it can be the Combiner, which is what I'm
>>     investigating now.
>>     On the multi-level reduce -- if your cluster is big, and Mappers
>>     work on their own hardware (even if some share the same HW), then
>>     multi-level reduce can help. True - you might eventually process
>>     more bytes than you would if you had one Reducer, but (1) that's
>>     not always true, and (2) wall-clock time may be shortened in such
>>     a case.
>>     Shai
>>     On Thu, Jul 29, 2010 at 12:31 PM, Ferdy Galema
>>     <ferdy.galema@kalooga.com> wrote:
>>         Could you elaborate on how you merge your data? If you have
>>         independent map tasks with single key-value outputs that can
>>         be written as soon as possible, I'm still curious why you need to
>>         reduce at all. Surely there must be a way to merge your data
>>         'on read'.
>>         About multi-level reducing: if you merge your data in
>>         multiple steps, there would only be a gain if the reducers
>>         somehow reduce (no pun intended) the amount of data in the
>>         pipeline. For example, running 400 maps and 10 reduces
>>         followed by another job with a single reducer will not
>>         benefit if the single reducer has to process the same amount
>>         of data that the previous reducers have been outputting.
>>         Therefore it completely depends on what your reducer actually
>>         does.
>>         Ferdy.
>>         Shai Erera wrote:
>>>         Yes, I'm aware of the fact that I can run w/ 0 reducers.
>>>         However, I do need my output to be merged, and unfortunately
>>>         the merge is not so simple that I can use
>>>         'getmerge'. I think I'll give the Combiner a chance now -
>>>         hopefully its reduce() method will get called as Mappers
>>>         finish their work.
>>>         BTW, if I'd like to have multi-level Reducers, such that
>>>         each level works on less data, do I need to do that using
>>>         multiple Jobs? I haven't seen any API to do so, unless I
>>>         chain mappers/reducers and include #levels in the chain. To
>>>         clarify, if I need to take 400 outputs (or much more) and
>>>         merge them down to 1, currently I need to do that
>>>         sequentially. If I could do them in several rounds, by
>>>         chunks of say 10/20, and each round will spawn N
>>>         Mappers/Reducers to do the job, while N will get smaller and
>>>         smaller, perhaps I'd gain something?
>>>         Has anyone had experience with something like that? Should I
>>>         expect to see some improvements?
>>>         Thanks,
>>>         Shai
>>>         On Thu, Jul 29, 2010 at 11:20 AM, Ferdy Galema
>>>         <ferdy.galema@kalooga.com> wrote:
>>>             Just a quick pointer here:
>>>             You are aware of the fact that you can configure the
>>>             number of reduce tasks to be zero? If I read correctly,
>>>             you mention that the order of your map outputs in the
>>>             merge does not really matter, and that there is a single
>>>             value for every key. Perhaps you could eliminate your
>>>             reduce function altogether?
>>>             Running a job with zero reduces means that the output of
>>>             the mappers is written directly to the output filesystem
>>>             (DFS most of the time). Of course this will result in
>>>             your output being split into multiple parts, one per
>>>             map task. I'm not sure if this suffices; it
>>>             depends on how your output data will be used. Still, if
>>>             you absolutely need your data to be merged, you could
>>>             use "hadoop fs -getmerge ..." to pull a merged copy from
>>>             the DFS.
>>>             Btw I share your opinion on keeping your Map/Reduce
>>>             functions single-threaded (thus simple) when possible.
>>>             The Hadoop framework will be able to run your
>>>             application concurrently by using multiple tasks per node.
>>>             Ferdy.
>>>             Shai Erera wrote:
>>>>             Without going too much into the detail (so that we keep
>>>>             the discussion simple) -- I'm processing large volumes
>>>>             of text documents. Currently I'm training (myself :))
>>>>             on a 13GB collection, but after I figure out the recipe
>>>>             for writing the proper mix of Mappers/Reducers/Jobs, I
>>>>             will move to a TB collection and then larger.
>>>>             Processing of a document outputs a processed
>>>>             representation of a document (think of it as a large
>>>>             sequence of bytes). Each Mapper processes a set of
>>>>             documents and outputs a processed representation which
>>>>             includes all of them.
>>>>             The output of all Mappers (all processed docs) needs to
>>>>             be merged into one large document (sequence of bytes).
>>>>             I can live w/ a *few* large sequences of bytes, but
>>>>             since the data will only grow, merging those 13GB (1.1M
>>>>             docs) will eventually be done into one file. Therefore
>>>>             I'd like to focus on merging them down to 1 file.
>>>>             There is no ordering in the merge - as soon as a Mapper
>>>>             finishes, its output can be appended to the merged
>>>>             file. Hence the pipelining I'm looking for. The 'merge'
>>>>             process I've described is simple, however I think it is
>>>>             sufficient in order to understand what I'm trying to
>>>>             achieve.
>>>>             Currently, if I use Hadoop's default settings, the 13GB
>>>>             data is split into 397 mappers (each receives 33MB to
>>>>             process). I've played w/ the split size and increased
>>>>             it to 64MB and higher (up to 512MB). The merge part of
>>>>             the job (Reducer) is done on 397 parts (or the #
>>>>             Mappers). So I'm trying to balance the split size so
>>>>             that on one hand the Mappers can still finish fast, but
>>>>             on the other hand the Reducer won't take so long to run.
>>>>             Problem is, the more data I give a Mapper to handle,
>>>>             the more work it has to do and the more resources it
>>>>             consumes. I wrote a multi-threaded Mapper which does
>>>>             the processing by several concurrent threads, when the
>>>>             split size is very large. However that has a downside
>>>>             -- I basically assume the threads can get all the CPU
>>>>             power they need, which may not be true. In my cluster I
>>>>             have 96 cores and 96 Mappers can run concurrently. If I
>>>>             split the data to 512MB (25 Mappers), then it means
>>>>             each Mapper can get ~4 cores to use. BUT, in
>>>>             production, there may be other jobs running on the
>>>>             cluster, so besides my 25 Mappers there will be
>>>>             an additional 71 that may be run, therefore each of my
>>>>             Mappers won't get the CPU power it needs.
>>>>             Therefore, I'd like to avoid multi-threaded Mappers
>>>>             altogether, and keep it simple - figure out a magic number
>>>>             for the split size so that each Mapper can do its job in a
>>>>             single thread, and if the cluster is busy, then fewer of
>>>>             my Mappers will run.
>>>>             But all that is sort of my problem (figure out the
>>>>             magic number). What I'm trying to do is have the
>>>>             Reducer's reduce() called as soon as Mappers finish
>>>>             their job. I thought that if I set the Combiner to be
>>>>             the Reducer's class, then when the Combiner is
>>>>             fired on the Reducer job (I'll need to figure out that
>>>>             I'm on the Reducer side, which I can by parsing the
>>>>             task Id), its reduce() method will be called and it can
>>>>             start merging the incoming inputs.
>>>>             I had thought that the Combiner works like the Reducer
>>>>             - it can combine several outputs of the same key down
>>>>             to 1, but that if there are several keys it will be
>>>>             invoked for all, thereby emitting <key,value> pairs --
>>>>             just like the Reducer. Are you saying that won't work?
>>>>             If need be, I can parse the task Id and have N tasks
>>>>             grouped together into one 'key', but that will add a
>>>>             (minor) complexity which I'd be happy to avoid.
>>>>             I hope I gave enough details.
>>>>             Shai
>>>>             On Tue, Jul 27, 2010 at 8:47 PM, Gregory Lawrence
>>>>             <gregl@yahoo-inc.com> wrote:
>>>>                 Shai,
>>>>                 It’s hard to determine what the best solution would
>>>>                 be without knowing more about your problem. In
>>>>                 general, combiner functions work well but they will
>>>>                 be of little value if each mapper output contains a
>>>>                 unique key. This is because combiner functions only
>>>>                 “combine” multiple values associated with the same
>>>>                 key (e.g., counting the number of occurrences of a
>>>>                 word). Another common approach is to use two
>>>>                 mapreduce jobs. The first job would use multiple
>>>>                 reducers to do some processing. Here, you can
>>>>                 hopefully shrink the size of the data by
>>>>                 generating, for example, some sufficient statistics
>>>>                 of what ultimately needs to be generated. A second
>>>>                 mapreduce job would take the intermediate output
>>>>                 and produce the desired result.
>>>>                 As for the question of the reducer processing map
>>>>                 outputs as they are ready, I believe that the copy
>>>>                 stage may start before all mappers are finished. However,
>>>>                 the sorting and application of your reduce function
>>>>                 cannot proceed until every mapper is finished.
>>>>                 Could you describe your problem in more detail?
>>>>                 Regards,
>>>>                 Greg Lawrence
>>>>                 On 7/27/10 4:06 AM, "Shai Erera" <serera@gmail.com>
>>>>                 wrote:
>>>>                     Hi
>>>>                     I have a scenario for which I'd like to write a
>>>>                     MR job in which Mappers do some work and
>>>>                     eventually the output of all mappers needs to be
>>>>                     combined by a single Reducer. Each Mapper
>>>>                     outputs <key,value> that is distinct from all
>>>>                     other Mappers, meaning the Reducer.reduce()
>>>>                     method always receives a single element in the
>>>>                     values argument of a specific key. Really - the
>>>>                     Mappers are independent of each other in their
>>>>                     output.
>>>>                     What would really be great for me is if I could
>>>>                     have the Reducer start processing the map
>>>>                     outputs as they are ready, and not after all
>>>>                     Mappers finish. For example, I'm processing a
>>>>                     very large data set and the MR framework spawns
>>>>                     hundreds of Mappers for the task. The output of
>>>>                     all Mappers though is required to be processed
>>>>                     by 1 Reducer. It so happens that the
>>>>                     Reducer job is very heavy, compared to the
>>>>                     Mappers, and while all Mappers finish in about
>>>>                     7 minutes (total wall clock time), the Reducer
>>>>                     takes ~30 minutes.
>>>>                     In my cluster I can run 96 Mappers in parallel,
>>>>                     so I'm pretty sure that if I could streamline
>>>>                     the outputs of the Mappers to the Reducer, I
>>>>                     could gain some cycles back - I can easily
>>>>                     limit the number of Mappers to say 95 and have
>>>>                     the Reducer constantly doing some job.
>>>>                     I've read about chaining mappers, but to the
>>>>                     best of my understanding the second line of
>>>>                     Mappers will only start after the first ones
>>>>                     have finished. Am I correct?
>>>>                     Someone also hinted to me that I could write a
>>>>                     Combiner that Hadoop might invoke on the
>>>>                     Reducer's side when Mappers finish, if say the
>>>>                     data of the Mappers is very large and cannot be
>>>>                     kept in RAM. I haven't tried it yet, so if
>>>>                     anyone can confirm this will indeed work, I'm
>>>>                     willing to give it a try. The output of the
>>>>                     Mappers is very large, and therefore they
>>>>                     already write it directly to disk. So I'd like
>>>>                     to avoid doing this serialization twice (once
>>>>                     when the Mapper works, and the second time when
>>>>                     Hadoop will *flush* the Reducer's buffer - or
>>>>                     whatever the right terminology is).
>>>>                     I apologize if this has been raised before - if
>>>>                     it has, could you please point me at the
>>>>                     relevant discussion/issue?
>>>>                     Shai
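
For the multi-level merge question raised in the thread (400 outputs merged in chunks of 10 or 20), the number of rounds is simple arithmetic. Below is a minimal Java sketch; the class `MergeRounds` and its methods are hypothetical names, and each round is assumed to merge groups of at most `fanIn` parts into one part each.

```java
// Hypothetical helper: counts merge rounds for a given fan-in.
public class MergeRounds {

    /** Parts left after one round that merges groups of up to fanIn parts into one each. */
    public static int partsAfterRound(int parts, int fanIn) {
        return (parts + fanIn - 1) / fanIn; // ceiling division
    }

    /** Rounds needed to get from `parts` outputs down to a single one. */
    public static int rounds(int parts, int fanIn) {
        if (parts < 1 || fanIn < 2) {
            throw new IllegalArgumentException("need parts >= 1 and fanIn >= 2");
        }
        int rounds = 0;
        while (parts > 1) {
            parts = partsAfterRound(parts, fanIn);
            rounds++;
        }
        return rounds;
    }

    public static void main(String[] args) {
        for (int fanIn : new int[] {10, 20}) {
            System.out.println("fan-in " + fanIn + ": " + rounds(400, fanIn) + " rounds");
        }
    }
}
```

With 400 parts, a fan-in of 10 gives 400 -> 40 -> 4 -> 1 (three rounds) and a fan-in of 20 gives 400 -> 20 -> 1 (two rounds). If a merge does not shrink the data, every round re-reads roughly the whole data set, so extra rounds can only pay off in wall-clock time through parallelism, which is the trade-off discussed in the thread.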
