Date: Fri, 10 Aug 2007 11:57:43 -0700 (PDT)
From: "Devaraj Das (JIRA)"
To: hadoop-dev@lucene.apache.org
Reply-To: hadoop-dev@lucene.apache.org
Subject: [jira] Issue Comment Edited: (HADOOP-1698) 7500+ reducers/partitions causes job to hang
Message-ID: <33386222.1186772263364.JavaMail.jira@brutus>
In-Reply-To: <8460686.1186618859833.JavaMail.jira@brutus>

    [ https://issues.apache.org/jira/browse/HADOOP-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12519086 ]

devaraj edited comment on HADOOP-1698 at 8/10/07 11:56 AM:
---------------------------------------------------------------

Thanks, Srikanth, for the very detailed bug report and analysis. Really awesome! The attached patch should address the concerns. Regarding your questions:

1) Output.collect should not take a hit from the framework; should a separate thread handle the spill buffer?
I can't see how this can be done without a substantial redesign, and I don't see much to gain from doing it this way (if we want to keep memory usage bounded).

2) The InetAddress.getLocalHost result should be cached in a static variable?
Done. The code is modified to always use 127.0.0.1.

3) The spilling logic should not involve the number of partitions; does it need a redesign?
Modified this to take into account the number of key/value pairs in each particular partition. If a partition holds nothing, then its associated arrays (like startOffsets) contribute nothing to the memory used. Also, reduced the initial array sizes from 1K to 5; this shouldn't hurt performance.

Also, the patch models the memory usage better: it takes into account the temporary space that the mergesort needs for its scratch arrays when the spilling logic executes. The patch also reinitializes the arrays to null upon every spill, to reflect the memory actually in use.
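For illustration only, here is a rough sketch of the kind of per-partition accounting described in (3) above. The class, method, and array names below are made up for the example and are not the code in 1698.patch; the idea is simply to charge spill memory per buffered record in each partition, so thousands of empty partitions no longer add a flat 16K each.

----------------------------------------------------------
// Illustrative sketch only -- not the code from 1698.patch.
// Charges spill memory per buffered record in each partition,
// rather than a fixed 16 KB per partition up front.
public class SpillAccountingSketch {

  static final int BUFFERED_KEY_VAL_OVERHEAD = 16; // bytes of bookkeeping per buffered record

  // recordCounts[i] = number of key/value pairs currently buffered for partition i
  static boolean shouldSpill(long keyValBufferBytes, int[] recordCounts, long maxBufferSize) {
    long totalMem = keyValBufferBytes;
    for (int count : recordCounts) {
      // An empty partition contributes nothing, so thousands of idle
      // partitions no longer force a spill on every collect() call.
      totalMem += (long) count * BUFFERED_KEY_VAL_OVERHEAD;
    }
    return totalMem >= maxBufferSize;
  }

  public static void main(String[] args) {
    // 8000 partitions, only 3 of them holding records, 1 MB of raw key/value data:
    int[] counts = new int[8000];
    counts[0] = 500; counts[1] = 200; counts[2] = 100;
    System.out.println(shouldSpill(1 << 20, counts, 120L << 20)); // prints false
  }
}
----------------------------------------------------------

With 8000 partitions of which only a handful hold records, the check stays far below the 120M buffer limit instead of being true on every call to collect.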
      was (Author: devaraj):
    Thanks, Srikanth, for the very detailed bug report and analysis. Really awesome! The attached patch should address the concerns. Regarding your questions:

1) Output.collect should not take a hit from the framework; should a separate thread handle the spill buffer?
I can't see how this can be done without a substantial redesign, and I don't see much to gain from doing it this way (if we want to keep memory usage bounded).

2) The InetAddress.getLocalHost result should be cached in a static variable?
Done. The code is modified to always use 127.0.0.1.

3) The spilling logic should not involve the number of partitions; does it need a redesign?
Modified this to take into account the number of key/value pairs in each particular partition. If a partition holds nothing, then its associated arrays (like startOffsets) contribute nothing to the memory used. Also, reduced the initial array sizes from 1K to 5; this shouldn't hurt performance.

Also, the patch models the memory usage better: it takes into account the temporary space that the mergesort needs for its scratch arrays when the spilling logic executes. The patch also reinitializes the arrays to null upon every spill so that unused memory doesn't count towards spilling.
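As a further illustration, here is a minimal sketch of the static caching suggested in (2). This is not what the attached patch does (the patch reportedly sidesteps the lookup by always using 127.0.0.1); it only shows one way to resolve the local host once per JVM instead of from every SequenceFile.Writer constructor. The class name is hypothetical.

----------------------------------------------------------
// Illustrative sketch only: cache the InetAddress.getLocalHost() result in a
// static field so the hosts-file/DNS lookup happens at most once per JVM.
import java.net.InetAddress;
import java.net.UnknownHostException;

public class LocalHostCache {
  private static volatile String cachedLocalHost;

  public static String getLocalHostName() {
    String name = cachedLocalHost;
    if (name == null) {
      try {
        name = InetAddress.getLocalHost().toString();
      } catch (UnknownHostException e) {
        name = "localhost/127.0.0.1"; // fall back rather than failing the writer
      }
      cachedLocalHost = name; // benign race: worst case, the lookup runs twice
    }
    return name;
  }
}
----------------------------------------------------------

Code that currently calls InetAddress.getLocalHost() directly would call LocalHostCache.getLocalHostName() instead, so repeated writer creation no longer hits the resolver.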
> 7500+ reducers/partitions causes job to hang
> --------------------------------------------
>
>                 Key: HADOOP-1698
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1698
>             Project: Hadoop
>          Issue Type: Bug
>          Components: examples
>    Affects Versions: 0.13.1
>        Environment: Standard hadoop installation, any number of nodes > 10
>            Reporter: Srikanth Kakani
>            Assignee: Devaraj Das
>            Priority: Blocker
>        Attachments: 1698.patch
>
>
> Steps to Reproduce:
> On the above cluster, run any job with #partitions/reducers = 8000+.
> Observe CPU utilization on any mapper.
> Observations:
> The output.collect(Key, Value) call takes a huge amount of CPU, causing the job to hang.
> This is a result of two issues:
> 1) A number of partitions beyond 7500 results in a call to sortAndSpillToDisk() on each call to output.collect.
> 2) The call to sortAndSpillToDisk causes creation of a writer object, eventually calling:
>     MessageDigest digester = MessageDigest.getInstance("MD5");
>     digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
>     sync = digester.digest();
> a code block in SequenceFile.java (652).
> Issue #1:
> Further investigation reveals the following stack trace whenever the task is suspended:
>   [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
>   [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
>   [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
>   [4] java.net.InetAddress.getLocalHost (InetAddress.java:1,312)
>   [5] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:653)
>   [6] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:622)
>   [7] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:386)
>   [8] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:412)
>   [9] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.startPartition (MapTask.java:307)
>   [10] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpillToDisk (MapTask.java:387)
>   [11] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355)
>   /* My code */
>   [12] mypackage.MyClass$Map.map (MyClass.java:283)
>   --------------
>   [13] org.apache.hadoop.mapred.MapRunner.run (MapRunner.java:46)
>   [14] org.apache.hadoop.mapred.MapTask.run (MapTask.java:189)
>   [15] org.apache.hadoop.mapred.TaskTracker$Child.main (TaskTracker.java:1,771)
> The piece of code causing the problem is (MapTask.java:355):
> ----------------------------------------------------------
> long totalMem = 0;
> for (int i = 0; i < partitions; i++)
>   totalMem += sortImpl[i].getMemoryUtilized();   <---- == 16K per partition (BasicTypeSorterBase.java:88): startOffsets.length (below) * BUFFERED_KEY_VAL_OVERHEAD
> if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) {   <---- condition is always true if partitions > 7500
>   sortAndSpillToDisk();
>   keyValBuffer.reset();
>   for (int i = 0; i < partitions; i++) {
>     sortImpl[i].close();
>   }
> }
> ----------------------------------------------------------
> Looking at the variable values in org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355):
> sortImpl[0] = {
>   org.apache.hadoop.mapred.BasicTypeSorterBase.keyValBuffer: instance of org.apache.hadoop.io.DataOutputBuffer (id=1159)
>   org.apache.hadoop.mapred.BasicTypeSorterBase.startOffsets: instance of int[1024] (id=1160)   <-- 1K * 16 (previously explained) == 16K
>   org.apache.hadoop.mapred.BasicTypeSorterBase.keyLengths: instance of int[1024] (id=1161)
>   org.apache.hadoop.mapred.BasicTypeSorterBase.valueLengths: instance of int[1024] (id=1162)
>   org.apache.hadoop.mapred.BasicTypeSorterBase.pointers: instance of int[1024] (id=1163)
>   org.apache.hadoop.mapred.BasicTypeSorterBase.comparator: instance of org.apache.hadoop.io.MD5Hash$Comparator (id=1164)
>   org.apache.hadoop.mapred.BasicTypeSorterBase.count: 0
>   org.apache.hadoop.mapred.BasicTypeSorterBase.BUFFERED_KEY_VAL_OVERHEAD: 16
>   org.apache.hadoop.mapred.BasicTypeSorterBase.reporter: instance of org.apache.hadoop.mapred.Task$2 (id=1165)
> }
> Computation:
> maxBufferSize == 120M
> Theoretical max # of partitions, assuming keyValBuffer.getLength() == 0: 120M / 16K = 7500 partitions.
> Issue #2:
> digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
>   [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
>   [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
>   [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
> The InetAddress.getLocalHost() call does not cache its result; this causes a lookup in the hosts file and DNS(???), bumping up the CPU usage even higher (observed).
> This is a BLOCKER issue and needs immediate attention.
> Notes:
> 1) Output.collect should not take a hit from the framework; should a separate thread handle the spill buffer?
> 2) The InetAddress.getLocalHost result should be cached in a static variable?
> 3) The spilling logic should not involve the # of partitions; does it need a redesign?

--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.