hadoop-common-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Chris Douglas (JIRA)" <j...@apache.org>
Subject [jira] Updated: (HADOOP-2919) Create fewer copies of buffer data during sort/spill
Date Sat, 01 Mar 2008 02:03:51 GMT

     [ https://issues.apache.org/jira/browse/HADOOP-2919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel

Chris Douglas updated HADOOP-2919:

    Attachment: 2919-0.patch

This patch effects the following changes to improve our efficiency in this area. Instead of
gradually growing our buffers, we use properties to determine the size of the K,V byte buffer
and accounting data and allocate it up front. We maintain accounting information for the task
as two arrays of ints (rather than separate arrays for each partition), mimicking the existing
BufferSorter interface. The first stores offsets into the second, which maintains the k/v
offsets and partition information for the keys. This permits us to swap offsets to effect
the sort, as is presently implemented in BufferSorter, but without requiring us to wrap them
in IntWritables.

kvoffset buffer        kvindices buffer
 _____________         _________________
|offset k1,v1 |       | partition k1,v1 |
|offset k1,v2 |       | k1 offset       |
     ...              | v1 offset       |
|offset kn,vn |       | partition k2,v2 |
                      | k2 offset       |
                      | v2 offset       |
                      | partition kn,vn |
                      | kn offset       |
                      | vn offset       |

By default, the total size of the accounting space is 5% of io.sort.mb. We build on the work
done in HADOOP-1965, but rather than using 50% of io.sort.mb before a spill, we set a "soft"
limit that defaults to 80% of the number of records or 80% of the K,V buffer before starting
a spill thread. Note that this limit does not require us to query each partition collector
for its memory usage, but can be effected by examining our indices. Rather than permitting
the spill thread to "own" references to the buffers, we maintain a set of indices into the
offset and k,v byte buffers defining the area of each in which the spill buffer is permitted
to work. According to the Java VM spec, we can assume that reading/writing array elements
does not require a lock on the array.

We maintain three indices for both the accounting and k,v buffers: start, end, and index.
The area between start and end is available to the spill, while the area between end and index
(in truth, a marker noting end of the last record written) contains "spillable" data yet to
be written to disk. If the soft limit is reached- or if one attempts a write into the buffer
that is too large to accommodate without a spill- then the task thread sets the end index
to the last record marker and triggers a spill. While the spill is running, the area between
the start and end indices is unavailable for writing from collect(K,V) and the task thread
will block until the spill has completed if the index marker hits the start marker.

Buffer indices uring a spill:
 ___________      ___________      ___________
|___________|    |___________|    |___________|
 ^     ^  ^ ^      ^  ^  ^  ^      ^  ^ ^   ^
 s     e  i v      i  s  e  v      e  i s   v

It is worth mentioning that each key must be contiguous to be used with a RawComparator, but
values can wrap around the end of the buffer. This requires us to note the "voided" space
in the buffer that contains no data. When the spill completes, it sets the start marker to
the end marker, making that space available for writing. Note that it must also reset the
void marker to the buffer size if the spill wraps around the end of the buffer (the rightmost
case in the preceding figure). The "voided" marker is owned by whichever thread needs to manipulate
it, so we require no special locking for it.

When we sort, we sort all spill data by partition instead of creating a separate collector
for each partition. Further, we can use appendRaw (as was suggested in HADOOP-1609) to write
our serialized data directly from the k,v buffer to our spill file writer instead of deserializing
each prior to the write. Note that for record-compressed data (when not using a combiner),
this permits us to store compressed values in our k,v buffer.

The attached patch is a work in progress, and is known to suffer from the following deficiencies:
* Very large keys and values (with a comparably small io.sort.mb) present a difficult problem
for a statically allocated collection buffer. If a series of writes to an empty collection
exceed the space allocated to the k,v byte buffer (e.g. a 100MB k,v byte buffer and a Writable
that attempts 2 51MB write(byte[],int,int) calls), the current patch will loop forever. This
will also happen for separate writes. The current patch only spills when the soft limit is
* Handling of compression is inelegantly implemented. Again, this is a work in progress and
will be cleaned up.
* The spill thread is created each time it is invoked, but it need not be.
* The code managing the contiguous key property is not as efficient as it could be.
* The implementation of QuickSort could be improved (re: Sedgewick) to handle the case where
keys are equal to the pivot, probably a fairly common case.

> Create fewer copies of buffer data during sort/spill
> ----------------------------------------------------
>                 Key: HADOOP-2919
>                 URL: https://issues.apache.org/jira/browse/HADOOP-2919
>             Project: Hadoop Core
>          Issue Type: Improvement
>          Components: mapred
>            Reporter: Chris Douglas
>         Attachments: 2919-0.patch
> Currently, the sort/spill works as follows:
> Let r be the number of partitions
> For each call to collect(K,V) from map:
> * If buffers do not exist, allocate a new DataOutputBuffer to collect K,V bytes, allocate
r buffers for collecting K,V offsets
> * Write K,V into buffer, noting offsets
> * Register offsets with associated partition buffer, allocating/copying accounting buffers
if nesc
> * Calculate the total mem usage for buffer and all partition collectors by iterating
over the collectors
> * If total mem usage is greater than half of io.sort.mb, then start a new thread to spill,
blocking if another spill is in progress
> For each spill (assuming no combiner):
> * Save references to our K,V byte buffer and accounting data, setting the former to null
(will be recreated on the next call to collect(K,V))
> * Open a SequenceFile.Writer for this partition
> * Sort each partition separately (the current version of sort reuses, but still requires
wrapping, indices in IntWritable objects)
> * Build a RawKeyValueIterator of sorted data for the partition
> * Deserialize each key and value and call SequenceFile::append(K,V) on the writer for
this partition
> There are a number of opportunities for reducing the number of copies, creations, and
operations we perform in this stage, particularly since growing many of the buffers involved
requires that we copy the existing data to the newly sized allocation.

This message is automatically generated by JIRA.
You can reply to this email to add a comment to the issue online.

View raw message