hadoop-mapreduce-user mailing list archives

From Shai Erera <ser...@gmail.com>
Subject Re: Pipelining Mappers and Reducers
Date Thu, 29 Jul 2010 11:06:41 GMT
Well, the current scale does not warrant sharding the index. 13GB of
data (a ~8-10GB index) is something a single machine, even a weak one,
can handle pretty well. As the data grows, so will the number of
Mappers, and with it the number of sub-indexes, so at some point I will need
to merge indexes. Sharding is one of my strategies, but for the TB scale, not
for what I'm training on now :).


On Thu, Jul 29, 2010 at 1:23 PM, Ferdy Galema <ferdy.galema@kalooga.com> wrote:

> You're right that merging cannot be done by simply appending.
> Have you thought about taking advantage of the fact that your final index
> will be split into several segments? Especially if you plan to increase the
> scale of the input data, you may eventually want some sort of scaling on the
> search side as well, by sharding your index across several machines (there
> are frameworks that will help you do this, e.g. Katta - distributed Lucene).
> You can even do this on a single machine with a ParallelMultiSearcher,
> preferably if the machine has several disks to spread the I/O.
> Of course, in this case you still have to make sure that the number of
> segments won't be too high.
> Shai Erera wrote:
> Specifically, at the moment a Mapper's output is a (Lucene) index, and the
> Reducer's job is to take all indexes and merge them down to 1. Searching an
> index w/ hundreds of segments is inefficient. Basically, if you think of
> the Reducer as holding the 'final' index (to be passed on in the larger
> pipeline I have), then as soon as a Mapper finishes, I'd like to merge its
> index w/ the Reducer's. Not all merges will actually be expensive - only
> once every several such indexes will a real merge take place.
> I cannot use simple merge techniques - I must use Lucene's API to merge the
> indexes. And the merge is not a simple 'append to current index' either. So
> if I could get hold of some object (code) that gets executed when a Mapper's
> output is ready for the Reducer, then I could have my merge code executed
> there. From what I understand so far, that could be the Combiner, which is
> what I'm investigating now.
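To make "only once every several indexes will a real merge take place" concrete, here is a simplified model of a merge-factor style policy, similar in spirit to (but not the API of) Lucene's merge policies. The class `IncrementalMerger` and the `mergeFactor` parameter are hypothetical: each arriving Mapper index counts as one level-0 segment, and a level is merged into one segment on the next level only when it fills up. It counts segments and merge events only, not bytes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: each finished Mapper contributes one level-0 segment;
// when a level holds `mergeFactor` segments they are merged into a single
// segment one level up. Most arrivals trigger no merge at all.
final class IncrementalMerger {
    private final int mergeFactor;
    private final List<Integer> counts = new ArrayList<>(); // segments per level
    private int merges = 0;

    IncrementalMerger(int mergeFactor) {
        if (mergeFactor < 2) throw new IllegalArgumentException("mergeFactor must be >= 2");
        this.mergeFactor = mergeFactor;
    }

    // Called once per finished Mapper.
    void add() {
        int level = 0;
        bump(level);
        // Cascade upward while a level is full.
        while (counts.get(level) == mergeFactor) {
            counts.set(level, 0); // this level's segments became one bigger segment
            merges++;
            level++;
            bump(level);
        }
    }

    private void bump(int level) {
        if (counts.size() == level) counts.add(0);
        counts.set(level, counts.get(level) + 1);
    }

    int merges() { return merges; }

    int segments() {
        int total = 0;
        for (int c : counts) total += c;
        return total;
    }

    public static void main(String[] args) {
        IncrementalMerger m = new IncrementalMerger(10);
        for (int i = 0; i < 397; i++) m.add(); // 397 Mappers, as mentioned in the thread
        System.out.println(m.merges() + " merges, " + m.segments() + " segments left");
    }
}
```

With a merge factor of 10 and 397 arrivals, the model performs 42 merges and leaves 19 segments, so a final forced merge would still be needed to get down to a single index.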
> On the multi-level reduce -- if your cluster is big, and Mappers work on
> their own hardware (even if some share the same HW), then multi-level reduce
> can help. True - you might eventually process more bytes than you would if
> you had one Reducer, but (1) that's not always true, and (2) wall-clock time
> may be shortened in such a case.
> Shai
> On Thu, Jul 29, 2010 at 12:31 PM, Ferdy Galema <ferdy.galema@kalooga.com> wrote:
>> Could you elaborate on how you merge your data? If you have independent
>> map tasks with single key-value outputs that can be written as soon as
>> possible, I'm still curious why you need to reduce at all. Surely there must be
>> a way to merge your data 'on read'.
>> About multi-level reducing: if you merge your data in multiple steps,
>> there would only be a gain if the reducers somehow reduce (no pun intended)
>> the amount of data in the pipeline. For example running 400 maps and 10
>> reduces followed by another job with a single reducer will not benefit if
>> the single reducer has to process the same amount of data that the previous
>> reducers have been outputting. Therefore it completely depends on what your
>> reducer actually does.
>> Ferdy.
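Ferdy's point and Shai's wall-clock argument can both be seen in a small cost model. This is a sketch under loud assumptions: reduce time is taken to be proportional to bytes read, first-level reducers are perfectly balanced and run in parallel, and `shrink` is a hypothetical fraction of its input that each first-level reducer emits. The 400 and 10 come from the example above; the class name is illustrative.

```java
// Hypothetical cost model: time ~ bytes read, balanced parallel reducers.
final class TwoLevelReduce {
    // One reducer reads all map output.
    static double singleLevel(double mapOutput) {
        return mapOutput;
    }

    // Level 1: r reducers in parallel, each reads mapOutput / r and emits
    // shrink * (its input). Level 2: one reducer reads everything level 1 emitted.
    static double twoLevel(double mapOutput, int r, double shrink) {
        double firstLevel = mapOutput / r;      // wall clock of the parallel level
        double secondLevel = mapOutput * shrink; // total bytes reaching the final reducer
        return firstLevel + secondLevel;
    }

    public static void main(String[] args) {
        System.out.println(singleLevel(400));       // baseline
        System.out.println(twoLevel(400, 10, 1.0)); // no reduction: slower than baseline
        System.out.println(twoLevel(400, 10, 0.1)); // reducers shrink data: much faster
    }
}
```

With `shrink = 1.0` the two-level pipeline costs 440 units against 400 for a single reducer, which is Ferdy's case; only when the first level genuinely reduces the data does the extra level shorten wall-clock time.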
>> Shai Erera wrote:
>> Yes, I'm aware that I can run w/ 0 reducers. However, I do
>> need my output to be merged, and unfortunately the merge is not simple
>> enough for me to use 'getmerge'. I think I'll give the Combiner a chance
>> now - hopefully its reduce() method will get called as Mappers finish their
>> work.
>> BTW, if I'd like to have multi-level Reducers, such that each level works
>> on less data, do I need to do that using multiple Jobs? I haven't seen any
>> API to do so, unless I chain mappers/reducers and include #levels in the
>> chain. To clarify, if I need to take 400 outputs (or many more) and merge
>> them down to 1, currently I need to do that sequentially. If I could do it
>> in several rounds, by chunks of say 10/20, where each round spawns N
>> Mappers/Reducers to do the job and N gets smaller and smaller,
>> perhaps I'd gain something?
>> Does anyone have experience with something like that? Should I expect to see
>> some improvements?
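The "rounds by chunks of 10/20" idea is a k-way merge tree. The sketch below only plans it: given the number of outputs and a fan-in (both hypothetical parameters), it returns how many merge tasks each round would need. How each round is launched (separate Jobs or otherwise) is left open here.

```java
import java.util.ArrayList;
import java.util.List;

// Plans a k-way merge tree: each task merges up to `fanIn` inputs into one output.
final class MergeRounds {
    static List<Integer> plan(int inputs, int fanIn) {
        if (inputs < 1 || fanIn < 2) throw new IllegalArgumentException("bad arguments");
        List<Integer> tasksPerRound = new ArrayList<>();
        int remaining = inputs;
        while (remaining > 1) {
            int tasks = (remaining + fanIn - 1) / fanIn; // ceil(remaining / fanIn)
            tasksPerRound.add(tasks);
            remaining = tasks; // each task emits one merged output
        }
        return tasksPerRound;
    }

    public static void main(String[] args) {
        System.out.println(plan(400, 20)); // [20, 1]
        System.out.println(plan(400, 10)); // [40, 4, 1]
    }
}
```

Note that every round re-reads and re-writes all of the data, so with 3 rounds the bytes moved roughly triple. The gain, if any, comes from running each round's tasks in parallel.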
>> Thanks,
>> Shai
>> On Thu, Jul 29, 2010 at 11:20 AM, Ferdy Galema <ferdy.galema@kalooga.com> wrote:
>>> Just a quick pointer here:
>>> You are aware that you can configure the number of reduce
>>> tasks to be zero? If I read correctly, you mention that the order of your map
>>> outputs in the merge does not really matter, and that there is a single value
>>> for every key. Perhaps you could eliminate your reduce function altogether?
>>> Running a job with zero reduces means that the output of the mappers is
>>> written directly to the output filesystem (DFS most of the time). Of course,
>>> this will result in your output being split into multiple parts, one per
>>> map task. I'm not sure if this suffices; it depends on how your
>>> output data will be used. Still, if you absolutely need your data to be
>>> merged, you could use "hadoop fs -getmerge ..." to pull a merged copy from the
>>> DFS.
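For formats that tolerate plain byte appending, `hadoop fs -getmerge` amounts to concatenating the part files of an output directory. The sketch below does the same on a local copy of that directory; it is an illustration, not Hadoop's implementation, and the class name `LocalGetMerge` is hypothetical. As Shai notes in his reply, this does not work for Lucene indexes, which need Lucene's own merge.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Local analogue of getmerge: byte-concatenate the part files a zero-reduce
// job leaves behind. `dest` should live outside `outputDir`.
final class LocalGetMerge {
    static void merge(Path outputDir, Path dest) throws IOException {
        List<Path> parts = new ArrayList<>();
        // Matches both "part-00000" and "part-m-00000" style names.
        try (DirectoryStream<Path> ds = Files.newDirectoryStream(outputDir, "part-*")) {
            for (Path p : ds) parts.add(p);
        }
        Collections.sort(parts); // zero-padded names sort in task order
        try (OutputStream out = Files.newOutputStream(dest)) {
            for (Path p : parts) Files.copy(p, out); // append each part's bytes
        }
    }

    public static void main(String[] args) throws IOException {
        // usage: java LocalGetMerge <local-output-dir> <merged-file>
        merge(Paths.get(args[0]), Paths.get(args[1]));
    }
}
```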
>>> Btw I share your opinion on keeping your Map/Reduce functions
>>> single-threaded (and thus simple) when possible. The Hadoop framework will be
>>> able to run your application concurrently by using multiple tasks per node.
>>> Ferdy.
>>> Shai Erera wrote:
>>> Without going too much into detail (so that we keep the discussion
>>> simple) -- I'm processing large volumes of text documents. Currently I'm
>>> training (myself :)) on a 13GB collection, but after I figure out the recipe
>>> for writing the proper mix of Mappers/Reducers/Jobs, I will move to a TB
>>> collection and then larger. Processing of a document outputs a processed
>>> representation of a document (think of it as a large sequence of bytes).
>>> Each Mapper processes a set of documents and outputs a processed
>>> representation which includes all of them.
>>> The output of all Mappers (all processed docs) needs to be merged into one
>>> large document (sequence of bytes). I can live w/ a *few* large sequences of
>>> bytes, but since the data will only grow, those 13GB (1.1M docs) will
>>> eventually have to be merged into one file anyway. Therefore I'd like to
>>> focus on merging them down to 1 file.
>>> There is no ordering in the merge - as soon as a Mapper finishes, its
>>> output can be appended to the merged file. Hence the pipelining I'm looking
>>> for. The 'merge' process I've described is simple, however I think it is
>>> sufficient in order to understand what I'm trying to achieve.
>>> Currently, if I use Hadoop's default settings, the 13GB of data is split
>>> across 397 Mappers (each receives 33MB to process). I've played w/ the split
>>> size and increased it to 64MB and higher (up to 512MB). The merge part of
>>> the job (the Reducer) works on 397 parts (i.e., the # of Mappers). So I'm
>>> trying to balance the split size so that on one hand the Mappers still
>>> finish fast, but on the other hand the Reducer won't take so long to run.
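The Mapper count for a given split size is roughly the input size divided by the split size, rounded up. The sketch below assumes the input is 13 GiB treated as one contiguous stream; real `FileInputFormat` splits never span files, so many small files yield more Mappers. Because the exact input size isn't stated, the computed counts (404 for 33MB, 26 for 512MB) only approximate the 397 and 25 quoted in the thread.

```java
// Approximate Mapper count per split size; ignores file boundaries.
final class SplitMath {
    static final long MB = 1024L * 1024L;

    // ceil(totalBytes / splitBytes)
    static long numMappers(long totalBytes, long splitBytes) {
        return (totalBytes + splitBytes - 1) / splitBytes;
    }

    public static void main(String[] args) {
        long total = 13L * 1024 * MB; // assumed: "13GB" taken as 13 GiB
        for (long split : new long[] {33, 64, 128, 256, 512}) {
            System.out.println(split + "MB -> " + numMappers(total, split * MB) + " mappers");
        }
    }
}
```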
>>> Problem is, the more data I give a Mapper to handle, the more work it has
>>> to do and the more resources it consumes. I wrote a multi-threaded Mapper
>>> which does the processing by several concurrent threads, when the split size
>>> is very large. However that has a downside -- I basically assume the threads
>>> can get all the CPU power they need, which may not be true. In my cluster I
>>> have 96 cores and 96 Mappers can run concurrently. If I split the data into
>>> 512MB splits (25 Mappers), then each Mapper can get ~4 cores. BUT,
>>> in production there may be other jobs running on the cluster, so besides my
>>> 25 Mappers another 71 tasks may run, and then each of my
>>> Mappers won't get the CPU power it needs.
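The arithmetic behind "~4 cores" and "71 additional" tasks, as a quick check. This assumes one map slot per core (96 in total) and that cores are shared evenly among running tasks; the class name is illustrative.

```java
// Back-of-envelope CPU share, assuming one map slot per core.
final class CoreShare {
    static double coresPerMapper(int slots, int runningTasks) {
        return (double) slots / runningTasks;
    }

    static int slotsLeftForOthers(int slots, int myMappers) {
        return slots - myMappers;
    }

    public static void main(String[] args) {
        System.out.println(coresPerMapper(96, 25));     // idle cluster: ~3.84 cores per Mapper
        System.out.println(slotsLeftForOthers(96, 25)); // 71 slots other jobs can fill
        System.out.println(coresPerMapper(96, 96));     // full cluster: 1.0 core per task
    }
}
```

Once the other 71 slots fill up, a 4-thread Mapper effectively gets one core, which is the argument for keeping Mappers single-threaded.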
>>> Therefore, I'd like to avoid multi-threaded Mappers altogether, and keep it
>>> simple - figure out a magic number for the split size so that each Mapper
>>> can do its job in a single thread, and if the cluster is busy, then fewer of
>>> my Mappers will run.
>>> But all that is sort of my problem (figuring out the magic number). What
>>> I'm trying to do is have the Reducer's reduce() called as soon as Mappers
>>> finish their job. I thought that if I set the Combiner to be the Reducer's
>>> class, then when the Combiner fires in the Reducer task (I'll need to
>>> figure out that I'm on the Reducer side, which I can do by parsing the task
>>> Id), its reduce() method will be called and it can start merging the
>>> incoming inputs.
>>> I had thought that the Combiner works like the Reducer - it can combine
>>> several outputs of the same key down to 1, but that if there are several
>>> keys it will be invoked for all, thereby emitting <key,value> pairs --
>>> like the Reducer. Are you saying that won't work? If need be, I can parse
>>> the task Id and have N tasks grouped together into one 'key', but that will
>>> add a (minor) complexity which I'd be happy to avoid.
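Grouping N Mappers under one key could look like the sketch below. It assumes attempt IDs of the form `attempt_<jobtracker>_<job>_m_<task>_<attempt>` (e.g. `attempt_201007291106_0001_m_000123_0`); the class and method names are hypothetical. Hadoop also exposes this as a `TaskAttemptID` object, which may be preferable to parsing strings in a real job.

```java
// Hypothetical helper: bucket map tasks into groups of n so they share a key.
final class TaskGroupKey {
    // Assumed format: attempt_<jobtracker>_<job>_m_<task>_<attempt>
    static int mapTaskNumber(String attemptId) {
        String[] parts = attemptId.split("_");
        if (parts.length != 6 || !parts[0].equals("attempt") || !parts[3].equals("m")) {
            throw new IllegalArgumentException("not a map attempt id: " + attemptId);
        }
        return Integer.parseInt(parts[4]); // "000123" -> 123
    }

    // Map tasks 0..n-1 get key 0, tasks n..2n-1 get key 1, and so on.
    static int groupKey(String attemptId, int n) {
        return mapTaskNumber(attemptId) / n;
    }

    public static void main(String[] args) {
        System.out.println(groupKey("attempt_201007291106_0001_m_000123_0", 10)); // 12
    }
}
```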
>>> I hope I gave enough details.
>>> Shai
>>> On Tue, Jul 27, 2010 at 8:47 PM, Gregory Lawrence <gregl@yahoo-inc.com> wrote:
>>>> Shai,
>>>> It’s hard to determine what the best solution would be without knowing
>>>> more about your problem. In general, combiner functions work well but they
>>>> will be of little value if each mapper output contains a unique key. This
>>>> is because combiner functions only “combine” multiple values associated with
>>>> the same key (e.g., counting the number of occurrences of a word). Another
>>>> common approach is to use two mapreduce jobs. The first job would use
>>>> multiple reducers to do some processing. Here, you can hopefully shrink the
>>>> size of the data by generating, for example, some sufficient statistics of
>>>> what ultimately needs to be generated. A second mapreduce job would take the
>>>> intermediate output and produce the desired result.
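Greg's point about combiners can be shown with an in-memory model (not Hadoop's `Reducer` API; the class name is illustrative): a combiner can only collapse values that share a key, so word-count style output shrinks, while one unique key per Mapper, as in Shai's setup, passes through unchanged.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of combiner semantics: merge values per key only.
final class CombinerDemo {
    // Emit (word, 1) for every word, like a word-count mapper.
    static List<Map.Entry<String, Integer>> ones(String... words) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String w : words) out.add(new AbstractMap.SimpleEntry<>(w, 1));
        return out;
    }

    // Sum values that share a key, keeping first-seen key order.
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> mapOutput) {
        Map<String, Integer> combined = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> kv : mapOutput) {
            combined.merge(kv.getKey(), kv.getValue(), Integer::sum);
        }
        return combined;
    }

    public static void main(String[] args) {
        // Repeated keys: 6 records shrink to 4.
        System.out.println(combine(ones("to", "be", "or", "not", "to", "be")));
        // One unique key per Mapper: 3 records stay 3.
        System.out.println(combine(ones("mapper-0", "mapper-1", "mapper-2")));
    }
}
```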
>>>> As for the reducer processing map outputs as they are ready question, I
>>>> believe that the copy stage may start before all mappers are finished.
>>>> However, the sorting and application of your reduce function cannot proceed
>>>> until every mapper is finished.
>>>> Could you describe your problem in more detail?
>>>> Regards,
>>>> Greg Lawrence
>>>> On 7/27/10 4:06 AM, "Shai Erera" <serera@gmail.com> wrote:
>>>> Hi
>>>> I have a scenario for which I'd like to write an MR job in which Mappers
>>>> do some work and eventually the output of all Mappers needs to be combined
>>>> by a single Reducer. Each Mapper outputs a <key,value> that is distinct from
>>>> all other Mappers, meaning the Reducer.reduce() method always receives a
>>>> single element in the values argument for a specific key. Really - the
>>>> Mappers are independent of each other in their output.
>>>> What would really be great for me is if I could have the Reducer start
>>>> processing the map outputs as they are ready, and not after all Mappers
>>>> finish. For example, I'm processing a very large data set and the MR
>>>> framework spawns hundreds of Mappers for the task. The output of all Mappers
>>>> though is required to be processed by 1 Reducer. It so happens to be that
>>>> the Reducer job is very heavy, compared to the Mappers, and while all
>>>> Mappers finish in about 7 minutes (total wall clock time), the Reducer takes
>>>> ~30 minutes.
>>>> In my cluster I can run 96 Mappers in parallel, so I'm pretty sure that
>>>> if I could stream the outputs of the Mappers to the Reducer, I could
>>>> gain some cycles back - I can easily limit the number of Mappers to, say, 95
>>>> and have the Reducer constantly doing some work.
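A rough bound on what pipelining could buy, using the figures above (Mappers done in about 7 minutes, Reducer taking about 30). It assumes the Reducer's total work stays the same when it consumes map outputs incrementally; the class name is illustrative.

```java
// Back-of-envelope bound on the gain from overlapping map and reduce work.
final class PipelineBound {
    // Today: the reduce function starts only after the last Mapper finishes.
    static double sequentialMinutes(double mapPhase, double reducePhase) {
        return mapPhase + reducePhase;
    }

    // Best case with perfect overlap: the job can't finish before either phase does.
    static double pipelinedLowerBound(double mapPhase, double reducePhase) {
        return Math.max(mapPhase, reducePhase);
    }

    public static void main(String[] args) {
        System.out.println(sequentialMinutes(7, 30));   // 37.0
        System.out.println(pipelinedLowerBound(7, 30)); // 30.0
    }
}
```

Under these assumptions pipelining saves at most the ~7 minute map phase (37 down to ~30 minutes); the 30-minute reduce itself remains the bottleneck.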
>>>> I've read about chaining mappers, but to the best of my understanding
>>>> the second line of Mappers will only start after the first ones have
>>>> finished. Am I correct?
>>>> Someone also hinted to me that I could write a Combiner that Hadoop
>>>> might invoke on the Reducer's side when Mappers finish, if say the data of
>>>> the Mappers is very large and cannot be kept in RAM. I haven't tried it yet,
>>>> so if anyone can confirm this will indeed work, I'm willing to give it a
>>>> try. The output of the Mappers is very large, and therefore they already
>>>> write it directly to disk. So I'd like to avoid doing this serialization
>>>> twice (once when the Mapper works, and the second time when Hadoop will
>>>> *flush* the Reducer's buffer - or whatever the right terminology is).
>>>> I apologize if this has been raised before - if it has, could you please
>>>> point me at the relevant discussion/issue?
>>>> Shai
