spark-issues mailing list archives

From "Wenbo Zhao (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-24578) Reading remote cache block behavior changes and causes timeout issue
Date Mon, 18 Jun 2018 15:02:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-24578?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Wenbo Zhao updated SPARK-24578:
-------------------------------
    Description: 
Since Spark 2.3, we have observed many errors like the following:
{code:java}
18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003,
chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to /172.22.18.7:60865;
closing connection
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
{code}
 

Here is a small reproduction for a cluster of 2 executors (say host-1 and host-2), each with 8 cores (the driver and executor memory is not an important factor here as long as it is big enough, say 20G).
{code:scala}
// Run in spark-shell, where `sc` and the implicit toDF conversion are already available.
import org.apache.spark.sql.functions.{count, rand}

val n = 100000000
val df0 = sc.parallelize(1 to n).toDF

// Add ten random columns so the cached partitions are large (~600 MB blocks).
val df = df0
  .withColumn("x0", rand())
  .withColumn("x1", rand())
  .withColumn("x2", rand())
  .withColumn("x3", rand())
  .withColumn("x4", rand())
  .withColumn("x5", rand())
  .withColumn("x6", rand())
  .withColumn("x7", rand())
  .withColumn("x8", rand())
  .withColumn("x9", rand())

df.cache; df.count

// Run several aggregations in parallel so tasks land on both executors.
(1 to 10).toArray.par.map { i => println(i); df.groupBy("x1").agg(count("value")).show() }
{code}
 

In the above example, we generate a random DataFrame of roughly 7 GB, cache it, and then run several DataFrame operations in parallel via `array.par.map`. Because of the parallel execution, with high probability some task is scheduled on host-2 but needs to read cached block data that lives on host-1. The task then follows the code path at [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691] and tries to transfer a large cached block (~600 MB) from host-1 to host-2. This large transfer often makes the cluster hit the timeout (the fetch is retried 3 times, each attempt with a 120s timeout, and only then does the task recompute the block and put it into the local MemoryStore).
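
As a possible mitigation while the right behavior is being discussed: the per-attempt timeout above corresponds to the default `spark.network.timeout` of 120s. The sketch below simply raises it when building the session; whether a larger timeout alone avoids the retry/recompute cycle for this workload is an assumption, not something we verified.
{code:scala}
// Hypothetical mitigation sketch, not a fix for the underlying behavior change:
// give a ~600 MB cached block more time to transfer before the fetch fails.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SPARK-24578-repro")
  .config("spark.network.timeout", "600s") // default is 120s, matching the timeout observed above
  .getOrCreate()
{code}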

We could not reproduce the same issue in Spark 2.2.1. From the Spark 2.2.1 log, we found the following:
{code:java}
18/06/16 17:23:47 DEBUG BlockManager: Getting local block rdd_3_0 
18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to acquire read lock for rdd_3_0 
18/06/16 17:23:47 DEBUG BlockManager: Block rdd_3_0 was not found 
18/06/16 17:23:47 DEBUG BlockManager: Getting remote block rdd_3_0 
18/06/16 17:23:47 DEBUG BlockManager: Block rdd_3_0 not found 
18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to put rdd_3_0 
18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to acquire read lock for rdd_3_0 
18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to acquire write lock for rdd_3_0

18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 acquired write lock for rdd_3_0 
18/06/16 17:23:58 INFO MemoryStore: Block rdd_3_0 stored as values in memory (estimated size
538.2 MB, free 11.1 GB)
{code}
That is, when a task is scheduled on host-2 and needs to read the cached block rdd_3_0 from host-1, the `master.getLocations(..)` endpoint (see [https://github.com/apache/spark/blob/v2.2.1/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L622]) reports that the remote cache block is not found, which triggers the recompute.
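
To make the difference concrete, here is a simplified, self-contained sketch of the read-path decision as we understand it; `readLocal`, `fetchRemote` and `recompute` are illustrative placeholders, not the actual BlockManager internals.
{code:scala}
object CacheReadPathSketch {
  // Illustrative stand-ins for the real logic in BlockManager / RDD.getOrCompute.
  def readLocal(blockId: String): Option[Seq[Double]] = None   // local cache miss on host-2
  def fetchRemote(blockId: String): Option[Seq[Double]] = None // 2.2.1 log: remote block reported "not found"
  def recompute(blockId: String): Seq[Double] = Seq.fill(1000)(0.0)

  def getOrCompute(blockId: String): Seq[Double] =
    readLocal(blockId)
      .orElse(fetchRemote(blockId))  // Spark 2.3 resolves a remote location here and streams ~600 MB, which times out
      .getOrElse(recompute(blockId)) // Spark 2.2.1 ends up here and repopulates the local MemoryStore

  def main(args: Array[String]): Unit =
    println(s"rdd_3_0 -> recomputed ${getOrCompute("rdd_3_0").size} values locally")
}
{code}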

I believe this behavior change was introduced by this change set: [https://github.com/apache/spark/commit/e1960c3d6f380b0dfbba6ee5d8ac6da4bc29a698#diff-2b643ea78c1add0381754b1f47eec132]

We have two questions here:
 # What is the right behavior: should we recompute, or should we transfer the block from the remote executor?
 # If we should transfer from the remote executor, why is the performance so bad for cached blocks?

 

  was:
Since Spark 2.3, we have observed many errors like the following:

{code:java}
18/06/15 20:59:42 ERROR TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=91672904003,
chunkIndex=0}, buffer=org.apache.spark.storage.BlockManagerManagedBuffer@783a9324} to /172.22.18.7:60865;
closing connection
java.io.IOException: Broken pipe
at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47)
at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
at sun.nio.ch.IOUtil.write(IOUtil.java:65)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471)
at org.apache.spark.network.protocol.MessageWithHeader.writeNioBuffer(MessageWithHeader.java:156)
at org.apache.spark.network.protocol.MessageWithHeader.copyByteBuf(MessageWithHeader.java:142)
at org.apache.spark.network.protocol.MessageWithHeader.transferTo(MessageWithHeader.java:123)
at io.netty.channel.socket.nio.NioSocketChannel.doWriteFileRegion(NioSocketChannel.java:355)
at io.netty.channel.nio.AbstractNioByteChannel.doWrite(AbstractNioByteChannel.java:224)
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:382)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:362)
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:901)
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1321)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)
at io.netty.channel.DefaultChannelPipeline.flush(DefaultChannelPipeline.java:983)
at io.netty.channel.AbstractChannel.flush(AbstractChannel.java:248)
at io.netty.channel.nio.AbstractNioByteChannel$1.run(AbstractNioByteChannel.java:284)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
{code}
Here is a small reproduction for a cluster of 2 executors (say host-1 and host-2), each with 8 cores (the driver and executor memory is not an important factor here as long as it is big enough, say 10G).

{code:java}
val n = 100000000
val df0 = sc.parallelize(1 to n).toDF
val df = df0.withColumn("x0", rand()).withColumn("x0", rand()
).withColumn("x1", rand()
).withColumn("x2", rand()
).withColumn("x3", rand()
).withColumn("x4", rand()
).withColumn("x5", rand()
).withColumn("x6", rand()
).withColumn("x7", rand()
).withColumn("x8", rand()
).withColumn("x9", rand())

df.cache; df.count

(1 to 10).toArray.par.map { i => println(i); df.groupBy("x1").agg(count("value")).show()
}
{code}
 

In the above example, we generate a random DataFrame of roughly 7 GB, cache it, and then run several DataFrame operations in parallel via `array.par.map`. Because of the parallel execution, with high probability some task is scheduled on host-2 but needs to read cached block data that lives on host-1. The task then follows the code path at [https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L691] and tries to transfer a large cached block (~600 MB) from host-1 to host-2. This large transfer often makes the cluster hit the timeout.

We could not reproduce the same issue in Spark 2.2.1. From the Spark 2.2.1 log, we found the following:
{code:java}
18/06/16 17:23:47 DEBUG BlockManager: Getting local block rdd_3_0 
18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to acquire read lock for rdd_3_0 
18/06/16 17:23:47 DEBUG BlockManager: Block rdd_3_0 was not found 
18/06/16 17:23:47 DEBUG BlockManager: Getting remote block rdd_3_0 
18/06/16 17:23:47 DEBUG BlockManager: Block rdd_3_0 not found 
18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to put rdd_3_0 
18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to acquire read lock for rdd_3_0 
18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 trying to acquire write lock for rdd_3_0

18/06/16 17:23:47 TRACE BlockInfoManager: Task 0 acquired write lock for rdd_3_0 
18/06/16 17:23:58 INFO MemoryStore: Block rdd_3_0 stored as values in memory (estimated size
538.2 MB, free 11.1 GB)
{code}
That is, when a task is scheduled on host-2 and needs to read the cached block data from host-1, the `master.getLocations(..)` endpoint (see [https://github.com/apache/spark/blob/v2.2.1/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L622]) reports that the remote cache block is not found, which triggers the recompute.

 

I believe this behavior change was introduced by this change set: [https://github.com/apache/spark/commit/e1960c3d6f380b0dfbba6ee5d8ac6da4bc29a698#diff-2b643ea78c1add0381754b1f47eec132]

We have two questions here:
 # What is the right behavior: should we recompute, or should we transfer the block from the remote executor?
 # If we should transfer from the remote executor, why is the performance so bad for cached blocks?

 


> Reading remote cache block behavior changes and causes timeout issue
> --------------------------------------------------------------------
>
>                 Key: SPARK-24578
>                 URL: https://issues.apache.org/jira/browse/SPARK-24578
>             Project: Spark
>          Issue Type: Bug
>          Components: Input/Output
>    Affects Versions: 2.3.0, 2.3.1
>            Reporter: Wenbo Zhao
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

