Date: Mon, 27 Nov 2017 18:41:00 +0000 (UTC)
From: "Brad (JIRA)"
To: issues@spark.apache.org
Subject: [jira] [Commented] (SPARK-22618) RDD.unpersist can cause fatal exception when used with dynamic allocation

    [ https://issues.apache.org/jira/browse/SPARK-22618?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16267223#comment-16267223 ]

Brad commented on SPARK-22618:
------------------------------

I have a PR for this forthcoming.
> RDD.unpersist can cause fatal exception when used with dynamic allocation
> -------------------------------------------------------------------------
>
>                 Key: SPARK-22618
>                 URL: https://issues.apache.org/jira/browse/SPARK-22618
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.3.0
>            Reporter: Brad
>            Priority: Minor
>             Fix For: 2.3.0
>
>
> If you use rdd.unpersist() with dynamic allocation, an executor can be deallocated while your RDD is being removed, which throws an uncaught exception that kills your job.
> I looked into different ways of preventing this error, but couldn't come up with anything that wouldn't require a big change. I propose that the best fix is simply to catch and log IOExceptions in unpersist() so they don't kill the job; this matches the effective behavior in other parts of the code when executors are lost through dynamic allocation. A minimal sketch of this approach follows the stack trace below.
> In the worst case this could leave RDD partitions on executors after they were unpersisted, but that is probably better than the whole job failing. In most cases the IOException would be due to the executor dying, which is effectively the same result as unpersisting the RDD from that executor anyway.
> I noticed this exception in a job that loads a 100GB dataset on a cluster where we use dynamic allocation heavily. Here is the relevant stack trace:
> java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
> at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
> at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:276)
> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
> at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> at java.lang.Thread.run(Thread.java:748)
> Exception in thread "main" org.apache.spark.SparkException: Exception thrown in awaitResult:
> at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
> at org.apache.spark.storage.BlockManagerMaster.removeRdd(BlockManagerMaster.scala:131)
> at org.apache.spark.SparkContext.unpersistRDD(SparkContext.scala:1806)
> at org.apache.spark.rdd.RDD.unpersist(RDD.scala:217)
> at com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.doWorkload(CacheTest.scala:62)
> at com.ibm.sparktc.sparkbench.workload.Workload$class.run(Workload.scala:40)
> at com.ibm.sparktc.sparkbench.workload.exercise.CacheTest.run(CacheTest.scala:33)
> at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
> at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially$1.apply(SuiteKickoff.scala:78)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$.com$ibm$sparktc$sparkbench$workload$SuiteKickoff$$runSerially(SuiteKickoff.scala:78)
> at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$2.apply(SuiteKickoff.scala:52)
> at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$$anonfun$2.apply(SuiteKickoff.scala:47)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> at scala.collection.immutable.Range.foreach(Range.scala:160)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
> at com.ibm.sparktc.sparkbench.workload.SuiteKickoff$.run(SuiteKickoff.scala:47)
> at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$MultipleSuiteKickoff$$runSuitesSerially$1.apply(MultipleSuiteKickoff.scala:24)
> at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$$anonfun$com$ibm$sparktc$sparkbench$workload$MultipleSuiteKickoff$$runSuitesSerially$1.apply(MultipleSuiteKickoff.scala:24)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$.com$ibm$sparktc$sparkbench$workload$MultipleSuiteKickoff$$runSuitesSerially(MultipleSuiteKickoff.scala:24)
> at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$$anonfun$run$1.apply(MultipleSuiteKickoff.scala:13)
> at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$$anonfun$run$1.apply(MultipleSuiteKickoff.scala:10)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at com.ibm.sparktc.sparkbench.workload.MultipleSuiteKickoff$.run(MultipleSuiteKickoff.scala:10)
> at com.ibm.sparktc.sparkbench.cli.CLIKickoff$.main(CLIKickoff.scala:16)
> at com.ibm.sparktc.sparkbench.cli.CLIKickoff.main(CLIKickoff.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:843)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:188)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:218)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:127)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.io.IOException: Connection reset by peer
> at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
> at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
> at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
> at sun.nio.ch.IOUtil.read(IOUtil.java:192)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
> at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:221)
> at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:899)
> at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:276)
> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
> at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
> at java.lang.Thread.run(Thread.java:748)
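
For illustration only, here is a minimal caller-side sketch of the catch-and-log idea described above. It is not the forthcoming PR: the object and method names (UnpersistUtil, safeUnpersist) and the log message are hypothetical, and the real fix would live inside Spark (e.g. around the awaitResult call visible in the trace) rather than in user code.

    import java.io.IOException
    import org.apache.spark.SparkException
    import org.apache.spark.rdd.RDD

    object UnpersistUtil {
      // Best-effort unpersist: if an executor is deallocated by dynamic allocation
      // while its blocks are being removed, the underlying IOException surfaces as
      // a SparkException from awaitResult (see the stack trace above). Log it and
      // continue instead of letting it kill the job.
      def safeUnpersist[T](rdd: RDD[T], blocking: Boolean = true): Unit = {
        try {
          rdd.unpersist(blocking)
        } catch {
          case e @ (_: IOException | _: SparkException) =>
            println(s"Ignoring failure while unpersisting RDD ${rdd.id}: ${e.getMessage}")
        }
      }
    }

In this particular code path, calling unpersist(blocking = false) should also avoid the failure, since the remove is then not awaited, at the cost of not knowing whether the blocks were actually dropped.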