From: zsxwing@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-18751][CORE] Fix deadlock when SparkContext.stop is called in Utils.tryOrStopSparkContext
Date: Thu, 8 Dec 2016 19:54:07 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master c3d3a9d0e -> 26432df9c


[SPARK-18751][CORE] Fix deadlock when SparkContext.stop is called in Utils.tryOrStopSparkContext

## What changes were proposed in this pull request?

When `SparkContext.stop` is called from within `Utils.tryOrStopSparkContext` (the three places below), it deadlocks: `stop` waits for the very thread that is calling `stop` to exit.

- ContextCleaner.keepCleaning
- LiveListenerBus.listenerThread.run
- TaskSchedulerImpl.start

This PR adds `SparkContext.stopInNewThread` and uses it to eliminate the potential deadlock. I also removed my changes from #15775 since they are no longer necessary.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu

Closes #16178 from zsxwing/fix-stop-deadlock.
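To make the hang easier to see before reading the diff, here is a minimal, self-contained sketch of the failure mode and of the new-thread workaround. It uses plain JVM threads only; `StopDeadlockSketch`, `worker`, and `stop-sketch` are illustrative names for this sketch, not Spark APIs.

```scala
object StopDeadlockSketch {

  // Stands in for an internal thread (e.g. the listener bus or ContextCleaner
  // thread) that the shutdown path waits on.
  @volatile private var worker: Thread = _

  // Stands in for SparkContext.stop(): it blocks until the internal thread exits.
  def stop(): Unit = worker.join()

  // Stands in for the new stopInNewThread(): hand the blocking stop() off to a
  // fresh daemon thread, so an internal thread never ends up waiting on itself.
  def stopInNewThread(): Unit = {
    val t = new Thread("stop-sketch") {
      override def run(): Unit = StopDeadlockSketch.stop()
    }
    t.setDaemon(true)
    t.start()
  }

  def main(args: Array[String]): Unit = {
    worker = new Thread("internal-worker") {
      override def run(): Unit = {
        // Calling StopDeadlockSketch.stop() here would make this thread join
        // itself and never return; stopInNewThread() returns immediately.
        stopInNewThread()
      }
    }
    worker.start()
    worker.join()
    println("internal worker exited cleanly")
  }
}
```

In the actual change below, the same idea appears as `SparkContext.stopInNewThread`, which the call sites in `DAGScheduler`, `StandaloneSchedulerBackend`, and `Utils.tryOrStopSparkContext` now use instead of calling `stop()` directly.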
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26432df9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26432df9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26432df9

Branch: refs/heads/master
Commit: 26432df9cc6ffe569583aa628c6ecd7050b38316
Parents: c3d3a9d
Author: Shixiong Zhu
Authored: Thu Dec 8 11:54:04 2016 -0800
Committer: Shixiong Zhu
Committed: Thu Dec 8 11:54:04 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 35 +++++++++++---------
 .../scala/org/apache/spark/rpc/RpcEnv.scala     |  5 ---
 .../org/apache/spark/rpc/netty/Dispatcher.scala |  1 -
 .../apache/spark/rpc/netty/NettyRpcEnv.scala    |  5 ---
 .../apache/spark/scheduler/DAGScheduler.scala   |  2 +-
 .../cluster/StandaloneSchedulerBackend.scala    |  2 +-
 .../scala/org/apache/spark/util/Utils.scala     |  2 +-
 .../org/apache/spark/rpc/RpcEnvSuite.scala      | 13 --------
 8 files changed, 23 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/26432df9/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index be4dae1..b42820a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1760,25 +1760,30 @@ class SparkContext(config: SparkConf) extends Logging {
   def listJars(): Seq[String] = addedJars.keySet.toSeq
 
   /**
-   * Shut down the SparkContext.
+   * When stopping SparkContext inside Spark components, it's easy to cause dead-lock since Spark
+   * may wait for some internal threads to finish. It's better to use this method to stop
+   * SparkContext instead.
    */
-  def stop(): Unit = {
-    if (env.rpcEnv.isInRPCThread) {
-      // `stop` will block until all RPC threads exit, so we cannot call stop inside a RPC thread.
-      // We should launch a new thread to call `stop` to avoid dead-lock.
-      new Thread("stop-spark-context") {
-        setDaemon(true)
-
-        override def run(): Unit = {
-          _stop()
+  private[spark] def stopInNewThread(): Unit = {
+    new Thread("stop-spark-context") {
+      setDaemon(true)
+
+      override def run(): Unit = {
+        try {
+          SparkContext.this.stop()
+        } catch {
+          case e: Throwable =>
+            logError(e.getMessage, e)
+            throw e
         }
-      }.start()
-    } else {
-      _stop()
-    }
+      }
+    }.start()
   }
 
-  private def _stop() {
+  /**
+   * Shut down the SparkContext.
+   */
+  def stop(): Unit = {
     if (LiveListenerBus.withinListenerThread.value) {
       throw new SparkException(
         s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}")

http://git-wip-us.apache.org/repos/asf/spark/blob/26432df9/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index bbc4163..530743c 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -146,11 +146,6 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
    * @param uri URI with location of the file.
    */
   def openChannel(uri: String): ReadableByteChannel
-
-  /**
-   * Return if the current thread is a RPC thread.
-   */
-  def isInRPCThread: Boolean
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/26432df9/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index 67baabd..a02cf30 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -201,7 +201,6 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
   /** Message loop used for dispatching messages. */
   private class MessageLoop extends Runnable {
     override def run(): Unit = {
-      NettyRpcEnv.rpcThreadFlag.value = true
       try {
         while (true) {
           try {

http://git-wip-us.apache.org/repos/asf/spark/blob/26432df9/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 0b8cd14..e56943d 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -407,14 +407,9 @@ private[netty] class NettyRpcEnv(
     }
   }
-
-  override def isInRPCThread: Boolean = NettyRpcEnv.rpcThreadFlag.value
 }
 
 private[netty] object NettyRpcEnv extends Logging {
-
-  private[netty] val rpcThreadFlag = new DynamicVariable[Boolean](false)
-
   /**
    * When deserializing the [[NettyRpcEndpointRef]], it needs a reference to [[NettyRpcEnv]].
    * Use `currentEnv` to wrap the deserialization codes. E.g.,

http://git-wip-us.apache.org/repos/asf/spark/blob/26432df9/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7fde34d..9378f15 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1661,7 +1661,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     } catch {
       case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
     }
-    dagScheduler.sc.stop()
+    dagScheduler.sc.stopInNewThread()
   }
 
   override def onStop(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/26432df9/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 368cd30..7befdb0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -139,7 +139,7 @@ private[spark] class StandaloneSchedulerBackend(
       scheduler.error(reason)
     } finally {
       // Ensure the application terminates, as we can no longer run jobs.
-      sc.stop()
+      sc.stopInNewThread()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/26432df9/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 91f5606..c6ad154 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1249,7 +1249,7 @@ private[spark] object Utils extends Logging {
       val currentThreadName = Thread.currentThread().getName
       if (sc != null) {
         logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t)
-        sc.stop()
+        sc.stopInNewThread()
       }
       if (!NonFatal(t)) {
         logError(s"throw uncaught fatal error in thread $currentThreadName", t)

http://git-wip-us.apache.org/repos/asf/spark/blob/26432df9/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index aa07059..acdf21d 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -870,19 +870,6 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     verify(endpoint, never()).onDisconnected(any())
     verify(endpoint, never()).onNetworkError(any(), any())
   }
-
-  test("isInRPCThread") {
-    val rpcEndpointRef = env.setupEndpoint("isInRPCThread", new RpcEndpoint {
-      override val rpcEnv = env
-
-      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-        case m => context.reply(rpcEnv.isInRPCThread)
-      }
-    })
-    assert(rpcEndpointRef.askWithRetry[Boolean]("hello") === true)
-    assert(env.isInRPCThread === false)
-    env.stop(rpcEndpointRef)
-  }
 }
 
 class UnserializableClass

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org