hadoop-common-dev mailing list archives

From "Arun C Murthy (JIRA)" <j...@apache.org>
Subject [jira] Commented: (HADOOP-1698) 7500+ reducers/partitions causes job to hang
Date Thu, 09 Aug 2007 07:09:45 GMT

    [ https://issues.apache.org/jira/browse/HADOOP-1698?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_12518641 ]

Arun C Murthy commented on HADOOP-1698:
---------------------------------------

bq. 1) Output.collect should not take hit from framework, separate thread to handle spill buffer?

I'm not sure this is very easy to achieve... I'll let Devaraj comment on this one.

bq. 2) InetAddress.getLocalHost result should be cached in a static variable?

+1 
We probably do not need to use the hostname at all... some other identifier could be enough, e.g. a numerical tracker id?
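
For what it's worth, a minimal sketch of the caching idea (the CachedLocalHost helper below is hypothetical, not part of any patch): resolve the local host once and reuse it, so SequenceFile.Writer creation stops hitting DNS on every spill.

    import java.net.InetAddress;
    import java.net.UnknownHostException;

    // Hypothetical helper: resolve the local host once and reuse the result.
    class CachedLocalHost {
      private static volatile String cached;

      static String get() throws UnknownHostException {
        if (cached == null) {
          synchronized (CachedLocalHost.class) {
            if (cached == null) {
              cached = InetAddress.getLocalHost().toString();
            }
          }
        }
        return cached;
      }
    }

The writer's sync marker could then be built from new UID() + "@" + CachedLocalHost.get() instead of calling InetAddress.getLocalHost() on every construction.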

bq. 3) Spilling logic should not involve #of partitions, needs redesign?

Here the main idea is that each spill (i.e. to each reduce) is sorted individually, since the reducer only merges the sorted spills from each map. However, I guess we could limit the no. of partitions we track in memory at a given point? Again, over to Devaraj...
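
As an aside, the reason sorted spills matter: the reduce side can then combine the runs with a plain k-way merge, which only works if every individual run is already sorted. A minimal, self-contained sketch of such a merge (illustrative only, not the Hadoop merge code):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.PriorityQueue;

    // k-way merge over sorted runs; each heap entry is {value, runIndex, offset}.
    class KWayMerge {
      static List<Integer> merge(List<List<Integer>> runs) {
        PriorityQueue<int[]> heap =
            new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
        for (int r = 0; r < runs.size(); r++) {
          if (!runs.get(r).isEmpty()) {
            heap.add(new int[] {runs.get(r).get(0), r, 0});
          }
        }
        List<Integer> out = new ArrayList<>();
        while (!heap.isEmpty()) {
          int[] top = heap.poll();
          out.add(top[0]);                       // smallest remaining value
          int next = top[2] + 1;
          if (next < runs.get(top[1]).size()) {  // advance within that run
            heap.add(new int[] {runs.get(top[1]).get(next), top[1], next});
          }
        }
        return out;
      }
    }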


> 7500+ reducers/partitions causes job to hang
> --------------------------------------------
>
>                 Key: HADOOP-1698
>                 URL: https://issues.apache.org/jira/browse/HADOOP-1698
>             Project: Hadoop
>          Issue Type: Bug
>          Components: examples
>    Affects Versions: 0.13.1
>         Environment: Standard hadoop installation, any number of nodes > 10
>            Reporter: Srikanth Kakani
>            Priority: Blocker
>
> Steps to Reproduce:
> On the above cluster, run any job with #partitions/reducers = 8000+.
> Observe CPU utilization on any mapper.
> Observations:
> The output.collect(Key, Value) call takes a huge amount of CPU, causing the job to hang.
> This is a result of two issues:
> 1) Number of partitions beyond 7500 results in a call to sortAndSpillToDisk() on each call to output.collect
> 2) Call to sortAndSpillToDisk causes creation of a writer object, eventually calling:
>         MessageDigest digester = MessageDigest.getInstance("MD5");
>         digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
>         sync = digester.digest();
> A code block in SequenceFile.java (line 652)
> Issue #1:
> Further investigation reveals the following stack trace whenever the task is suspended.
>   [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
>   [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
>   [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1183)
>   [4] java.net.InetAddress.getLocalHost (InetAddress.java:1312)
>   [5] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:653)
>   [6] org.apache.hadoop.io.SequenceFile$Writer.<init> (SequenceFile.java:622)
>   [7] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:386)
>   [8] org.apache.hadoop.io.SequenceFile.createWriter (SequenceFile.java:412)
>   [9] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.startPartition (MapTask.java:307)
>   [10] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpillToDisk (MapTask.java:387)
>   [11] org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355)
> /*My code*/
>   [12] mypackage.MyClass$Map.map (MyClass.java:283)
> --------------
>   [13] org.apache.hadoop.mapred.MapRunner.run (MapRunner.java:46)
>   [14] org.apache.hadoop.mapred.MapTask.run (MapTask.java:189)
>   [15] org.apache.hadoop.mapred.TaskTracker$Child.main (TaskTracker.java:1771)
> The piece of code causing the problem is (MapTask.java:355)
> ----------------------------------------------------------
>         long totalMem = 0;
>         for (int i = 0; i < partitions; i++)
>           totalMem += sortImpl[i].getMemoryUtilized();  <---- == 16K: startOffsets.length (below) * BUFFERED_KEY_VAL_OVERHEAD (BasicTypeSorterBase.java(88))
>         if ((keyValBuffer.getLength() + totalMem) >= maxBufferSize) { <---- condition is always true if partitions > 7500
>           sortAndSpillToDisk();
>           keyValBuffer.reset();
>           for (int i = 0; i < partitions; i++) {
>             sortImpl[i].close();
>           }
>         }
> ----------------------------------------------------------
> Looking at the variable values in org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect (MapTask.java:355):
>  sortImpl[0] = {
>     org.apache.hadoop.mapred.BasicTypeSorterBase.keyValBuffer: instance of org.apache.hadoop.io.DataOutputBuffer(id=1159)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.startOffsets: instance of int[1024] (id=1160) <-- 1K * 16 (previously explained) == 16K
>     org.apache.hadoop.mapred.BasicTypeSorterBase.keyLengths: instance of int[1024] (id=1161)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.valueLengths: instance of int[1024] (id=1162)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.pointers: instance of int[1024] (id=1163)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.comparator: instance of org.apache.hadoop.io.MD5Hash$Comparator(id=1164)
>     org.apache.hadoop.mapred.BasicTypeSorterBase.count: 0
>     org.apache.hadoop.mapred.BasicTypeSorterBase.BUFFERED_KEY_VAL_OVERHEAD: 16
>     org.apache.hadoop.mapred.BasicTypeSorterBase.reporter: instance of org.apache.hadoop.mapred.Task$2(id=1165)
> }
> Computation:
> maxBufferSize == 120M
> theoretical max # of partitions, assuming keyValBuffer.getLength() == 0: 120M/16K = 7500 partitions
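> A minimal sanity check of that arithmetic (values taken from this report; the class below is illustrative only):
>     // 16 KB tracked per partition vs. a 120 MB buffer limit.
>     class SpillMath {
>       public static void main(String[] args) {
>         long maxBufferSize = 120L << 20;          // 120M
>         long perPartition  = 1024 * 16;           // startOffsets.length * BUFFERED_KEY_VAL_OVERHEAD = 16K
>         long partitions    = 8000;
>         long totalMem      = partitions * perPartition;  // ~125M, already over the limit
>         // With keyValBuffer empty, the spill condition is already met,
>         // so every output.collect() call triggers sortAndSpillToDisk().
>         System.out.println(totalMem >= maxBufferSize);   // true for partitions >= 7680
>       }
>     }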
> Issue #2: 
> digester.update((new UID()+"@"+InetAddress.getLocalHost()).getBytes());
>   [1] java.net.Inet4AddressImpl.lookupAllHostAddr (native method)
>   [2] java.net.InetAddress$1.lookupAllHostAddr (InetAddress.java:849)
>   [3] java.net.InetAddress.getAddressFromNameService (InetAddress.java:1,183)
> The InetAddress.getLocalHost() call does not cache its result; each call does a lookup against the hosts file and DNS(???), bumping CPU usage even higher (observed).
> This is a BLOCKER issue and needs immediate attention. 
> Notes:
> 1) Output.collect should not take a hit from the framework; a separate thread to handle the spill buffer? (see the sketch after these notes)
> 2) InetAddress.getLocalHost result should be cached in a static variable?
> 3) Spilling logic should not involve #of partitions, needs redesign?
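> For note 1, a minimal producer/consumer sketch of the separate-spill-thread idea (hypothetical; names and structure are illustrative, not a patch):
>     import java.util.concurrent.ArrayBlockingQueue;
>     import java.util.concurrent.BlockingQueue;
>
>     // collect() hands full buffers to a background thread instead of
>     // sorting and spilling inline on the map output path.
>     class SpillThreadSketch {
>       private final BlockingQueue<byte[]> fullBuffers = new ArrayBlockingQueue<>(2);
>
>       SpillThreadSketch() {
>         Thread spiller = new Thread(() -> {
>           try {
>             while (true) {
>               byte[] buf = fullBuffers.take();  // blocks until a buffer is ready
>               sortAndSpillToDisk(buf);          // heavy work off the collect() path
>             }
>           } catch (InterruptedException e) {
>             Thread.currentThread().interrupt();
>           }
>         }, "spill-thread");
>         spiller.setDaemon(true);
>         spiller.start();
>       }
>
>       // Called from collect(); blocks only when both spare buffers are in flight.
>       void submit(byte[] fullBuffer) throws InterruptedException {
>         fullBuffers.put(fullBuffer);
>       }
>
>       private void sortAndSpillToDisk(byte[] buf) {
>         // sort the buffered records and write a sorted spill file (elided)
>       }
>     }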

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

