hadoop-mapreduce-user mailing list archives

From Shai Erera <ser...@gmail.com>
Subject Re: Pipelining Mappers and Reducers
Date Mon, 09 Aug 2010 05:43:57 GMT

I've done some work and thought I'd report back the results (which are
not too encouraging).

Approach 1:
* Mappers output a side-effect Lucene Directory (written on-disk) and
a <Long, Text> pair, where the value is the location of the index on
disk and the key is unimportant for now.
* Reducer merges the on-disk indexes into a larger on-disk index and
calls optimize() on it. Its output is the same as the Mapper's.

Pros:
1) Low memory footprint on both the Mapper and Reducer side.
2) Hadoop-related output is very small -- just the location of the
index, which needs to be copied around and sorted/merged.
3) Utilizes Lucene's powerful indexing capabilities by assigning each
Mapper a large portion of text to index.

Cons:
1) No pipelining can be done, because the output of all Mappers is
very small (Hadoop-wise) and thus fits in memory and is never flushed.
I've even tried reducing the buffer on the reducer side (played w/ all
sorts of parameters), but this never triggered the Combiner I've set.
2) If the cluster has a complex structure, where not all nodes see the
same HD (not in my case), then this approach may fail, because Hadoop
is not aware of the side-effect files and thus cannot pass them to the
Reducer.

Approach 2:
* Mappers output a DirectoryWritable, which is a Writable that
(de)serializes a Lucene Directory.
* Reducer merges the incoming DWs into a larger on-disk index (output
is the same as above).
* Combiner merges the incoming DWs into a larger DW (not on-disk,
though I've tried both)
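For illustration, here's roughly what the (de)serialization part of the
DW looks like. This is a hedged, stdlib-only sketch: the class name and
shape are hypothetical, the real thing wraps a Lucene Directory and
implements Hadoop's Writable (write/readFields), and here the "index" is
just a map of file name to file bytes so the round-trip can be run
standalone.

```java
import java.io.*;
import java.util.*;

// Hypothetical sketch of a DirectoryWritable-style (de)serializer.
// The real class would wrap a Lucene Directory and implement
// org.apache.hadoop.io.Writable; here the index is modeled as a
// plain map of file name -> file contents, using only java.io.
public class DirectoryWritableSketch {

    // Serialize all "files", Writable write(DataOutput)-style.
    static void write(DataOutput out, Map<String, byte[]> files) throws IOException {
        out.writeInt(files.size());
        for (Map.Entry<String, byte[]> e : files.entrySet()) {
            out.writeUTF(e.getKey());          // file name
            out.writeInt(e.getValue().length); // file length
            out.write(e.getValue());           // file bytes
        }
    }

    // Deserialize, Writable readFields(DataInput)-style.
    static Map<String, byte[]> readFields(DataInput in) throws IOException {
        int n = in.readInt();
        Map<String, byte[]> files = new LinkedHashMap<>();
        for (int i = 0; i < n; i++) {
            String name = in.readUTF();
            byte[] data = new byte[in.readInt()];
            in.readFully(data);
            files.put(name, data);
        }
        return files;
    }

    public static void main(String[] args) throws IOException {
        Map<String, byte[]> index = new LinkedHashMap<>();
        index.put("_0.cfs", "segment-data".getBytes("UTF-8"));
        index.put("segments_2", "segment-info".getBytes("UTF-8"));

        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        write(new DataOutputStream(buf), index);
        Map<String, byte[]> copy =
            readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        System.out.println(copy.size());                       // 2
        System.out.println(new String(copy.get("_0.cfs"), "UTF-8")); // segment-data
    }
}
```

Note the obvious cost: every file's bytes are copied through the stream
on both ends, which is exactly the (de)serialization overhead mentioned
below.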

Pros:
1) Working w/ a DW is better Hadoop-wise -- the Directories can be
passed around between machines, overcoming the 2nd "Con" of Approach 1.
2) Since the output of the Mappers is large (40 MB in my case), the
Reducer's buffer gets flushed more often, and the Combiner actually
gets to do some work.

Cons:
Because everything is done in-memory, I've constantly hit OOM errors on
the Combiner side. I've tried limiting the size of its created directory
(by letting it merge as much as N MB before it outputs anything) down to
128MB, but still hit those exceptions. I've even set the child's JVM
options to -Xmx2048m and 4096m, to no avail. The machine has 64GB RAM
(16 cores), so 4GB of heap should be OK, but I still hit the OOM in the
reduce/shuffle/sort phases. Only when I set the limit to 64MB did it
work, but that's useless because it means each DW is written as-is (they
already come in at 40MB from the Mapper), and not only do I not gain
anything, I actually lose some (because of all the (de)serialization
done by the Writable).
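To make the 64MB degenerate case concrete, here's a toy model of the
batching strategy I described (buffer incoming DWs until a byte budget
is hit, then merge and emit). Class and method names are made up for
illustration; "merging" is just summing sizes:

```java
import java.util.*;

// Sketch of the Combiner batching strategy described above: buffer
// incoming directory payloads until a byte budget is reached, then
// "merge" (here: just sum the sizes) and emit one combined payload.
// Sizes are in MB; names and numbers are illustrative only.
public class CombinerBudgetSketch {

    // Returns the sizes of the merged outputs the combiner would emit.
    static List<Integer> combine(List<Integer> inputSizesMb, int budgetMb) {
        List<Integer> outputs = new ArrayList<>();
        int buffered = 0;
        for (int size : inputSizesMb) {
            if (buffered > 0 && buffered + size > budgetMb) {
                outputs.add(buffered); // flush the merged batch
                buffered = 0;
            }
            buffered += size;
        }
        if (buffered > 0) outputs.add(buffered);
        return outputs;
    }

    public static void main(String[] args) {
        List<Integer> dws = Arrays.asList(40, 40, 40, 40); // 40MB per Mapper

        // 128MB budget: three DWs merge into one output, one is left over.
        System.out.println(combine(dws, 128)); // [120, 40]

        // 64MB budget: no two DWs fit together, so each passes through as-is.
        System.out.println(combine(dws, 64));  // [40, 40, 40, 40]
    }
}
```

With a 64MB budget and 40MB inputs the combiner never merges anything,
so it only adds (de)serialization cost -- which matches what I observed.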

It would really be great if I could have the Combiner invoked in
Approach #1, but I wasn't able to set its buffer low enough. I don't
mind doing all the merges on-disk (that's how it's done in the final
phase anyway), but I cannot seem to get the Combiner working.

So unless there's an option to tell the Combiner "kick in after N
Mappers have finished" or "kick in after a VERY SMALL number of bytes
has arrived from the Mappers (on the order of 10s/100s of bytes)", I'm
not sure indexing w/ Lucene and using a Combiner is doable.
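If I read the 0.20-era docs right, there are a few reduce-side shuffle
knobs that control when the in-memory merge (which is what runs the
Combiner on the reduce side) fires, so something like the fragment below
might approximate "kick in after N map outputs". The values here are
illustrative guesses, and I haven't verified that this actually fixes my
case:

```xml
<!-- mapred-site.xml (Hadoop 0.20-era property names; values illustrative) -->
<configuration>
  <!-- Trigger the in-memory merge (and thus the Combiner) after this
       many map outputs have accumulated, instead of the default 1000. -->
  <property>
    <name>mapred.inmem.merge.threshold</name>
    <value>4</value>
  </property>
  <!-- ...or when the in-memory shuffle buffer is this fraction full. -->
  <property>
    <name>mapred.job.shuffle.merge.percent</name>
    <value>0.10</value>
  </property>
  <!-- Fraction of the reduce task's heap used to buffer map outputs. -->
  <property>
    <name>mapred.job.shuffle.input.buffer.percent</name>
    <value>0.70</value>
  </property>
</configuration>
```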

Hope you find this interesting, and that someone knows if/how I can
achieve what I want :-).

Thanks for your help and comments thus far,

On Thursday, July 29, 2010, Ferdy Galema <ferdy.galema@kalooga.com> wrote:
> Very well. Could you keep us informed on how your instant merging plans
> work out? We're actually running a similar indexing process.
> It's very interesting to be able to start merging Lucene indexes once
> the first mappers have finished, instead of waiting until ALL mappers
> have finished.
> Shai Erera wrote:
>   Well, the current scale does not warrant sharding up
> the index. 13GB of data, ~8-10GB index is something a single machine
> (even not a strong one) can handle pretty well. And as the data will
> grow, so will the number of Mappers, and the number of sub-indexes. So
> at some point I will need to merge indexes. Sharding is one of my
> strategies, but for the TB scale, not what I'm training on now :).
> Thanks,
> Shai
>   On Thu, Jul 29, 2010 at 1:23 PM, Ferdy
> Galema <ferdy.galema@kalooga.com>
> wrote:
>     You're right about the fact that
> merging cannot be done by simply
> appending. Have you thought about the possibility of actually taking
> advantage of the fact that your final index will be split into several
> segments? Especially if you plan to increase the scale of the input
> data, you may eventually want some sort of scaling on the search side
> as well. Sharding your index over several machines is one option
> (there are frameworks that will help you do this; e.g. Katta -
> distributed lucene). You can even do this on a single machine with a
> ParallelMultiSearcher, preferably if the machine has several disks to
> spread I/O.
> Of course, in this case you still have to make sure that the number of
> segments won't be too high.
> Shai Erera wrote:
>       Specifically at the moment, a Mapper output is an
> index (Lucene), and the Reducer's job is to take all indexes and merge
> them down to 1. Searching on an index w/ hundreds of segments is
> inefficient. Basically, if you think of the Reducer as holding the
> 'final' index (to be passed on in the larger pipeline I have), then as
> soon as a Mapper finishes, I'd like to merge its index w/ the
> Reducer's. Not all merges will actually be expensive -- only once
> every several such indexes will they be truly merged together.
> I cannot use simple merge techniques - must use Lucene's API to merge
> the indexes. And the merge is not a simple 'append to current index'
> either. So if I could get a hold of some object (code) which gets
> executed when Mappers output is ready for the Reducer, then I can have
> my merge code executed there. From what I understand so far, it can be
> the Combiner, which is what I'm investigating now.
> On the multi-level reduce -- if your cluster is big, and Mappers work
> on their own hardware (even if some share the same HW), then
> multi-level reduce can help. True - you might eventually process more
> bytes than you would if you had one Reducer, but (1) that's not always
> true, and (2) wall-clock time may be shortened in such a case.
> Shai
>       On Thu, Jul 29, 2010 at 12:31 PM, Ferdy
> Galema <ferdy.galema@kalooga.com>
> wrote:
>         Could you elaborate on
> how
> you merge your data? If you have independent
> map tasks with single key-value outputs that can be written as soon as
> possible, I'm still curious why you need to reduce at all. Surely there
> must be a way to merge your data 'on read'.
> About multi-level reducing; if you merge your data in multiple steps,
> there would only be a gain if the reducers somehow reduce (no pun
> intended) the amount of data in the pipeline. For example running 400
> maps and 10 reduces followed by another job with a single reducer will
> not benefit if the single reducer has to process the same amount of
> data that the previous reducers have been outputting. Therefore it
> completely depends on what your reducer actually does.
> Ferdy.
> Shai Erera wrote:
>           Yes, I'm aware of the fact that I can run w/ 0
> reducers. However, I do need my output to be merged, and unfortunately
> the merge is not really that simple that I can use 'getmerge'. I think
> I'll give the Combiner a chance now - hopefully its reduce() method
> will get called as Mappers finish their work.
> BTW, if I'd like to have multi-level Reducers, such that each level
> works on less data, do I need to do that using multiple Jobs? I haven't
> seen any API to do so, unless I chain mappers/reducers and include
> #levels in the chain. To clarify, if I need to take 400 outputs (or
> much more) and merge them down to 1, currently I need to do that
> seque
