spark-issues mailing list archives

From "Imran Rashid (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
Date Tue, 19 Jun 2018 03:25:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16516611#comment-16516611 ]

Imran Rashid commented on SPARK-24578:
--------------------------------------

Btw, to answer your initial questions:

{quote}
1. what is the right behavior, should we re-compute or should we transfer block from remote?
2. if we should transfer from remote, why the performance is so bad for cache block?
{quote}

Spark should try to fetch from the remote, and if that fails for some reason, it should fall
back to recomputing. So you should actually see a similar sequence of events in the logs
in Spark 2.3.0 as in your Spark 2.2.1 snippet; the difference is that the remote fetch
fails in 2.3.0, so Spark does the recomputation instead. The real issue here is why the
remote fetches are failing. Unfortunately there isn't a ton of info in that stack trace.
Are there any other warning messages before it in the logs?
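The following is a hypothetical, simplified model of the read order described above: local copy first, then the remote fetch with bounded retries, and recompute only if the fetch gives up. It is written in Scala, the language of the reproduction below. It is not Spark's `BlockManager` code. `CachedBlockRead`, `fetchRemote`, `getOrRecompute`, and `worstCaseFetchWait` are made-up names. The retry count and the 120s per-attempt timeout come from the report's description, read here as three attempts in total.

```scala
import java.io.IOException
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical model of a cached-block read; none of these names are Spark APIs.
object CachedBlockRead {
  sealed trait Source
  case object Local extends Source
  case object Remote extends Source
  case object Recomputed extends Source

  // Try `fetch` up to `maxAttempts` times, waiting at most `timeout` per attempt.
  // Timeouts and I/O errors (such as a broken connection) count as failed attempts.
  def fetchRemote[T](maxAttempts: Int, timeout: FiniteDuration)(fetch: () => T): Option[T] = {
    var attempts = 0
    var result: Option[T] = None
    while (result.isEmpty && attempts < maxAttempts) {
      attempts += 1
      try {
        result = Some(Await.result(Future(fetch()), timeout))
      } catch {
        case _: TimeoutException | _: IOException => () // failed attempt, try again
      }
    }
    result
  }

  // Local copy first, then the remote fetch, and recompute only as a last resort.
  def getOrRecompute[T](
      getLocal: () => Option[T],
      fetch: () => T,
      recompute: () => T,
      maxAttempts: Int,
      timeout: FiniteDuration): (T, Source) =
    getLocal() match {
      case Some(v) => (v, Local)
      case None =>
        fetchRemote(maxAttempts, timeout)(fetch) match {
          case Some(v) => (v, Remote)
          case None    => (recompute(), Recomputed)
        }
    }

  // Upper bound on the time spent on remote attempts before recomputing.
  def worstCaseFetchWait(maxAttempts: Int, timeout: FiniteDuration): FiniteDuration =
    timeout * maxAttempts.toLong
}
```

Under these assumptions, a remote fetch that keeps timing out costs up to `worstCaseFetchWait(3, 120.seconds)`, which is 360 seconds, before the recomputation even starts.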

I can see how setting spark.locality.wait lets you work around this, but it's definitely not
an ideal solution.

I wouldn't rule out that commit you mentioned as the cause; it's certainly possible.

> Reading remote cache block behavior changes and causes timeout issue
> --------------------------------------------------------------------
>
>                 Key: SPARK-24578
>                 URL: https://issues.apache.org/jira/browse/SPARK-24578
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0, 2.3.1
>            Reporter: Wenbo Zhao
>            Priority: Major
>
> After upgrading to Spark 2.3, we observed lots of errors like the following in some of our production jobs:
> {code:java}
> 18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003, chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to /172.22.18.7:60865; closing connection
> java.io.IOException: Broken pipe
> at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
> at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
> at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
> at sun.nio.ch.IOUtil.write(IOUtil.java:65)
> at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
> at org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
> at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
> at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
> at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
> at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
> at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
> at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
> at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
> at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
> at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
> at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
> at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
> at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
> at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
> at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
> at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
> at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
> at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
> at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> {code}
>  
> Here is a small reproducible example for a cluster of 2 executors (say host-1 and host-2), each with 8 cores. The memory of the driver and executors is not an important factor here as long as it is big enough, say 20G.
> {code:java}
> import org.apache.spark.sql.functions.{count, rand}
> // spark-shell already imports spark.implicits._, which provides .toDF
> val n = 100000000
> val df0 = sc.parallelize(1 to n).toDF  // a single Int column named "value"
> // Add ten random Double columns, x0 through x9 (one line, so the REPL does not split it)
> val df = (0 to 9).foldLeft(df0) { (acc, i) => acc.withColumn(s"x$i", rand()) }
> df.cache; df.count  // materialize the cache
> // Run 10 identical jobs concurrently so that some tasks are scheduled on a host without a local copy of the block
> (1 to 10).toArray.par.foreach { i => println(i); df.groupBy("x1").agg(count("value")).show() }
> {code}
>  
> In the above example, we generate a random DataFrame of around 7G, cache it, and then run DataFrame operations in parallel through a parallel collection (`.toArray.par`). Because of the parallel computation, it is very likely that some task will be scheduled on host-2 while it needs to read the cached block data from host-1. This follows the code path at [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691] and then tries to transfer a big cache block (~500MB) from host-1 to host-2. Often this big transfer causes timeouts: the fetch is retried 3 times, each with a 120s timeout, and then the block is recomputed and put into the local MemoryStore.
> We couldn't reproduce the same issue in Spark 2.2.1. From the Spark 2.2.1 log, we found the following:
> {code:java}
> 18/06/16 17:23:47 DEBUG BlockManager: Getting local block rdd_3_0
> 18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to acquire read lock for rdd_3_0
> 18/06/16 17:23:47 DEBUG BlockManager: Block rdd_3_0 was not found
> 18/06/16 17:23:47 DEBUG BlockManager: Getting remote block rdd_3_0
> 18/06/16 17:23:47 DEBUG BlockManager: Block rdd_3_0 not found
> 18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to put rdd_3_0
> 18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to acquire read lock for rdd_3_0
> 18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to acquire write lock for rdd_3_0
> 18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 acquired write lock for rdd_3_0
> 18/06/16 17:23:58 INFO MemoryStore: Block rdd_3_0 stored as values in memory (estimated size 538.2 MB, free 11.1 GB)
> {code}
> That is, when a task is scheduled on host-2 and needs to read the cached block rdd_3_0 from host-1, the endpoint of `master.getLocations(..)` (see [https://github.com/apache/spark/blob/v2.2.1/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L622]) reports that the remote cache block is not found, which triggers the recompute.
> -I believe this behavior change was introduced by this change set [https://github.com/apache/spark/commit/e1960c3d6f380b0dfbba6ee5d8ac6da4bc29a698#diff-2b643ea78c1add0381754b1f47eec132]-
> We have two questions here:
>  # what is the right behavior, should we re-compute or should we transfer block from remote?
>  # if we should transfer from remote, why the performance is so bad for cache block?
>  


