hadoop-mapreduce-issues mailing list archives

From "Chris Douglas (JIRA)" <j...@apache.org>
Subject [jira] Commented: (MAPREDUCE-64) Map-side sort is hampered by io.sort.record.percent
Date Sun, 18 Oct 2009 08:00:35 GMT

    [ https://issues.apache.org/jira/browse/MAPREDUCE-64?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12767022#action_12767022 ]

Chris Douglas commented on MAPREDUCE-64:

bq. one simple way might be to simply add TRACE level log messages at every collect() call
with the current values of every index plus the spill number [...]

That could be an interesting visualization. I'd already made up the diagrams, but anything
that helps the analysis and validation would be welcome. I'd rather not add a trace to the
committed code, but data from it sounds great.

bq. I ran a simple test where I was running a sort of 10 byte records, and it turned out that
the "optimal" io.sort.record.percent caused my job to be significantly slower. It was the
case then that a small number of large spills actually ran slower than a large number of small
spills. Did we ever determine what that issue was? I think we should try to understand why
the theory isn't agreeing with observations here.

IIRC those tests used a non-RawComparator, right? Runping reported similar results, where
hits to concurrent collection were more expensive than small spills. The current theory is
that keeping the map thread unblocked is usually better for performance. Based on this observation,
I'm hoping that the spill.percent can also be eliminated at some point in the future, though
the performance we're leaving on the table there is probably not as severe and is more difficult
to generalize. Microbenchmarks may also not capture the expense of merging many small spills
in a busy, shared cluster, where HDFS and other tasks are competing for disk bandwidth. I'll
be very interested in metrics from MAPREDUCE-1115, as they would help to flesh out this hypothesis.


The documentation (such as it is) in HADOOP-2919 describes the existing code. The metadata
are tracked using a set of indices marking the start and end of a spill ({{kvstart}}, {{kvend}})
and the current position ({{kvindex}}), while the serialization data are described by similar
markers ({{bufstart}}, {{bufend}}, {{bufindex}}). There are two other
indices carried over from the existing design. {{bufmark}} is the position in the serialized
record data of the end of the last fully serialized record. {{bufvoid}} is necessary for the
RawComparator interface, which requires contiguous ranges for key compares; if a serialized
key crosses the end of the buffer, it must be copied to the front to satisfy the aforementioned
API spec. All of these are retained; the role of each is largely unchanged.
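The {{bufvoid}} copy can be sketched in a few lines. This is a toy illustration, not the MapTask code: the buffer, sizes, and the {{place_key}} helper are all hypothetical, and the real code shifts already-written key bytes rather than writing fresh ones.

```python
# Toy illustration of bufvoid: if a serialized key would wrap past the end
# of the buffer, it is copied to the front so a RawComparator can see it
# as one contiguous range. Names mirror the fields above; the logic is a
# simplification of the real code.

BUFSIZE = 32
buf = bytearray(BUFSIZE)
bufvoid = BUFSIZE     # logical end of valid data in the buffer

def place_key(key, bufindex):
    """Write a key at bufindex; relocate it to offset 0 if it would wrap."""
    global bufvoid
    if bufindex + len(key) > BUFSIZE:
        # key crosses the end: mark the void and copy the key to the front
        bufvoid = bufindex
        bufindex = 0
    buf[bufindex:bufindex + len(key)] = key
    return bufindex   # contiguous start of the key, safe to compare

start = place_key(b"wrapping-key", 25)
```

Everything between {{bufvoid}} and the physical end of the buffer is simply ignored by the comparator until the next spill resets it.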

The proposed design adds another parameter, the {{equator}} (while {{kvstart}} and {{bufstart}}
could be replaced with a single variable similar to {{equator}}, the effort seemed misspent).
The record serialization moves "forward" in the buffer, while the metadata are allocated in
16 byte blocks in the opposite direction. This is illustrated in the following diagram:


The role played by kvoffsets and kvindices is preserved; logically, particularly in the spill,
each is interpreted in roughly the same way. In the new code, the allocation is not static,
but will instead expand with the serialized records. This avoids degenerate cases for combiners
and multilevel merges (though it does not guarantee optimal performance).

Spills are triggered in two conditions: either the soft limit is reached (collection proceeds
concurrently with the spill) or a record is large enough to require a spill before it can
be written to the buffer (collection is blocked). The former case is illustrated here:


The {{equator}} is moved to an offset proportional to the average record size (caveats [above|https://issues.apache.org/jira/browse/MAPREDUCE-64?focusedCommentId=12765984&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_12765984]),
{{kvindex}} is moved off the equator and aligned with the end of the array (int-aligned, so
that no metadata block will span the end of the array). Collection proceeds again from the equator,
growing toward the ends of the spill. Should either run out of space, collection will block
until the spill completes. Note that there is no partially written data when the soft limit
is reached; it can only be triggered in collect, not in the blocking buffer.

The other case to consider is when record data are partially written into the collection buffer,
but the available space is exhausted:


Here, the equator is moved to the beginning of the partial record and collection blocks. When
the spill completes, the metadata are written off the equator and serialization of the record
can continue.

During collection, indices are adjusted only when holding a lock. As in the current code,
the lock is only obtained in collect when one of the possible conditions for spilling may
be satisfied. Since collect does not block, every serialized record performs a zero-length
write into the buffer once both the key and value are written. This ensures that all the boundaries
are checked and that collection will block if necessary.

That's about it.

> Map-side sort is hampered by io.sort.record.percent
> ---------------------------------------------------
>                 Key: MAPREDUCE-64
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-64
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>            Reporter: Arun C Murthy
>            Assignee: Chris Douglas
>         Attachments: M64-0.patch, M64-0i.png, M64-1.patch, M64-1i.png, M64-2.patch, M64-2i.png,
> Currently io.sort.record.percent is a fairly obscure, per-job configurable, expert-level
parameter which controls how much accounting space is available for records in the map-side
sort buffer (io.sort.mb). Typical values for io.sort.mb (100) and io.sort.record.percent
(0.05) imply that we can store ~350,000 records in the buffer before necessitating a sort/combine/spill.
> However, for many applications which deal with small records, e.g. the world-famous wordcount
and its family, this implies we can only use 5-10% of io.sort.mb (i.e. 5-10M) before we spill,
in spite of having _much_ more memory available in the sort buffer. Wordcount, for example,
results in ~12 spills (given an hdfs block size of 64M). The presence of a combiner exacerbates
the problem by piling on serialization/deserialization of records too...
> Sure, jobs can configure io.sort.record.percent, but it's tedious and obscure; we really
can do better by getting the framework to automagically pick it by using all available memory
(up to io.sort.mb) for either the data or accounting.
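The ~350,000-record figure in the description is easy to reproduce. The arithmetic below assumes 16 bytes of accounting per record (4 bytes in kvoffsets plus 12 bytes in kvindices in the old layout); the 10-byte-record figure is an illustrative extreme.

```python
# io.sort.mb = 100 with io.sort.record.percent = 0.05 sets aside 5 MB
# for accounting; at 16 bytes of accounting per record that caps the
# buffer at 327,680 (~350,000) records regardless of how small they are.

io_sort_mb = 100
record_percent = 0.05
bytes_per_record = 16

accounting_bytes = io_sort_mb * 1024 * 1024 * record_percent
max_records = int(accounting_bytes // bytes_per_record)

# For ~10-byte records, those 327,680 records fill only ~3 MB of the
# remaining 95 MB data buffer before a spill is forced; wordcount-sized
# records land in the 5-10 MB range the description mentions.
data_used = max_records * 10
```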

This message is automatically generated by JIRA.
You can reply to this email to add a comment to the issue online.
