spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Apache Spark (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-24307) Support sending messages over 2GB from memory
Date Sun, 27 May 2018 02:03:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-24307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16491876#comment-16491876
] 

Apache Spark commented on SPARK-24307:
--------------------------------------

User 'squito' has created a pull request for this issue:
https://github.com/apache/spark/pull/21440

> Support sending messages over 2GB from memory
> ---------------------------------------------
>
>                 Key: SPARK-24307
>                 URL: https://issues.apache.org/jira/browse/SPARK-24307
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Block Manager, Spark Core
>    Affects Versions: 2.3.0
>            Reporter: Imran Rashid
>            Priority: Major
>
> Spark's networking layer supports sending messages backed by a {{FileRegion}} or a {{ByteBuf}}.
 Sending large FileRegion's works, as netty supports large FileRegions.   However, {{ByteBuf}}
is limited to 2GB.  This is particularly a problem for sending large datasets that are already
in memory, eg.  cached RDD blocks.
> eg. if you try to replicate a block stored in memory that is over 2 GB, you will see
an exception like:
> {noformat}
> 18/05/16 12:40:57 ERROR client.TransportClient: Failed to send RPC 7420542363232096629
to xyz.com/172.31.113.213:44358: io.netty.handler.codec.EncoderException: java.lang.IndexOutOfBoundsException:
readerIndex: 0, writerIndex: -1294617291 (expected: 0 <= readerIndex <= writerIndex
<= capacity(-1294617291))
> io.netty.handler.codec.EncoderException: java.lang.IndexOutOfBoundsException: readerIndex:
0, writerIndex: -1294617291 (expected: 0 <= readerIndex <= writerIndex <= capacity(-1294617291))
>         at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:106)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
>         at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
>         at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
>         at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
>         at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
>         at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1081)
>         at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1128)
>         at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070)
>         at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>         at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
>         at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
>         at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IndexOutOfBoundsException: readerIndex: 0, writerIndex: -1294617291
(expected: 0 <= readerIndex <= writerIndex <= capacity(-1294617291))
>         at io.netty.buffer.AbstractByteBuf.setIndex(AbstractByteBuf.java:129)
>         at io.netty.buffer.CompositeByteBuf.setIndex(CompositeByteBuf.java:1688)
>         at io.netty.buffer.CompositeByteBuf.<init>(CompositeByteBuf.java:110)
>         at io.netty.buffer.Unpooled.wrappedBuffer(Unpooled.java:359)
>         at org.apache.spark.util.io.ChunkedByteBuffer.toNetty(ChunkedByteBuffer.scala:87)
>         at org.apache.spark.storage.ByteBufferBlockData.toNetty(BlockManager.scala:95)
>         at org.apache.spark.storage.BlockManagerManagedBuffer.convertToNetty(BlockManagerManagedBuffer.scala:52)
>         at org.apache.spark.network.protocol.MessageEncoder.encode(MessageEncoder.java:58)
>         at org.apache.spark.network.protocol.MessageEncoder.encode(MessageEncoder.java:33)
>         at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:88)
>         ... 17 more
> {noformat}
> A simple solution to this is to create a "FileRegion" which is backed by a {{ChunkedByteBuffer}}
(spark's existing datastructure to support blocks > 2GB in memory). 
>  A drawback to this approach is that blocks that are cached in memory as deserialized
values would need to have the *entire* block serialized into memory before it can be pushed.
   However, that would involve a larger change to the block manager as well, and is not strictly
necessary, so can be handled separately as a performance improvement.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message