spark-issues mailing list archives

From "Allan Douglas R. de Oliveira (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-5928) Remote Shuffle Blocks cannot be more than 2 GB
Date Mon, 23 Mar 2015 22:48:52 GMT

    [ https://issues.apache.org/jira/browse/SPARK-5928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14376833#comment-14376833 ]

Allan Douglas R. de Oliveira commented on SPARK-5928:
-----------------------------------------------------

I will answer with the info I have right now; I'll try to get more information later:
a) I don't have the exact number at hand, but it's something like 300 GB of shuffle read/write.
b) It was working with 1600 partitions before this started to happen; after the error we increased to 10000 but got pretty much the same exception.
c) Yes, it generally happens some minutes later. We got the job back into production by changing to lz4, but I feel that the problem will come back.
d) Yes, and I think that issue also happens with other errors (e.g. if the BlockManager times out because of too much GC).
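
For reference, the codec change is just the standard Spark configuration setting; a minimal sketch (the app name is a placeholder, and I'm assuming the default codec was in use before the switch):
{code}
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("shuffle-job")                 // placeholder name
  .set("spark.io.compression.codec", "lz4")  // switch block/shuffle compression to lz4
{code}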

> Remote Shuffle Blocks cannot be more than 2 GB
> ----------------------------------------------
>
>                 Key: SPARK-5928
>                 URL: https://issues.apache.org/jira/browse/SPARK-5928
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>            Reporter: Imran Rashid
>
> If a shuffle block is over 2 GB, the shuffle fails with an uninformative exception. The tasks get retried a few times, and then the job eventually fails.
> Here is an example program that can trigger the exception:
> {code}
>     val rdd = sc.parallelize(1 to 1e6.toInt, 1).map { ignore =>
>       val n = 3e3.toInt
>       val arr = new Array[Byte](n)
>       // fill with random bytes so the array doesn't compress to something small
>       scala.util.Random.nextBytes(arr)
>       arr
>     }
>     rdd.map { x => (1, x) }.groupByKey().count()
> {code}
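> A quick sketch of the arithmetic behind this repro (sizes taken from the program above, ignoring serialization overhead): every record maps to key 1, so the whole dataset lands in a single shuffle block, no matter how many reduce partitions are used.
> {code}
> val records        = 1e6.toInt
> val bytesPerRecord = 3e3.toInt
> // ~3e9 bytes in one block, past the 2147483647-byte (Int.MaxValue) limit
> val blockBytes     = records.toLong * bytesPerRecord
> assert(blockBytes > Int.MaxValue)
> {code}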
> Note that you can't trigger this exception in local mode; it only happens on remote fetches. I triggered these exceptions running with {{MASTER=yarn-client spark-shell --num-executors 2 --executor-memory 4000m}}.
> {noformat}
> 15/02/20 11:10:23 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3, imran-3.ent.cloudera.com): FetchFailed(BlockManagerId(1, imran-2.ent.cloudera.com, 55028), shuffleId=1, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> 	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
> 	at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> 	at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
> 	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:56)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: io.netty.handler.codec.TooLongFrameException: Adjusted frame length exceeds 2147483647: 3021252889 - discarded
> 	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.fail(LengthFieldBasedFrameDecoder.java:501)
> 	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.failIfNecessary(LengthFieldBasedFrameDecoder.java:477)
> 	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:403)
> 	at io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:343)
> 	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:249)
> 	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:149)
> 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> 	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
> 	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> 	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> 	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
> 	... 1 more
> )
> {noformat}
> or if you use {{spark.shuffle.blockTransferService=nio}}, then you get:
> {noformat}
> 15/02/20 12:48:07 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, imran-2.ent.cloudera.com): FetchFailed(BlockManagerId(2, imran-3.ent.cloudera.com, 42827), shuffleId=0, mapId=0, reduceId=0, message=
> org.apache.spark.shuffle.FetchFailedException: sendMessageReliably failed with ACK that signalled a remote error: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> 	at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
> 	at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
> 	at org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
> 	at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
> 	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
> 	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 	at org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
> 	at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
> 	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
> 	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
> 	at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
> 	at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
> 	at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> 	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
> 	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
> 	at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
> 	at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:58)
> 	at org.apache.spark.shuffle.hash.HashShuffleReader.read(HashShuffleReader.scala:46)
> 	at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:92)
> 	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
> 	at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:56)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.IOException: sendMessageReliably failed with ACK that signalled a remote error: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
> 	at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:828)
> 	at org.apache.spark.network.buffer.FileSegmentManagedBuffer.nioByteBuffer(FileSegmentManagedBuffer.java:76)
> 	at org.apache.spark.network.nio.NioBlockTransferService.getBlock(NioBlockTransferService.scala:215)
> 	at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$processBlockMessage(NioBlockTransferService.scala:191)
> 	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
> 	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$2.apply(NioBlockTransferService.scala:165)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at org.apache.spark.network.nio.BlockMessageArray.foreach(BlockMessageArray.scala:28)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 	at org.apache.spark.network.nio.BlockMessageArray.map(BlockMessageArray.scala:28)
> 	at org.apache.spark.network.nio.NioBlockTransferService.org$apache$spark$network$nio$NioBlockTransferService$$onBlockMessageReceive(NioBlockTransferService.scala:165)
> 	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
> 	at org.apache.spark.network.nio.NioBlockTransferService$$anonfun$init$1.apply(NioBlockTransferService.scala:70)
> 	at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:750)
> 	at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
> 	at org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:954)
> 	at org.apache.spark.network.nio.ConnectionManager$$anonfun$14.apply(ConnectionManager.scala:940)
> 	at org.apache.spark.network.nio.ConnectionManager$MessageStatus.success(ConnectionManager.scala:67)
> 	at org.apache.spark.network.nio.ConnectionManager.org$apache$spark$network$nio$ConnectionManager$$handleMessage(ConnectionManager.scala:728)
> 	at org.apache.spark.network.nio.ConnectionManager$$anon$12.run(ConnectionManager.scala:581)
> 	... 3 more
> )
> {noformat}
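
Both traces bottom out at the same JVM-level constraint: the netty path enforces a maximum frame length of 2147483647 bytes, and the nio path maps the shuffle file segment into a ByteBuffer, which is Int-indexed. A minimal sketch of the mmap limit the nio path hits (the path, offset, and helper name are hypothetical, not Spark code):
{code}
import java.io.RandomAccessFile
import java.nio.channels.FileChannel

// FileChannel.map takes a long size but rejects anything over Int.MaxValue;
// that is the "Size exceeds Integer.MAX_VALUE" IllegalArgumentException above.
def mapSegment(path: String, offset: Long, length: Long) = {
  require(length <= Int.MaxValue, s"Size exceeds Integer.MAX_VALUE: $length")
  new RandomAccessFile(path, "r").getChannel
    .map(FileChannel.MapMode.READ_ONLY, offset, length)
}
{code}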


