hadoop-mapreduce-user mailing list archives

From Shai Erera <ser...@gmail.com>
Subject Re: Best practices for jobs with large Map output
Date Mon, 18 Apr 2011 10:42:19 GMT
Thanks for your response, Chris!

An index, in this case a Lucene index, is a logical name for a collection of
files. Each Mapper instance generates such index from the input it receives,
and then the Reducer merges all of those indexes together, to output a
single Lucene index.

I ended up doing the following: my HDFS Mapper creates an index in memory
and then serializes the in-memory index into a single file stored on
HDFS (each Mapper serializes to a different file). I use the FileSystem API to
do that, so hopefully that's the right way to do it. The Mapper outputs a Text
value that holds the file's location on HDFS. The Reducer then interprets that
value, reads the file using the FileSystem API, and deserializes it into an
in-memory Lucene index.
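The approach above has each Mapper write its serialized index to its own file and emit only the file's location. The Reducer then opens each location and reads the bytes back. Below is a minimal plain-Java sketch of that pattern, not the code described in this message. A local temporary directory stands in for HDFS, and java.nio.file calls stand in for the Hadoop FileSystem API. In a real job, the write and read would go through FileSystem.create(Path) and FileSystem.open(Path). The class and method names are hypothetical, and the "index" is just a byte array.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "pass a reference, not the data":
// a local directory stands in for a shared HDFS location.
class IndexByReference {

    // Map side: write one task's serialized index to its own file and return the
    // file's location, which is all the Mapper emits (e.g. as a Text value).
    static String writeIndex(Path sharedDir, String taskId, byte[] serializedIndex) {
        Path file = sharedDir.resolve("index-" + taskId + ".bin");
        try {
            Files.write(file, serializedIndex);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return file.toString();
    }

    // Reduce side: treat each received value as a location and read the bytes back.
    // A real Reducer would deserialize each one into an in-memory index and merge them.
    static List<byte[]> readIndexes(Iterable<String> locations) {
        List<byte[]> indexes = new ArrayList<>();
        for (String location : locations) {
            try {
                indexes.add(Files.readAllBytes(Paths.get(location)));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return indexes;
    }
}
```

Only the short location strings pass through the shuffle. The index bytes are written once by the Mapper and read once by the Reducer.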

I still need to write an OutputCommitter that will delete those
serialized index files (on HDFS) once the job completes successfully (e.g.
in cleanupJob).
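The clean-up step might look roughly like the following hypothetical plain-Java sketch. Here a local directory stands in for the HDFS location that holds the serialized index files. In Hadoop, the equivalent call would be FileSystem.delete(path, true), a recursive delete, made from the committer's job-level hook. Which hook to use (cleanupJob, or commitJob in newer releases where cleanupJob is deprecated) depends on the Hadoop version, so treat that as something to verify against the API you build on.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch of the job-level clean-up of intermediate serialized-index files.
class IndexFileCleanup {

    // Recursively deletes 'dir'; returns false if it did not exist.
    static boolean deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return false;
        }
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse path order puts children before their parents, so directories are empty when deleted.
            List<Path> paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            for (Path p : paths) {
                Files.delete(p);
            }
            return true;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Clean up only after success, so a failed job leaves the files in place.
    static void onJobFinished(boolean succeeded, Path intermediateDir) {
        if (succeeded) {
            deleteRecursively(intermediateDir);
        }
    }
}
```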

Am I going in the right direction, or should I stop and rethink the


On Mon, Apr 18, 2011 at 10:02 AM, Chris Douglas <cdouglas@apache.org> wrote:

> I don't understand your job, but the Writable interface is just a
> format for record serialization. If your mapper generates
> <URI,offset,length> tuples referencing into data written in HDFS, that
> is sufficient to open the stream in the reducer using the FileSystem
> API. Writing an OutputFormat that interprets that tuple as a range of
> bytes to read from HDFS and write to a single stream should not
> diverge too far from the OutputFormats bundled with Hadoop. You might
> start there.
> Again, it's not clear what your goal is or what you mean by "index".
> Are the input records changed before being written by the reduce? Or
> is the purpose of this job only to concatenate index files? -C
> On Fri, Apr 15, 2011 at 8:35 PM, Shai Erera <serera@gmail.com> wrote:
> > bq. If you can change your job to handle metadata backed by a store in HDFS,
> > I have two Mappers, one that works with HDFS and one with GPFS. The GPFS one
> > does exactly that: it stores the indexes in GPFS (which all Mappers and
> > Reducers see as a shared location) and outputs just the pointer to that
> > location. Hadoop then only merges key=LongWritable and value=Text, and
> > indeed it works better (the job runs ~170% faster).
> > The value (index) is a collection of files, though my Writable writes them
> > as a single stream, so in essence I can make them look like a single file.
> > I've never worked with HDFS before (only GPFS), and HDFS is a new
> > requirement. Can you point me to some example code or classes I should use
> > to achieve the same trick? Will I need, in my Mapper, to explicitly call the
> > FileSystem API to store the index, or is a Writable enough?
> > Shai
> > On Fri, Apr 15, 2011 at 9:05 PM, Chris Douglas <cdouglas@apache.org> wrote:
> >>
> >> On Fri, Apr 15, 2011 at 9:34 AM, Harsh J <harsh@cloudera.com> wrote:
> >> >> Is there absolutely no way to bypass the shuffle + sort phases? I don't
> >> >> mind writing some classes if that's what it takes ...
> >> >
> >> > Shuffle is an essential part of the Map-to-Reduce transition; it can't
> >> > be 'bypassed', since a Reducer has to fetch all map outputs to begin
> >> > with. Sort/Group may be made a dummy, as you had done, but can't be
> >> > disabled altogether AFAIK. The latter has been brought up on the lists
> >> > before, if I remember right, but I am not aware of an implementation
> >> > that could do that (just begin reducing merely partitioned,
> >> > unsorted data).
> >>
> >> The sort also effects the partitioning, so completely disabling the
> >> sort (as above) will only work with 1 reducer.
> >>
> >> If only grouping is important, then a bijective f(key) that is
> >> inexpensive to sort is canonical. Though more efficient grouping
> >> methods are possible, in practice this captures most of the possible
> >> performance improvement.
> >>
> >> If neither sorting nor grouping is important, then a comparator that
> >> always asserts that its operands are equal will effect the
> >> partitioning, but each reducer will receive all its records in one
> >> iterator. Note also that the key portion of the record will be
> >> incorrect in the old API.
> >>
> >> However, as Harsh correctly points out, this doesn't appear to be the
> >> bottleneck in your job. The data motion for records of tens or
> >> hundreds of MB is patently inefficient, and OOMs are a regrettable but
> >> relatively minor consequence. If you can change your job to handle
> >> metadata backed by a store in HDFS, then your job can merge the
> >> indices instead of merging GB of record data. In other words, pass a
> >> reference to the record data, not the data itself.
> >>
> >> If the job neither sorts nor groups, what is the format for the index?
> >> Instead of a reduce phase, a second, single-map job that concatenates
> >> the output of the first seems a better fit (assuming the goal is a
> >> single file). -C
> >
> >
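In the Apr 18 reply quoted above, Chris suggests emitting <URI,offset,length> tuples that reference data already written to HDFS, instead of the data itself. A hypothetical plain-Java sketch of such a record follows. Its write/readFields methods mirror the two methods of Hadoop's Writable interface (write(DataOutput) and readFields(DataInput)). In a real job, the class would declare implements org.apache.hadoop.io.Writable and read the range through FileSystem.open(...). To keep the example self-contained, it has no Hadoop dependency and uses a RandomAccessFile over a local file in place of the HDFS stream. The name ByteRangeRef is an assumption.

```java
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.RandomAccessFile;

// Hypothetical record that points at a byte range instead of carrying the bytes.
// write/readFields mirror org.apache.hadoop.io.Writable's two methods.
class ByteRangeRef {
    String location; // plays the role of the URI; a local path in this sketch
    long offset;     // first byte of the range
    long length;     // number of bytes in the range

    ByteRangeRef() {} // no-arg constructor, needed to deserialize into a fresh instance

    ByteRangeRef(String location, long offset, long length) {
        this.location = location;
        this.offset = offset;
        this.length = length;
    }

    void write(DataOutput out) throws IOException {
        out.writeUTF(location);
        out.writeLong(offset);
        out.writeLong(length);
    }

    void readFields(DataInput in) throws IOException {
        location = in.readUTF();
        offset = in.readLong();
        length = in.readLong();
    }

    // Reads the referenced range; a local file stands in for an HDFS stream here.
    byte[] readRange() throws IOException {
        byte[] buf = new byte[Math.toIntExact(length)];
        try (RandomAccessFile file = new RandomAccessFile(location, "r")) {
            file.seek(offset);
            file.readFully(buf);
        }
        return buf;
    }
}
```

The serialized record is a few dozen bytes regardless of how large the referenced range is, which is the point of the suggestion.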

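Chris's Apr 15 point about a comparator that "always asserts that its operands are equal" can be illustrated with the hypothetical plain-Java sketch below. It implements java.util.Comparator and adds a byte-level compare with the same parameter list as the raw-bytes method of Hadoop's RawComparator. In the newer API, such a class would be registered with Job.setSortComparatorClass and Job.setGroupingComparatorClass. Hadoop itself is not used here. The sortAndGroup helper only simulates the reduce side. With every key comparing equal, a stable sort keeps records in arrival order, and grouping yields a single group, that is, one iterator holding all of a reducer's records.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical "everything is equal" comparator plus a simulation of what a reducer sees.
class AllEqualComparator implements Comparator<String> {

    @Override
    public int compare(String a, String b) {
        return 0; // every pair of keys is reported as equal
    }

    // Same parameter list as the raw-bytes compare in Hadoop's RawComparator.
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return 0;
    }

    // Sorts (key, value) pairs by key with 'cmp', then starts a new group whenever
    // consecutive keys compare unequal, as the reduce side does.
    static List<List<String>> sortAndGroup(List<String[]> records, Comparator<String> cmp) {
        List<String[]> sorted = new ArrayList<>(records);
        sorted.sort((x, y) -> cmp.compare(x[0], y[0])); // List.sort is stable
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            if (i == 0 || cmp.compare(sorted.get(i - 1)[0], sorted.get(i)[0]) != 0) {
                groups.add(new ArrayList<>()); // key changed: a new reduce() call
            }
            groups.get(groups.size() - 1).add(sorted.get(i)[1]);
        }
        return groups;
    }
}
```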
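Chris's other Apr 15 alternative replaces the reduce phase with a follow-up step whose only job is to concatenate the first job's outputs into a single file. That step reduces to the loop below. This is a hypothetical plain-Java sketch: a local directory stands in for the first job's HDFS output directory, and the part-* file naming is assumed from Hadoop's usual output convention. Sorting the names keeps the concatenation order deterministic.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: concatenate all part-* files of a job's output directory into one file.
// 'target' should live outside 'outputDir' so it is not picked up as an input.
class ConcatParts {

    static int concatenate(Path outputDir, Path target) {
        List<Path> parts = new ArrayList<>();
        try (DirectoryStream<Path> ds = Files.newDirectoryStream(outputDir, "part-*")) {
            for (Path p : ds) {
                parts.add(p);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Collections.sort(parts); // deterministic order: part-00000, part-00001, ...
        try (OutputStream out = Files.newOutputStream(target)) {
            for (Path p : parts) {
                Files.copy(p, out); // streams each part without loading it fully into memory
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return parts.size();
    }
}
```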