hadoop-common-dev mailing list archives

From "Arun C Murthy (JIRA)" <j...@apache.org>
Subject [jira] Issue Comment Edited: (HADOOP-1698) 7500+ reducers/partitions causes job to hang
Date Fri, 10 Aug 2007 19:07:43 GMT

    [ https://issues.apache.org/jira/browse/HADOOP-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12519096 ]

acmurthy edited comment on HADOOP-1698 at 8/10/07 12:07 PM:
-----------------------------------------------------------------

bq.2) InetAddress.getLocalHost result should be cached in a static variable?
Done. The code is modified to always use 127.0.0.1

-1

I believe we do a reverse-lookup of the actual IP address of the network interface and use
it to generate a statistically strong unique-random number for the *sync marker*, which this
patch breaks. 

Rather than the IP address, we could use a timestamp that is further randomized by the tracker id
or some similar mechanism...

Essentially we need a {{UUID}} (http://java.sun.com/j2se/1.5.0/docs/api/java/util/UUID.html);
clearly we need to check the performance impact of generating one.
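As a rough sketch of the {{UUID}} idea (hypothetical, not the attached 1698.patch): seeding the MD5 digest from {{UUID.randomUUID()}} keeps the same 16-byte marker width while avoiding the per-writer host lookup.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.UUID;

public class SyncMarkerSketch {
    // Hypothetical replacement for the getLocalHost()-based seed in
    // SequenceFile.Writer: UUID.randomUUID() draws from SecureRandom,
    // so no hosts-file/DNS lookup happens per writer.
    static byte[] newSyncMarker() throws NoSuchAlgorithmException {
        MessageDigest digester = MessageDigest.getInstance("MD5");
        digester.update(UUID.randomUUID().toString().getBytes());
        return digester.digest(); // 16 bytes, same width as the MD5-based marker
    }
}
```

The remaining question, as noted above, is the cost of the {{SecureRandom}} draw inside {{randomUUID()}} relative to one cached host lookup.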

      was (Author: acmurthy):
    bq.2) InetAddress.getLocalHost result should be cached in a static variable?
Done. The code is modified to always use 127.0.0.1

-1

I believe we do a reverse-lookup of the actual IP address of the network interface and use
it to generate a statistically strong unique-random number for the *sync marker*, which this
patch breaks. 

Rather than the IP address we could use a timestamp which is further randomized by trackerid
or some such mechanism... one option is to use {{UUID}} (http://java.sun.com/j2se/1.5.0/docs/api/java/util/UUID.html),
clearly we need to check its performance impact to generate one.
  
> 7500+ reducers/partitions causes job to hang
> --------------------------------------------
>
>                 Key: HADOOP-1698
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1698
>             Project: Hadoop
>          Issue Type: Bug
>          Components: examples
>    Affects Versions: 0.13.1
>         Environment: Standard hadoop installation, any number of nodes > 10
>            Reporter: Srikanth Kakani
>            Assignee: Devaraj Das
>            Priority: Blocker
>         Attachments: 1698.patch
>
>
> Steps to Reproduce:
> On the above cluster run any job with #partitions/reducers = 8000+
> Observe CPU utilization on any mapper.
> Observations:
> The output.collect(Key, Value) call takes a huge amount of CPU, causing the job to hang.
> This is a result of two issues:
> 1) Number of partitions beyond 7500 results in a call to sortAndSpillToDisk() on each call to output.collect
> 2) Call to sortAndSpillToDisk causes creation of a writer object, eventually calling:
>  MessageDigest digester = MessageDigest.getInstance("MD5");
>         digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
>         sync = digester.digest();
> A code-block in  SequenceFile.java(652)
> Issue #1:
> Further investigation reveals the following stack trace whenever the task is suspended.
>   [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
>   [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
>   [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
>   [4] java.net.InetAddress.getLocalHost (InetAddress.java:1,312)
>   [5] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:653)
>   [6] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:622)
>   [7] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:386)
>   [8] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:412)
>   [9] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.startPartition (MapTask.java:307)
>   [10] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpillToDisk (MapTask.java:387)
>   [11] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355)
> /*My code*/
>   [12] mypackage.MyClass$Map.map (MyClass.java:283)
> --------------
>   [13] org.apache.hadoop.mapred.MapRunner.run (MapRunner.java:46)
>   [14] org.apache.hadoop.mapred.MapTask.run (MapTask.java:189)
>   [15] org.apache.hadoop.mapred.TaskTracker$Child.main (TaskTracker.java:1,771)
> The piece of code causing the problem is (MapTask.java:355)
> ----------------------------------------------------------
>         long totalMem = 0;
>         for (int i = 0; i < partitions; i++)
>           totalMem += sortImpl[i].getMemoryUtilized();  <---- == 16K: startOffsets.length (below) * BUFFERED_KEY_VAL_OVERHEAD (BasicTypeSorterBase.java:88)
>         if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) { <---- condition is always true if partitions > 7500
>           sortAndSpillToDisk();
>           keyValBuffer.reset();
>           for (int i = 0; i < partitions; i++) {
>             sortImpl[i].close();
>           }
>         }
> ----------------------------------------------------------
> Looking at the variable values in org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355)
>  sortImpl[0] = {
>     org.apache.hadoop.mapred.BasicTypeSorterBase.keyValBuffer: instance of org.apache.hadoop.io.DataOutputBuffer(id=1159)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.startOffsets: instance of int[1024] (id=1160) <-- 1K * 16 (previously explained) == 16K
>     org.apache.hadoop.mapred.BasicTypeSorterBase.keyLengths: instance of int[1024] (id=1161)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.valueLengths: instance of int[1024] (id=1162)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.pointers: instance of int[1024] (id=1163)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.comparator: instance of org.apache.hadoop.io.MD5Hash$Comparator(id=1164)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.count: 0
>     org.apache.hadoop.mapred.BasicTypeSorterBase.BUFFERED_KEY_VAL_OVERHEAD: 16
>     org.apache.hadoop.mapred.BasicTypeSorterBase.reporter: instance of org.apache.hadoop.mapred.Task$2(id=1165)
> }
> Computation
> maxBufferSize == 120M 
> theoretical max # of partitions, assuming keyValBuffer.getLength() == 0: 120M/16K ≈ 7500 partitions
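The quoted computation can be checked with a little arithmetic; the constants below come straight from the report (120M buffer, 16K per-partition overhead):

```java
public class PartitionMath {
    public static void main(String[] args) {
        long maxBufferSize = 120L * 1024 * 1024; // 120M, per the report
        long perPartition  = 1024 * 16;          // startOffsets.length * BUFFERED_KEY_VAL_OVERHEAD == 16K
        // Beyond this many partitions, the sort buffers alone exceed
        // maxBufferSize, so every collect() call triggers a spill.
        System.out.println(maxBufferSize / perPartition); // prints 7680, i.e. the ~7500 ceiling above
    }
}
```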
> Issue #2: 
> digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
>   [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
>   [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
>   [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
> InetAddress.getLocalHost() does not cache its result; each call looks up the hosts file and possibly DNS, bumping CPU usage even higher (observed).
> This is a BLOCKER issue and needs immediate attention. 
> Notes:
> 1) Output.collect should not take a hit from the framework; a separate thread to handle the spill buffer?
> 2) InetAddress.getLocalHost result should be cached in a static variable?
> 3) Spilling logic should not involve #of partitions, needs redesign?
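Note 2 in the quoted report (caching the InetAddress.getLocalHost() result in a static variable) could be sketched roughly as follows; the class and field names here are hypothetical, not from the attached patch:

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

public class LocalHostCache {
    // Resolve the local host once and reuse it, so each
    // SequenceFile.Writer no longer triggers a hosts-file/DNS lookup.
    private static volatile InetAddress cached;

    static InetAddress getLocalHost() {
        InetAddress addr = cached;
        if (addr == null) {
            try {
                addr = InetAddress.getLocalHost(); // single lookup
            } catch (UnknownHostException e) {
                addr = InetAddress.getLoopbackAddress(); // safe fallback
            }
            cached = addr;
        }
        return addr;
    }
}
```

Unlike the always-127.0.0.1 change that the comment above vetoes, this keeps the real interface address in the sync-marker seed and only removes the repeated lookups.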

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

