hadoop-mapreduce-user mailing list archives

From Adam Kirsch <adam.kir...@gmail.com>
Subject question about spill/combine
Date Sun, 21 Feb 2010 21:35:06 GMT
Hi,

I'm new to hadoop and am confused about some serialization issues in
the spill.  My reference for this post is the following snippet from
the O'Reilly Hadoop book:

===================

Each map task has a circular memory buffer that it writes the output
to. The buffer is 100 MB by default, a size which can be tuned by
changing the io.sort.mb property. When the contents of the buffer
reaches a certain threshold size (io.sort.spill.percent, default 0.80,
or 80%) a background thread will start to spill the contents to disk.
Map outputs will continue to be written to the buffer while the spill
takes place, but if the buffer fills up during this time, the map will
block until the spill is complete. Spills are written in round-robin
fashion to the directories specified by the mapred.local.dir property,
in a job-specific subdirectory.

Before it writes to disk, the thread first divides the data into
partitions corresponding to the reducers that they will ultimately be
sent to. Within each partition, the background thread performs an
in-memory sort by key, and if there is a combiner function, it is run
on the output of the sort.

Each time the memory buffer reaches the spill threshold, a new spill
file is created, so after the map task has written its last output
record there could be several spill files. Before the task is
finished, the spill files are merged into a single partitioned and
sorted output file. The configuration property io.sort.factor controls
the maximum number of streams to merge at once; the default is 10.

If a combiner function has been specified, and the number of spills is
at least three (the value of the min.num.spills.for.combine property),
then the combiner is run before the output file is written. Recall
that combiners may be run repeatedly over the input without affecting
the final result. The point is that running combiners makes for a more
compact map output, so there is less data to write to local disk and
to transfer to the reducer.

=========================

The first time I read this, it made perfect sense to me, but then I
started thinking about whether combining or serialization happens
first.  From the text above, it looks like the circular memory buffer
used for the spill stores serialized data in some form, because
otherwise saying that, by default, the buffer is 100MB and the spill
is triggered when it is 80% full would not be very meaningful.  (If it
just stored pointers to objects, how would hadoop know how much
serialized data they really represent?)  Of course, the original,
not-yet-serialized contents of the buffer must still be stored
someplace, in order to facilitate the in-memory sort by key.  After
the output is partitioned by reducer and sorted by key within each
partition, the combiner may be run (depending on settings) as a kind
of application-specific compression mechanism.  The output of the
combiner is what is actually written to disk (or its compressed form,
if the compression option is used).  (And when the map task finishes,
the spill files are merged into a single partitioned, sorted output
file, and each reducer fetches its partition of that file.)
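
For concreteness, here is how I understand the knobs the excerpt
mentions map onto the job configuration -- just my own sketch of the
defaults using the old mapred API, not something I pulled out of the
hadoop source:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Reducer;

    public class SpillSettings {
      public static JobConf configure(Class<?> jobClass,
                                      Class<? extends Reducer> combinerClass) {
        JobConf conf = new JobConf(jobClass);
        conf.setInt("io.sort.mb", 100);                // spill buffer size in MB
        conf.setFloat("io.sort.spill.percent", 0.80f); // fill level that triggers a spill
        conf.setInt("io.sort.factor", 10);             // max spill streams merged at once
        conf.setInt("min.num.spills.for.combine", 3);  // rerun combiner at merge if >= 3 spills
        conf.setCombinerClass(combinerClass);          // combiner may run per spill and at the merge
        return conf;
      }
    }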

Here's my problem: it looks like all of the records that come out of
the map function are serialized in memory before the combiner is even
run, and that if the combiner is used aggressively (I am thinking
about writing jobs where the combiner runs on every spill, and the
value objects for the reducer are all fairly large), the results of
that serialization won't ever make it to disk.  Instead, the output of
the combiner (which runs on the original, not-yet-serialized outputs
of the map function) is what ultimately gets serialized and written to
disk.

Thus, it looks like even if I use the combiner aggressively, I still
have to pay for in-memory serialization of every record output by the
map function, which strikes me as very inefficient.  Now, maybe hadoop
avoids serializing the map outputs by using some heuristic to estimate
the serialized size of the records that have not yet been
combined/written to disk, and uses that estimate to decide when to
trigger the spill, but I see no mention of that.  (It could even feed
the Writable an implementation of DataOutput that doesn't actually
serialize anything but just counts the bytes the serialization would
need; that would save a bunch of memory operations, but it would still
incur all of the computation and memory accesses inside the Writables
that serialization requires.)
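
Just to make that last idea concrete, here is roughly the kind of
byte-counting DataOutput I have in mind.  This is purely my own sketch
(the class names are made up), not anything I have found in the Hadoop
source:

    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import org.apache.hadoop.io.Writable;

    public class SerializedSizeEstimator {
      // A do-nothing OutputStream that only tallies bytes, so
      // Writable.write() can be used to measure serialized size
      // without keeping the bytes around.
      private static class CountingStream extends OutputStream {
        long count = 0;
        public void write(int b) { count++; }
        public void write(byte[] b, int off, int len) { count += len; }
      }

      /** Number of bytes w would occupy if actually serialized. */
      public static long serializedSize(Writable w) throws IOException {
        CountingStream counter = new CountingStream();
        DataOutputStream out = new DataOutputStream(counter);
        w.write(out);   // runs the Writable's serialization logic...
        out.flush();    // ...but the bytes are discarded, only counted
        return counter.count;
      }
    }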

Is this right?  The natural work-around is for me to implement my own
combining logic in the mapper.  That is, rather than just doing some
small operations in the map function and passing the result to the
OutputCollector, I could keep state, periodically do my own combining
when that state gets too big, and only then feed the results to the
OutputCollector.  But that's very nasty, because mappers are not
supposed to accumulate state like this; the built-in combiner
functionality is supposed to deal with exactly this issue.  Really, it
seems that hadoop should give developers some way to configure when
the spill is triggered that is more general than serialized size.
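
For what it's worth, here is a sketch of the work-around I mean, in
the style of word counting with the old mapred API.  The threshold,
the class name, and the trick of caching the OutputCollector so that
close() can flush are all my own choices, not anything blessed by
hadoop:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class InMapperCombiningMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, LongWritable> {

      private static final int MAX_BUFFERED_KEYS = 10000;  // arbitrary cap
      private final Map<String, Long> counts = new HashMap<String, Long>();
      private OutputCollector<Text, LongWritable> out;  // cached for close()

      public void map(LongWritable key, Text value,
                      OutputCollector<Text, LongWritable> output,
                      Reporter reporter) throws IOException {
        out = output;
        for (String word : value.toString().split("\\s+")) {
          if (word.isEmpty()) continue;
          Long c = counts.get(word);
          counts.put(word, c == null ? 1L : c + 1L);
        }
        if (counts.size() >= MAX_BUFFERED_KEYS) {
          flush();  // do my own "combine" when the held state gets too big
        }
      }

      private void flush() throws IOException {
        for (Map.Entry<String, Long> e : counts.entrySet()) {
          out.collect(new Text(e.getKey()), new LongWritable(e.getValue()));
        }
        counts.clear();
      }

      public void close() throws IOException {
        if (out != null) {
          flush();  // emit whatever is still buffered when the task ends
        }
      }
    }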

This issue also raises the question of how hadoop does in-memory
serialization, and how well that performs for large objects, which I
will likely have.  I suppose it could use a linked list of byte arrays
of (possibly) increasing sizes, or something like that, but if it does
something naive like use a single resizable vector of bytes, large
objects could require lots of extra copies.  (My guess is that hadoop
relies on some standard java utility from the network I/O package for
this, and that the end result is something fairly reasonable, but it
seems worth asking.)
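
To illustrate what I mean by the naive approach, something like the
following (again my own sketch, not hadoop code) would copy a large
value several times as the backing array grows, and once more on the
way out:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    public class NaiveWritableSerializer {
      // Serialize into one resizable byte array; every time the array
      // fills up it is reallocated and the old contents are copied over.
      public static byte[] toBytes(Writable w) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        w.write(out);
        out.flush();
        return buf.toByteArray();  // yet another full copy here
      }
    }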

Thanks very much,
Adam
