Message-ID: <916453453.1255852835298.JavaMail.jira@brutus>
Date: Sun, 18 Oct 2009 01:00:35 -0700 (PDT)
From: "Chris Douglas (JIRA)"
To: mapreduce-issues@hadoop.apache.org
Reply-To: mapreduce-issues@hadoop.apache.org
Subject: [jira] Commented: (MAPREDUCE-64) Map-side sort is hampered by io.sort.record.percent

    [ https://issues.apache.org/jira/browse/MAPREDUCE-64?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12767022#action_12767022 ]

Chris Douglas
commented on MAPREDUCE-64:
----------------------------------------

bq. one simple way might be to simply add TRACE level log messages at every collect() call with the current values of every index plus the spill number [...]

That could be an interesting visualization. I'd already made up the diagrams, but anything that helps the analysis and validation would be welcome. I'd rather not add a trace to the committed code, but data from it sounds great.

bq. I ran a simple test where I was running a sort of 10 byte records, and it turned out that the "optimal" io.sort.record.percent caused my job to be significantly slower. It was the case then that a small number of large spills actually ran slower than a large number of small spills. Did we ever determine what that issue was? I think we should try to understand why the theory isn't agreeing with observations here.

IIRC those tests used a non-RawComparator, right? Runping reported similar results, where hits to concurrent collection were more expensive than small spills. The current theory is that keeping the map thread unblocked is usually better for performance. Based on this observation, I'm hoping that the spill.percent can also be eliminated at some point in the future, though the performance we're leaving on the table there is probably not as severe and is more difficult to generalize. Microbenchmarks may also not capture the expense of merging many small spills in a busy, shared cluster, where HDFS and other tasks are competing for disk bandwidth. I'll be very interested in metrics from MAPREDUCE-1115, as they would help to flesh out this hypothesis.

----

The documentation (such as it is) in HADOOP-2919 describes the existing code. The metadata are tracked using a set of indices marking the start and end of a spill ({{kvstart}}, {{kvend}}) and the current position ({{kvindex}}), while the serialization data are described by similar markers ({{bufstart}}, {{bufend}}, {{bufindex}}).
There are two other indices carried over from the existing design. {{bufmark}} is the position in the serialized record data of the end of the last fully serialized record. {{bufvoid}} is necessary for the RawComparator interface, which requires contiguous ranges for key compares; if a serialized key crosses the end of the buffer, it must be copied to the front to satisfy the aforementioned API spec. All of these are retained; the role of each is largely unchanged.

The proposed design adds another parameter, the {{equator}} (while {{kvstart}} and {{bufstart}} could be replaced with a single variable similar to {{equator}}, the effort seemed misspent). The record serialization moves "forward" in the buffer, while the metadata are allocated in 16-byte blocks in the opposite direction. This is illustrated in the following diagram:

!M64-0i.png|thumbnail!

The role played by kvoffsets and kvindices is preserved; logically, particularly in the spill, each is interpreted in roughly the same way. In the new code, the allocation is not static, but will instead expand with the serialized records. This avoids degenerate cases for combiners and multilevel merges (though it does not necessarily give optimal performance).

Spills are triggered under two conditions: either the soft limit is reached (collection proceeds concurrently with the spill) or a record is large enough to require a spill before it can be written to the buffer (collection is blocked). The former case is illustrated here:

!M64-1i.png|thumbnail!

The {{equator}} is moved to an offset proportional to the average record size (caveats [above|https://issues.apache.org/jira/browse/MAPREDUCE-64?focusedCommentId=12765984&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#action_12765984]), and {{kvindex}} is moved off the equator, aligned with the end of the array (int alignment, and also so that no metadata block will span the end of the array).
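
The index movement described so far can be modelled roughly as follows. This is only a sketch under stated assumptions: the class {{EquatorSketch}} and its {{collect(int)}} method are hypothetical, the buffer is treated as a plain circular byte range whose size and equator are multiples of 16, and nothing here is taken from the patch itself. It borrows the names {{equator}}, {{bufindex}} and {{kvindex}} from the description above, but only tracks positions; no bytes are actually stored.

```java
// Hypothetical model of the proposed layout: record data grows forward
// from the equator, 16-byte metadata blocks grow backward from it.
class EquatorSketch {
    // Size of one metadata block, as stated in the description above.
    static final int METASIZE = 16;

    final int capacity; // total bytes in the circular buffer
    int equator;        // boundary between record data and metadata
    int bufindex;       // next free byte for serialized record data
    int kvindex;        // start of the most recently allocated metadata block
    int used;           // bytes held by record data plus metadata

    EquatorSketch(int capacity, int equator) {
        // Assumption: both values are multiples of METASIZE, so that no
        // metadata block can span the end of the array.
        if (capacity % METASIZE != 0 || equator % METASIZE != 0) {
            throw new IllegalArgumentException("capacity and equator must be multiples of 16");
        }
        this.capacity = capacity;
        this.equator = equator;
        this.bufindex = equator;
        this.kvindex = equator;
        this.used = 0;
    }

    // Accounts for one record of recordBytes serialized bytes.
    // Returns false when data and metadata would collide; in the real
    // design this is where a spill (or blocking) would be required.
    boolean collect(int recordBytes) {
        if (used + recordBytes + METASIZE > capacity) {
            return false;
        }
        bufindex = (bufindex + recordBytes) % capacity;        // forward, wrapping
        kvindex = Math.floorMod(kvindex - METASIZE, capacity); // backward, wrapping
        used += recordBytes + METASIZE;
        return true;
    }

    public static void main(String[] args) {
        EquatorSketch buf = new EquatorSketch(128, 16);
        while (buf.collect(10)) {
            System.out.println("bufindex=" + buf.bufindex + " kvindex=" + buf.kvindex);
        }
        System.out.println("buffer full after " + buf.used + " bytes");
    }
}
```

Because neither region has a fixed size in this model, the only limit is the total of data plus metadata, which is the property the proposed design relies on to avoid a static split.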
Collection proceeds again from the equator, growing toward the ends of the spill. Should either run out of space, collection will block until the spill completes. Note that there is no partially written data when the soft limit is reached; it can only be triggered in collect, not in the blocking buffer.

The other case to consider is when record data are partially written into the collection buffer, but the available space is exhausted:

!M64-2i.png|thumbnail!

Here, the equator is moved to the beginning of the partial record and collection blocks. When the spill completes, the metadata are written off the equator and serialization of the record can continue.

During collection, indices are adjusted only when holding a lock. As in the current code, the lock is only obtained in collect when one of the possible conditions for spilling may be satisfied. Since collect does not block, every serialized record performs a zero-length write into the buffer once both the key and value are written. This ensures that all the boundaries are checked and that collection will block if necessary.

That's about it.

> Map-side sort is hampered by io.sort.record.percent
> ---------------------------------------------------
>
>                 Key: MAPREDUCE-64
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-64
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>            Reporter: Arun C Murthy
>            Assignee: Chris Douglas
>         Attachments: M64-0.patch, M64-0i.png, M64-1.patch, M64-1i.png, M64-2.patch, M64-2i.png, M64-3.patch
>
>
> Currently io.sort.record.percent is a fairly obscure, per-job configurable, expert-level parameter which controls how much accounting space is available for records in the map-side sort buffer (io.sort.mb). Typical values for io.sort.mb (100) and io.sort.record.percent (0.05) imply that we can store ~350,000 records in the buffer before necessitating a sort/combine/spill.
> However for many applications which deal with small records e.g.
> the world-famous wordcount and its family this implies we can only use 5-10% of io.sort.mb i.e. (5-10M) before we spill in spite of having _much_ more memory available in the sort-buffer. The word-count for e.g. results in ~12 spills (given hdfs block size of 64M). The presence of a combiner exacerbates the problem by piling serialization/deserialization of records too...
> Sure, jobs can configure io.sort.record.percent, but it's tedious and obscure; we really can do better by getting the framework to automagically pick it by using all available memory (up to io.sort.mb) for either the data or accounting.
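
The record count quoted in the issue description can be checked with a little arithmetic. The sketch below assumes 16 bytes of accounting per record (the metadata block size mentioned in the comment) and a statically split buffer; the class {{SortBufferCapacity}}, its method names, and the 10-byte average record size are illustrative assumptions, not Hadoop APIs.

```java
// Hypothetical helper for the arithmetic behind the static
// io.sort.mb / io.sort.record.percent split.
class SortBufferCapacity {
    // Assumption: each record costs 16 bytes of accounting space.
    static final int ACCOUNTING_BYTES_PER_RECORD = 16;

    // Bytes reserved for accounting under the static split.
    static long accountingBytes(int ioSortMb, double recordPercent) {
        long bufferBytes = (long) ioSortMb << 20; // MB to bytes
        return Math.round(bufferBytes * recordPercent);
    }

    // Number of records that fit before the accounting space is exhausted.
    static long maxRecords(int ioSortMb, double recordPercent) {
        return accountingBytes(ioSortMb, recordPercent) / ACCOUNTING_BYTES_PER_RECORD;
    }

    // Fraction of the data region filled at the moment the accounting
    // space runs out, for records of avgRecordBytes serialized bytes.
    static double dataRegionUse(int ioSortMb, double recordPercent, int avgRecordBytes) {
        long bufferBytes = (long) ioSortMb << 20;
        long dataBytes = bufferBytes - accountingBytes(ioSortMb, recordPercent);
        long needed = maxRecords(ioSortMb, recordPercent) * avgRecordBytes;
        return Math.min(1.0, (double) needed / dataBytes);
    }

    public static void main(String[] args) {
        System.out.println("records before spill: " + maxRecords(100, 0.05));
        System.out.printf("data region used with 10-byte records: %.1f%%%n",
                100.0 * dataRegionUse(100, 0.05, 10));
    }
}
```

With io.sort.mb=100 and io.sort.record.percent=0.05 this gives a few hundred thousand records, in line with the ~350,000 quoted in the description, and shows that small records exhaust the accounting space while most of the data region is still empty.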