hadoop-mapreduce-dev mailing list archives

From Arun Murthy <...@hortonworks.com>
Subject Re: hadoop 0.21.0 A Bug Found, followed by a possible fix
Date Sat, 23 Jul 2011 02:38:45 GMT
Pls open a jira and file the patch. Thanks!

Sent from my iPhone

On Jul 22, 2011, at 6:56 PM, sp yu <theysp1986@gmail.com> wrote:

> I've recently been using Hadoop *(version 0.21.0)* for some data processing, but
> sometimes the reducer crashed. The log always looks like the one below (at the end of this
> mail), which suggests that when multiple fetchers receive map output simultaneously,
> the merge step may fail. To verify this
> assumption, I set the number of fetchers to 1
> (mapreduce.reduce.shuffle.parallelcopies), after which the problem disappeared
> and the job ran fine.
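For reference, the workaround described above corresponds to a setting like the following in mapred-site.xml (the property name is the one given in the mail; the exact file and default value depend on your Hadoop setup):

```xml
<!-- Limit the shuffle to a single fetcher thread per reducer.
     This avoids the concurrent-fetch race at the cost of a slower shuffle. -->
<property>
  <name>mapreduce.reduce.shuffle.parallelcopies</name>
  <value>1</value>
</property>
```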
>
> So I checked the merge source, and found that in
> class org.apache.hadoop.mapreduce.task.reduce.MergeManager, the
> function closeInMemoryFile(MapOutput<K,V>
> mapOutput), starting at line 275, is coded like this:
>
> 275  public synchronized void closeInMemoryFile(MapOutput<K,V> mapOutput) {
> 276    inMemoryMapOutputs.add(mapOutput);
> 277    LOG.info("closeInMemoryFile -> map-output of size: " + mapOutput.getSize()
> 278        + ", inMemoryMapOutputs.size() -> " + inMemoryMapOutputs.size());
> 279
> 280    synchronized (inMemoryMerger) {
> 281      if (!inMemoryMerger.isInProgress() && usedMemory >= mergeThreshold) {
> 282        LOG.info("Starting inMemoryMerger's merge since usedMemory=" +
> 283            usedMemory + " > mergeThreshold=" + mergeThreshold);
> 284        inMemoryMapOutputs.addAll(inMemoryMergedMapOutputs);
> 285        inMemoryMergedMapOutputs.clear();
> 286        inMemoryMerger.startMerge(inMemoryMapOutputs);
> 287      }
> 288    }
> 289
> 290    if (memToMemMerger != null) {
> 291      synchronized (memToMemMerger) {
> 292        if (!memToMemMerger.isInProgress() &&
> 293            inMemoryMapOutputs.size() >= memToMemMergeOutputsThreshold) {
> 294          memToMemMerger.startMerge(inMemoryMapOutputs);
> 295        }
> 296      }
> 297    }
> 299  }
>
> Here, the block ending at line 297 is not synchronized on the inMemoryMerger
> object, so there is a risk of a race among the fetcher threads. I think
> moving that block inside the synchronized block starting at line 280 would fix the problem.
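The race described above can be sketched with a minimal, hypothetical model (MockMerger and all names below are illustrative, not Hadoop classes): a merger must never be started while a merge is already in progress, and holding one monitor across both the isInProgress() check and the startMerge() call makes that check-then-act sequence atomic, which is the effect of the proposed move.

```java
// Simplified model of the proposed fix: several "fetcher" threads race to
// start a merge; guarding the check and the start with the same lock
// guarantees at most one start while a merge is in progress.
import java.util.concurrent.atomic.AtomicInteger;

class MockMerger {
    private boolean inProgress = false;
    // Counts startMerge() calls made while a merge was already running.
    // In the real MergeManager such a call corrupts the merge state and can
    // surface later as the EOFException shown in the log below.
    final AtomicInteger doubleStarts = new AtomicInteger();

    boolean isInProgress() { return inProgress; }

    void startMerge() {
        if (inProgress) {
            doubleStarts.incrementAndGet();
        }
        inProgress = true;
    }
}

public class MergeRaceSketch {
    static final MockMerger merger = new MockMerger();

    // Fixed version: check-then-act is atomic under the merger's monitor,
    // mirroring the proposal to keep everything inside synchronized(inMemoryMerger).
    static void closeInMemoryFileFixed() {
        synchronized (merger) {
            if (!merger.isInProgress()) {
                merger.startMerge();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] fetchers = new Thread[8];
        for (int i = 0; i < fetchers.length; i++) {
            fetchers[i] = new Thread(MergeRaceSketch::closeInMemoryFileFixed);
            fetchers[i].start();
        }
        for (Thread t : fetchers) {
            t.join();
        }
        System.out.println("double starts: " + merger.doubleStarts.get());
    }
}
```

With the lock in place the program always prints `double starts: 0`; without the synchronized block, the unsynchronized check-then-act can let two threads start a merge concurrently, which is the failure mode the log below appears to show.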
>
> This is my first time contributing something; thanks for your attention.
>
>
>
> 2011-07-20 18:56:34,999 INFO
> org.apache.hadoop.mapreduce.task.reduce.EventFetcher:
> attempt_201107140944_0086_r_000000_0: Got 2 new map-outputs
> 2011-07-20 18:56:35,000 INFO
> org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler: Assiging
> dc-0005:50060 with 1 to fetcher#1
> 2011-07-20 18:56:35,000 INFO
> org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler: assigned 1 of 1 to
> dc-0005:50060 to fetcher#1
> 2011-07-20 18:56:35,000 INFO
> org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler: Assiging
> dc-0002:50060 with 1 to fetcher#3
> 2011-07-20 18:56:35,000 INFO
> org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler: assigned 1 of 1 to
> dc-0002:50060 to fetcher#3
> 2011-07-20 18:56:35,006 INFO
> org.apache.hadoop.mapreduce.task.reduce.Fetcher: for
> url=50060/mapOutput?job=job_201107140944_0086&reduce=0&map=attempt_201107140944_0086_m_000498_0
> sent hash and receievd reply
> 2011-07-20 18:56:35,016 INFO
> org.apache.hadoop.mapreduce.task.reduce.Fetcher: for
> url=50060/mapOutput?job=job_201107140944_0086&reduce=0&map=attempt_201107140944_0086_m_000491_0
> sent hash and receievd reply
> 2011-07-20 18:56:35,056 INFO
> org.apache.hadoop.mapreduce.task.reduce.Fetcher: fetcher#3 about to shuffle
> output of map attempt_201107140944_0086_m_000498_0 decomp: 12647556 len:
> 12647560 to MEMORY
> 2011-07-20 18:56:35,070 INFO
> org.apache.hadoop.mapreduce.task.reduce.Fetcher: fetcher#1 about to shuffle
> output of map attempt_201107140944_0086_m_000491_0 decomp: 30172760 len:
> 30172764 to MEMORY
> 2011-07-20 18:56:35,553 INFO
> org.apache.hadoop.mapreduce.task.reduce.Fetcher: Read 12647556 bytes from
> map-output for attempt_201107140944_0086_m_000498_0
> 2011-07-20 18:56:35,553 INFO
> org.apache.hadoop.mapreduce.task.reduce.MergeManager: closeInMemoryFile ->
> map-output of size: 12647556, inMemoryMapOutputs.size() -> 25
> 2011-07-20 18:56:35,553 INFO
> org.apache.hadoop.mapreduce.task.reduce.MergeManager: Starting
> inMemoryMerger's merge since usedMemory=479526428 > mergeThreshold=440963456
> 2011-07-20 18:56:35,553 INFO
> org.apache.hadoop.mapreduce.task.reduce.MergeThread: InMemoryMerger - Thread
> to merge in-memory shuffled map-outputs: Starting merge with 25 segments,
> while ignoring 0 segments
> 2011-07-20 18:56:35,554 INFO
> org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler: dc-0002:50060
> freed by fetcher#3 in 554s
> 2011-07-20 18:56:35,556 INFO
> org.apache.hadoop.mapreduce.task.reduce.MergeManager: Initiating in-memory
> merge with 25 segments...
> 2011-07-20 18:56:35,557 INFO org.apache.hadoop.mapred.Merger: Merging 25
> sorted segments
> 2011-07-20 18:56:35,557 INFO org.apache.hadoop.mapred.Merger: Down to the
> last merge-pass, with 25 segments left of total size: 449352783 bytes
> 2011-07-20 18:56:35,696 INFO
> org.apache.hadoop.mapreduce.task.reduce.Fetcher: Read 30172760 bytes from
> map-output for attempt_201107140944_0086_m_000491_0
> 2011-07-20 18:56:35,696 INFO
> org.apache.hadoop.mapreduce.task.reduce.MergeManager: closeInMemoryFile ->
> map-output of size: 30172760, inMemoryMapOutputs.size() -> 1
> 2011-07-20 18:56:35,696 INFO
> org.apache.hadoop.mapreduce.task.reduce.MergeManager: Starting
> inMemoryMerger's merge since usedMemory=479526428 > mergeThreshold=440963456
> 2011-07-20 18:56:35,696 INFO
> org.apache.hadoop.mapreduce.task.reduce.MergeThread: InMemoryMerger - Thread
> to merge in-memory shuffled map-outputs: Starting merge with 1 segments,
> while ignoring 0 segments
> 2011-07-20 18:56:35,696 INFO
> org.apache.hadoop.mapreduce.task.reduce.ShuffleScheduler: dc-0005:50060
> freed by fetcher#1 in 696s
> 2011-07-20 18:56:41,540 WARN org.apache.hadoop.mapred.Child: Exception
> running child :
> org.apache.hadoop.mapreduce.task.reduce.Shuffle$ShuffleError: error in
> shuffle in InMemoryMerger - Thread to merge in-memory shuffled map-outputs
> at org.apache.hadoop.mapreduce.task.reduce.Shuffle.run(Shuffle.java:124)
> at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:362)
> at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:396)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:742)
> at org.apache.hadoop.mapred.Child.main(Child.java:211)
> Caused by: java.lang.RuntimeException: java.io.EOFException
> at
> org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:132)
> at org.apache.hadoop.mapred.Merger$MergeQueue.lessThan(Merger.java:530)
> at org.apache.hadoop.util.PriorityQueue.downHeap(PriorityQueue.java:141)
> at org.apache.hadoop.util.PriorityQueue.adjustTop(PriorityQueue.java:108)
> at
> org.apache.hadoop.mapred.Merger$MergeQueue.adjustPriorityQueue(Merger.java:478)
> at org.apache.hadoop.mapred.Merger$MergeQueue.next(Merger.java:493)
> at org.apache.hadoop.mapred.Merger.writeFile(Merger.java:199)
> at
> org.apache.hadoop.mapreduce.task.reduce.MergeManager$InMemoryMerger.merge(MergeManager.java:443)
> at
> org.apache.hadoop.mapreduce.task.reduce.MergeThread.run(MergeThread.java:89)
> Caused by: java.io.EOFException
> at java.io.DataInputStream.readFully(DataInputStream.java:180)
> at java.io.DataInputStream.readFully(DataInputStream.java:152)
> at com.iflytek.hadoop.streaming.io.BaseKey32.readFields(BaseKey32.java:24)
> at
> org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:129)
> ... 8 more
>
> 2011-07-20 18:56:41,552 INFO org.apache.hadoop.mapred.Task: Runnning cleanup
> for the task
