hadoop-mapreduce-user mailing list archives

From Shai Erera <ser...@gmail.com>
Subject Re: Pipelining Mappers and Reducers
Date Wed, 28 Jul 2010 10:20:42 GMT
Without going too much into detail (so that we keep the discussion
simple) -- I'm processing large volumes of text documents. Currently I'm
training (myself :)) on a 13GB collection, but after I figure out the recipe
for writing the proper mix of Mappers/Reducers/Jobs, I will move to a TB
collection and then larger. Processing a document outputs a processed
representation of it (think of it as a large sequence of bytes). Each Mapper
processes a set of documents and outputs a processed representation which
includes all of them.

The output of all Mappers (all processed docs) needs to be merged into one
large document (sequence of bytes). I can live w/ a *few* large sequences of
bytes, but since the data will only grow, those 13GB (1.1M docs) will
eventually have to be merged into one file. Therefore I'd like to focus on
merging them down to 1 file.

There is no ordering in the merge - as soon as a Mapper finishes, its output
can be appended to the merged file. Hence the pipelining I'm looking for.
The 'merge' process I've described is simple; however, I think it is
sufficient for understanding what I'm trying to achieve.
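(For contrast: if merging after the job finishes were acceptable, the stock
`hadoop fs -getmerge` shell command already concatenates all part files into
one local file -- the path below is illustrative. What I'm after is
overlapping that merge with the map phase instead of running it afterwards.)

```shell
# Post-hoc alternative (no pipelining): concatenate all part files into one
# local file once the job has completed. Output path is hypothetical.
hadoop fs -getmerge /path/to/job/output merged.bin
```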

Currently, if I use Hadoop's default settings, the 13GB of data is split
across 397 Mappers (each receives ~33MB to process). I've played w/ the split
size and increased it to 64MB and higher (up to 512MB). The merge part of the
job (the Reducer) then runs over 397 parts (one per Mapper). So I'm trying to
balance the split size so that, on the one hand, the Mappers still finish
fast, while on the other hand the Reducer doesn't take too long to run.
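For what it's worth, the Mapper count is just the ceiling of
totalBytes / splitBytes, so the trade-off can be eyeballed quickly (plain
Java, illustrative numbers; the 397/25 figures above suggest the real
collection is slightly under a round 13GB):

```java
public class SplitMath {
    // Rough number of map tasks the framework spawns for a given split size.
    static long numSplits(long totalBytes, long splitBytes) {
        return (totalBytes + splitBytes - 1) / splitBytes; // ceiling division
    }

    public static void main(String[] args) {
        long total = 13L * 1024 * 1024 * 1024; // a round 13GB
        System.out.println(numSplits(total, 33L * 1024 * 1024));  // 404 (~400 Mappers at 33MB)
        System.out.println(numSplits(total, 512L * 1024 * 1024)); // 26 (~25 at 512MB)
    }
}
```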

Problem is, the more data I give a Mapper to handle, the more work it has to
do and the more resources it consumes. I wrote a multi-threaded Mapper that
does the processing with several concurrent threads when the split size is
very large. However, that has a downside -- I basically assume the threads
can get all the CPU power they need, which may not be true. In my cluster I
have 96 cores, so 96 Mappers can run concurrently. If I split the data into
512MB chunks (25 Mappers), then each Mapper can get ~4 cores to use. BUT,
in production there may be other jobs running on the cluster, so besides my
25 Mappers, up to 71 others may be running, and then each of my Mappers
won't get the CPU power it needs.

Therefore, I'd like to avoid multi-threaded Mappers altogether and keep it
simple -- figure out a magic number for the split size so that each Mapper
can do its job in a single thread, and if the cluster is busy, then fewer of
my Mappers will run.
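The magic number itself, once found, is just a job setting. A sketch of how
it might be pinned with the new-API FileInputFormat (these setters live in
org.apache.hadoop.mapreduce.lib.input, but exact method/property names vary
across Hadoop versions, so treat this as a sketch rather than gospel):

```java
// Sketch only: force a fixed split size so each single-threaded Mapper gets
// a chunk of known size. 512MB is the illustrative value from this thread.
FileInputFormat.setMinInputSplitSize(job, 512L * 1024 * 1024);
FileInputFormat.setMaxInputSplitSize(job, 512L * 1024 * 1024);
```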

But all that is sort of my problem (figuring out the magic number). What I'm
trying to do is have the Reducer's reduce() called as soon as Mappers finish
their job. I thought that if I set the Combiner to be the Reducer's class,
then when the Combiner fires on the Reducer side (I'll need to detect that
I'm on the Reducer side, which I can do by parsing the task ID), its
reduce() method will be called and it can start merging the incoming inputs.

I had thought that the Combiner works like the Reducer -- it can combine
several outputs with the same key down to one, but if there are several
keys it will be invoked for each of them, thereby emitting <key,value>
pairs -- just like the Reducer. Are you saying that won't work? If need be,
I can parse the task ID and have N tasks grouped together under one 'key',
but that will add a (minor) complexity which I'd be happy to avoid.
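Greg's point about unique keys can be seen in a plain-Java simulation (no
Hadoop involved; class and method names here are made up for illustration):
a combiner only merges values that share a key, so when every key is unique
the "combined" output is exactly the input and nothing is saved:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CombinerDemo {
    // Simulate a combiner: merge all values that share a key into one value.
    static Map<String, Integer> combine(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> out = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> r : records) {
            out.merge(r.getKey(), r.getValue(), Integer::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        // Repeated keys (classic word count): 3 records shrink to 2.
        List<Map.Entry<String, Integer>> repeated = List.of(
                new SimpleEntry<>("word", 1), new SimpleEntry<>("word", 1),
                new SimpleEntry<>("other", 1));
        System.out.println(combine(repeated).size()); // 2

        // Unique keys (one per Mapper): nothing merges, 3 records stay 3.
        List<Map.Entry<String, Integer>> unique = List.of(
                new SimpleEntry<>("mapper-0", 1), new SimpleEntry<>("mapper-1", 1),
                new SimpleEntry<>("mapper-2", 1));
        System.out.println(combine(unique).size()); // 3
    }
}
```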

I hope I gave enough details.


On Tue, Jul 27, 2010 at 8:47 PM, Gregory Lawrence <gregl@yahoo-inc.com> wrote:

>  Shai,
> It’s hard to determine what the best solution would be without knowing more
> about your problem. In general, combiner functions work well but they will
> be of little value if each mapper output contains a unique key. This is
> because combiner functions only “combine” multiple values associated with
> the same key (e.g., counting the number of occurrences of a word). Another
> common approach is to use two mapreduce jobs. The first job would use
> multiple reducers to do some processing. Here, you can hopefully shrink the
> size of the data by generating, for example, some sufficient statistics of
> what ultimately needs to be generated. A second mapreduce job would take the
> intermediate output and produce the desired result.
> As for your question about the reducer processing map outputs as they become
> ready: I believe that the copy stage may start before all mappers are finished.
> However, the sorting and application of your reduce function cannot proceed
> until each mapper is finished.
> Could you describe your problem in more detail?
> Regards,
> Greg Lawrence
> On 7/27/10 4:06 AM, "Shai Erera" <serera@gmail.com> wrote:
> Hi
> I have a scenario for which I'd like to write a MR job in which Mappers do
> some work and eventually the output of all mappers need to be combined by a
> single Reducer. Each Mapper outputs a <key,value> pair whose key is distinct
> from all other Mappers', meaning the Reducer.reduce() method always receives
> a single element in the values argument for a given key. Really - the Mappers
> are independent of each other in their output.
> What would really be great for me is if I could have the Reducer start
> processing the map outputs as they are ready, and not after all Mappers
> finish. For example, I'm processing a very large data set and the MR
> framework spawns hundreds of Mappers for the task. The output of all Mappers
> though is required to be processed by 1 Reducer. It so happens that
> the Reducer job is very heavy, compared to the Mappers, and while all
> Mappers finish in about 7 minutes (total wall clock time), the Reducer takes
> ~30 minutes.
> In my cluster I can run 96 Mappers in parallel, so I'm pretty sure that if
> I could streamline the outputs of the Mappers to the Reducer, I could gain
> some cycles back - I can easily limit the number of Mappers to say 95 and
> have the Reducer constantly doing some job.
> I've read about chaining Mappers, but to the best of my understanding the
> second line of Mappers will only start after the first ones have finished.
> Am I correct?
> Someone also hinted to me that I could write a Combiner that Hadoop might
> invoke on the Reducer's side when Mappers finish, if say the data of the
> Mappers is very large and cannot be kept in RAM. I haven't tried it yet, so
> if anyone can confirm this will indeed work, I'm willing to give it a try.
> The output of the Mappers is very large, and therefore they already write it
> directly to disk. So I'd like to avoid doing this serialization twice (once
> while the Mapper works, and a second time when Hadoop *flushes* the
> Reducer's buffer - or whatever the right terminology is).
> I apologize if this has been raised before - if it has, could you please
> point me at the relevant discussion/issue?
> Shai
