spark-issues mailing list archives

From "Thomas Graves (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation
Date Tue, 27 Mar 2018 21:59:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16416309#comment-16416309 ]

Thomas Graves commented on SPARK-22618:
---------------------------------------

Thanks for fixing this; I'm hitting it now in Spark 2.2. I think the same issue can happen with
broadcast variables if the call is told to wait. Did you happen to look at that at the same time?

> RDD.unpersist can cause fatal exception when used with dynamic allocation
> -------------------------------------------------------------------------
>
>                 Key: SPARK-22618
>                 URL: https://issues.apache.org/jira/browse/SPARK-22618
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: Brad
>            Assignee: Brad
>            Priority: Minor
>             Fix For: 2.3.0
>
>
> If you use rdd.unpersist() with dynamic allocation, an executor can be deallocated
> while your RDD is being removed, which throws an uncaught exception and kills your job.
>
> I looked into different ways of preventing this error but couldn't come up with anything
> that wouldn't require a big change. I propose that the best fix is simply to catch and log
> IOExceptions in unpersist() so they don't kill your job. This matches the effective
> behavior elsewhere in the code when executors are lost due to dynamic allocation.
>
> In the worst case, I think this could leave RDD partitions on executors after they were
> unpersisted, but that is probably better than the whole job failing. In most cases the
> IOException would be due to the executor dying for some reason, which has effectively the
> same result as unpersisting the RDD from that executor anyway.
>
> I noticed this exception in a job that loads a 100GB dataset on a cluster where we use
> dynamic allocation heavily. Here is the relevant stack trace:
> java.io.IOException: Connection reset by peer
>         at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>         at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>         at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
>         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
>         at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:276)
>         at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>         at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
>         at java.lang.Thread.run(Thread.java:748)
> Exception in thread "main" org.apache.spark.SparkException: Exception thrown in awaitResult:
>         at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
>         at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
>         at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:131)
>         at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1806)
>         at org.apache.spark.rdd.RDD.unpersist(RDD.scala:217)
>         at com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.doWorkload(CacheTest.scala:62)
>         at com.ibm.sparktc.sparkbench.workload.Workload$class.run(Workload.scala:40)
>         at com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.run(CacheTest.scala:33)
>         at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
>         at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>         at scala.collection.immutable.List.map(List.scala:285)
>         at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$.com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially(SuiteKickoff.scala:78)
>         at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$2.apply(SuiteKickoff.scala:52)
>         at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$2.apply(SuiteKickoff.scala:47)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
>         at scala.collection.immutable.Range.foreach(Range.scala:160)
>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
>         at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$.run(SuiteKickoff.scala:47)
>         at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$MultipleSuiteKickoff$$runSuitesSerially$1.apply(MultipleSuiteKickoff.scala:24)
>         at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$MultipleSuiteKickoff$$runSuitesSerially$1.apply(MultipleSuiteKickoff.scala:24)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$.com$ibm$sparktc$sparkbench$workload$MultipleSuiteKickoff$$runSuitesSerially(MultipleSuiteKickoff.scala:24)
>         at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$$anonfun$run$1.apply(MultipleSuiteKickoff.scala:13)
>         at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$$anonfun$run$1.apply(MultipleSuiteKickoff.scala:10)
>         at scala.collection.immutable.List.foreach(List.scala:381)
>         at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$.run(MultipleSuiteKickoff.scala:10)
>         at com.ibm.sparktc.sparkbench.cli.CLIKickoff$.main(CLIKickoff.scala:16)
>         at com.ibm.sparktc.sparkbench.cli.CLIKickoff.main(CLIKickoff.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:843)
>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:188)
>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:218)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:127)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.io.IOException: Connection reset by peer
>         at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
>         at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
>         at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
>         at sun.nio.ch.IOUtil.read(IOUtil.java:192)
>         at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
>         at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
>         at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
>         at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:276)
>         at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
>         at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
>         at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
>         at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
>         at java.lang.Thread.run(Thread.java:748)
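
For anyone hitting this on a release without the fix, the idea in the description (catch and log the IOException rather than let it end the job) can be approximated in application code. The following is only a minimal sketch of that idea, not the actual Spark patch. The names BestEffortCleanup, Step, run and causedByIo are hypothetical and do not exist in Spark. The example has no Spark dependency, so the failing step in main is a stand-in that mimics the shape of the trace above: an exception whose cause is an IOException.

```java
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical helper (not part of Spark): runs a cleanup step and tolerates I/O failures. */
final class BestEffortCleanup {
    private static final Logger LOG = Logger.getLogger(BestEffortCleanup.class.getName());

    /** A cleanup step that may throw, for example a blocking unpersist call. */
    @FunctionalInterface
    interface Step {
        void run() throws Exception;
    }

    /** Returns true if the throwable itself, or any exception in its cause chain, is an IOException. */
    static boolean causedByIo(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof IOException) {
                return true;
            }
        }
        return false;
    }

    /**
     * Runs the step and returns true on success. Failures rooted in an IOException
     * are logged and swallowed (returns false); any other failure is rethrown.
     */
    static boolean run(String name, Step step) throws Exception {
        try {
            step.run();
            return true;
        } catch (Exception e) {
            if (causedByIo(e)) {
                LOG.log(Level.WARNING, "Ignoring I/O failure during " + name, e);
                return false;
            }
            throw e;
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the reported failure: a wrapper exception caused by an IOException.
        boolean ok = run("unpersist", () -> {
            throw new RuntimeException("Exception thrown in awaitResult",
                    new IOException("Connection reset by peer"));
        });
        System.out.println("cleanup completed: " + ok);
    }
}
```

In a real job the lambda would wrap the cleanup call itself, for example the rdd.unpersist() call from the trace. As the description notes, the trade-off is that some cached partitions may remain on executors. Non-I/O failures are deliberately rethrown so that unrelated bugs are not hidden.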



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


