hadoop-mapreduce-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Shai Erera <ser...@gmail.com>
Subject Re: Pipelining Mappers and Reducers
Date Thu, 29 Jul 2010 09:56:51 GMT
Specifically at the moment, a Mapper output is an index (Lucene), and the
Reducer's job is to take all indexes and merge them down to 1. Searching on
an index w/ hundreds of segments is inefficient. Basically, if you think of
the Reducer as holding the 'final' index (to be passed on in the larger
pipeline I have), then as soon as a Mapper finishes, I'd like to merge its
index w/ the Reducer's. Not all merges will actually be expensive - only
every several such indexes will they be truly merged together.

I cannot use simple merge techniques - must use Lucene's API to merge the
indexes. And the merge is not a simple 'append to current index' either. So
if I could get a hold of some object (code) which gets executed when Mappers
output is ready for the Reducer, then I can have my merge code executed
there. From what I understand so far, it can be the Combiner, which is what
I'm investigating now.

On the multi-level reduce -- if your cluster is big, and Mappers work on
their own hardware (even if some share the same HW), then multi-level reduce
can help. True - you might eventually process more bytes than you would if
you had one Reducer, but (1) that's not always true, and (2) wall-clock time
may be shortened in such a case.

Shai

On Thu, Jul 29, 2010 at 12:31 PM, Ferdy Galema <ferdy.galema@kalooga.com>wrote:

>  Could you elaborate on how you merge your data? If you have independent
> map tasks with single key-value outputs that can be written as soon as
> possible, I'm still curious why need to reduce at all. Surely there must be
> a way to merge your data 'on read'.
>
> About multi-level reducing; if you merge your data in multiple steps, there
> would only be a gain if the reducers somehow reduce (no pun intended) the
> amount of data in the pipeline. For example running 400 maps and 10 reduces
> followed by another job with a single reducer will not benefit if the single
> reducer has to process the same amount of data that the previous reducers
> have been outputting. Therefore it completely depends on what your reducer
> actually does.
>
> Ferdy.
>
>
> Shai Erera wrote:
>
> Yes, I'm aware of the fact that I can run w/ 0 reducers. However, I do need
> my output to be merged, and unfortunately the merge is not really that
> simple that I can use 'getmerge'. I think I'll give the Combiner a chance
> now - hopefully its reduce() method will get called as Mappers finish their
> work.
>
> BTW, if I'd like to have multi-level Reducers, such that each level works
> on less data, do I need to do that using multiple Jobs? I haven't seen any
> API to do so, unless I chain mappers/reducers and includes #levels in the
> chain. To clarify, if I need to take 400 outputs (or much more) and merge
> them down to 1, currently I need to do that sequentially. If I could do them
> in several rounds, by chunks of say 10/20, and each round will spawn N
> Mappers/Reducers to do the job, while N will get smaller and smaller,
> perhaps I'd gain something?
>
> Has anyone experienced with something like that? Am I expected to see some
> improvements?
>
> Thanks,
> Shai
>
> On Thu, Jul 29, 2010 at 11:20 AM, Ferdy Galema <ferdy.galema@kalooga.com>wrote:
>
>>  Just a quick pointer here:
>>
>> You are aware of the fact that you can configure the number of reduce
>> tasks to be zero? If I read well, you mention the order of your map outputs
>> in de merge does not really matter, as well as having a single value for
>> every key. Perhaps you could eliminate your reduce function altogether?
>>
>> Running a job with zero reduces means that output of the mappers is
>> directly written on the output filesystem (DFS most of the time). Ofcourse
>> this will result in your output being split into multiple parts, namely the
>> number of map tasks. I'm not sure if this suffices, it depends on how your
>> output data will be used. Still, if you absolutely need your data to be
>> merged, you could use "hadoop fs -getmerge ..." to pull a merged copy of the
>> DFS.
>>
>> Btw I share your opinion on keeping your Map/Reduce functions
>> singlethreaded (thus simple) when possible. The Hadoop framework will be
>> able to run your application concurrently by using multiple tasks per node.
>>
>> Ferdy.
>>
>>
>> Shai Erera wrote:
>>
>> Without going too much into the detail (so that we keep the discussion
>> simple) -- I'm processing large volumes of text documents. Currently I'm
>> training (myself :)) on a 13GB collection, but after I figure out the recipe
>> for writing the proper mix of Mappers/Reducers/Jobs, I will move to a TB
>> collection and then larger. Processing of a document outputs a processed
>> representation of a document (think of it as a large sequence of bytes).
>> Each Mapper processes a set of documents and outputs a processed
>> representation which includes all of them.
>>
>> The output of all Mappers (all processed docs) need to be merged into one
>> large document (sequence of bytes). I can live w/ a *few* large sequence of
>> bytes, but since the data will only grow, merging those 13GB (1.1M docs)
>> will eventually be done into one file. Therefore I'd like to focus on
>> merging them down to 1 file.
>>
>> There is no ordering in the merge - as soon as a Mapper finishes, its
>> output can be appended to the merged file. Hence the pipelining I'm looking
>> for. The 'merge' process I've described is simple, however I think it is
>> sufficient in order to understand what I'm trying to achieve.
>>
>> Currently, if I use Hadoop's default settings, the 13GB data is split into
>> 397 mappers (each receives 33MB to process). I've played w/ the split size
>> and increased it to 64MB and higher (up to 512MB). The merge part of the job
>> (Reducer) is done on 397 parts (or the # Mappers). So I'm trying to balance
>> the split size so that on one hand the Mappers can still finish fast, but on
>> the other hand the Reducer won't take so long to run.
>>
>> Problem is, the more data I give a Mapper to handle, the more work it has
>> to do and the more resources it consumes. I wrote a multi-threaded Mapper
>> which does the processing by several concurrent threads, when the split size
>> is very large. However that has a downside -- I basically assume the threads
>> can get all the CPU power they need, which may not be true. In my cluster I
>> have 96 cores and 96 Mappers can run concurrently. If I split the data to
>> 512MB (25 Mappers), then it means each Mapper can get ~4 cores to use. BUT,
>> in production, there may be other jobs running on the cluster, so beside my
>> 25 Mappers there will be additional 71 that may be run, therefore each of my
>> Mappers won't get the CPU power it needs.
>>
>> Therefore, I'd like to avoid multi-threaded Mappers at all, and keep it
>> simple - figure out a magic number for the split size so that each Mapper
>> can do its job in a single thread, and if the cluster is busy, then less of
>> my Mappers will run.
>>
>> But all that is sort of my problem (figure out the magic number). What I'm
>> trying to do is have the Reducer's reduce() called as soon as Mappers finish
>> their job. I thought that if I set the Combiner to be the Reducer's class,
>> then when the Combiner will be fired on the Reducer job (I'll need to figure
>> out that I'm on the Reducer side, which I can by parsing the task Id), its
>> reduce() method will be called and it can start merging the incoming inputs.
>>
>> I had thought that the Combiner works like the Reducer - it can combine
>> several outputs of the same key down to 1, but that if there are several
>> keys it will be invoked for all, thereby emitting <key,value> pairs -- just
>> like the Reducer. Are you saying that won't work? If need be, I can parse
>> the task Id and have N tasks grouped together into one 'key', but that will
>> add a (minor) complexity which I'd be happy to avoid.
>>
>> I hope I gave enough details.
>>
>> Shai
>>
>> On Tue, Jul 27, 2010 at 8:47 PM, Gregory Lawrence <gregl@yahoo-inc.com>wrote:
>>
>>> Shai,
>>>
>>> It’s hard to determine what the best solution would be without knowing
>>> more about your problem. In general, combiner functions work well but they
>>> will be of little value if each mapper output contains a unique key. This is
>>> because combiner functions only “combine” multiple values associated with
>>> the same key (e.g., counting the number of occurrences of a word). Another
>>> common approach is to use two mapreduce jobs. The first job would use
>>> multiple reducers to do some processing. Here, you can hopefully shrink the
>>> size of the data by generating, for example, some sufficient statistics of
>>> what ultimately needs to be generated. A second mapreduce job would take the
>>> intermediate output and produce the desired result.
>>>
>>> As for the reducer processing map outputs as they are ready question, I
>>> believe that the copy stage may start before all mappers are finished.
>>> However, the sorting and application of your reduce function can not proceed
>>> until each mapper is finished.
>>>
>>> Could you describe your problem in more detail?
>>>
>>> Regards,
>>> Greg Lawrence
>>>
>>>
>>> On 7/27/10 4:06 AM, "Shai Erera" <serera@gmail.com> wrote:
>>>
>>>    Hi
>>>
>>> I have a scenario for which I'd like to write a MR job in which Mappers
>>> do some work and eventually the output of all mappers need to be combined by
>>> a single Reducer. Each Mapper outputs <key,value> that is distinct from
all
>>> other Mappers, meaning the Reducer.reduce() method always receives a single
>>> element in the values argument of a specific key. Really - the Mappers are
>>> independent of each others in their output.
>>>
>>> What would really be great for me is if I could have the Reducer start
>>> processing the map outputs as they are ready, and not after all Mappers
>>> finish. For example, I'm processing a very large data set and the MR
>>> framework spawns hundreds of Mappers for the task. The output of all Mappers
>>> though is required to be processed by 1 Reducer. It so happens to be that
>>> the Reducer job is very heavy, compared to the Mappers, and while all
>>> Mappers finish in about 7 minutes (total wall clock time), the Reducer takes
>>> ~30 minutes.
>>>
>>> In my cluster I can run 96 Mappers in parallel, so I'm pretty sure that
>>> if I could streamline the outputs of the Mappers to the Reducer, I could
>>> gain some cycles back - I can easily limit the number of Mappers to say 95
>>> and have the Reducer constantly doing some job.
>>>
>>> I've read about chaining mappers, but to the best of my understanding the
>>> second line of Mappers will only start after the first ones finished. Am I
>>> correct?
>>>
>>> Someone also hinted to me that I could write a Combiner that Hadoop might
>>> invoke on the Reducer's side when Mappers finish, if say the data of the
>>> Mappers is very large and cannot be kept in RAM. I haven't tried it yet, so
>>> if anyone can confirm this will indeed work, I'm willing to give it a try.
>>> The output of the Mappers is very large, and therefore they already write it
>>> directly to disk. So I'd like to avoid doing this serialization twice (once
>>> when the Mapper works, and the second time when Hadoop will *flush* the
>>> Reducer's buffer - or whatever the right terminology is).
>>>
>>> I apologize if this has been raised before - if it has, could you please
>>> point me at the relevant discussion/issue?
>>>
>>> Shai
>>>
>>>
>>
>

Mime
View raw message