spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-18751][CORE] Fix deadlock when SparkContext.stop is called in Utils.tryOrStopSparkContext
Date Thu, 08 Dec 2016 19:54:07 GMT
Repository: spark
Updated Branches:
  refs/heads/master c3d3a9d0e -> 26432df9c


[SPARK-18751][CORE] Fix deadlock when SparkContext.stop is called in Utils.tryOrStopSparkContext

## What changes were proposed in this pull request?

When `SparkContext.stop` is called in `Utils.tryOrStopSparkContext` (the following three places),
it will cause deadlock because the `stop` method needs to wait for the thread running `stop`
to exit.

- ContextCleaner.keepCleaning
- LiveListenerBus.listenerThread.run
- TaskSchedulerImpl.start

This PR adds `SparkContext.stopInNewThread` and uses it to eliminate the potential deadlock.
I also removed my changes in #15775 since they are not necessary now.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16178 from zsxwing/fix-stop-deadlock.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26432df9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26432df9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26432df9

Branch: refs/heads/master
Commit: 26432df9cc6ffe569583aa628c6ecd7050b38316
Parents: c3d3a9d
Author: Shixiong Zhu <shixiong@databricks.com>
Authored: Thu Dec 8 11:54:04 2016 -0800
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Thu Dec 8 11:54:04 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 35 +++++++++++---------
 .../scala/org/apache/spark/rpc/RpcEnv.scala     |  5 ---
 .../org/apache/spark/rpc/netty/Dispatcher.scala |  1 -
 .../apache/spark/rpc/netty/NettyRpcEnv.scala    |  5 ---
 .../apache/spark/scheduler/DAGScheduler.scala   |  2 +-
 .../cluster/StandaloneSchedulerBackend.scala    |  2 +-
 .../scala/org/apache/spark/util/Utils.scala     |  2 +-
 .../org/apache/spark/rpc/RpcEnvSuite.scala      | 13 --------
 8 files changed, 23 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/26432df9/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index be4dae1..b42820a 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1760,25 +1760,30 @@ class SparkContext(config: SparkConf) extends Logging {
   def listJars(): Seq[String] = addedJars.keySet.toSeq
 
   /**
-   * Shut down the SparkContext.
+   * When stopping SparkContext inside Spark components, it's easy to cause dead-lock since Spark
+   * may wait for some internal threads to finish. It's better to use this method to stop
+   * SparkContext instead.
    */
-  def stop(): Unit = {
-    if (env.rpcEnv.isInRPCThread) {
-      // `stop` will block until all RPC threads exit, so we cannot call stop inside a RPC thread.
-      // We should launch a new thread to call `stop` to avoid dead-lock.
-      new Thread("stop-spark-context") {
-        setDaemon(true)
-
-        override def run(): Unit = {
-          _stop()
+  private[spark] def stopInNewThread(): Unit = {
+    new Thread("stop-spark-context") {
+      setDaemon(true)
+
+      override def run(): Unit = {
+        try {
+          SparkContext.this.stop()
+        } catch {
+          case e: Throwable =>
+            logError(e.getMessage, e)
+            throw e
         }
-      }.start()
-    } else {
-      _stop()
-    }
+      }
+    }.start()
   }
 
-  private def _stop() {
+  /**
+   * Shut down the SparkContext.
+   */
+  def stop(): Unit = {
     if (LiveListenerBus.withinListenerThread.value) {
       throw new SparkException(
         s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}")

http://git-wip-us.apache.org/repos/asf/spark/blob/26432df9/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index bbc4163..530743c 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -146,11 +146,6 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
    * @param uri URI with location of the file.
    */
   def openChannel(uri: String): ReadableByteChannel
-
-  /**
-   * Return if the current thread is a RPC thread.
-   */
-  def isInRPCThread: Boolean
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/26432df9/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
index 67baabd..a02cf30 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala
@@ -201,7 +201,6 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
   /** Message loop used for dispatching messages. */
   private class MessageLoop extends Runnable {
     override def run(): Unit = {
-      NettyRpcEnv.rpcThreadFlag.value = true
       try {
         while (true) {
           try {

http://git-wip-us.apache.org/repos/asf/spark/blob/26432df9/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
index 0b8cd14..e56943d 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala
@@ -407,14 +407,9 @@ private[netty] class NettyRpcEnv(
     }
 
   }
-
-  override def isInRPCThread: Boolean = NettyRpcEnv.rpcThreadFlag.value
 }
 
 private[netty] object NettyRpcEnv extends Logging {
-
-  private[netty] val rpcThreadFlag = new DynamicVariable[Boolean](false)
-
   /**
    * When deserializing the [[NettyRpcEndpointRef]], it needs a reference to [[NettyRpcEnv]].
    * Use `currentEnv` to wrap the deserialization codes. E.g.,

http://git-wip-us.apache.org/repos/asf/spark/blob/26432df9/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 7fde34d..9378f15 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1661,7 +1661,7 @@ private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler
     } catch {
       case t: Throwable => logError("DAGScheduler failed to cancel all jobs.", t)
     }
-    dagScheduler.sc.stop()
+    dagScheduler.sc.stopInNewThread()
   }
 
   override def onStop(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/26432df9/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 368cd30..7befdb0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -139,7 +139,7 @@ private[spark] class StandaloneSchedulerBackend(
         scheduler.error(reason)
       } finally {
         // Ensure the application terminates, as we can no longer run jobs.
-        sc.stop()
+        sc.stopInNewThread()
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/26432df9/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 91f5606..c6ad154 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1249,7 +1249,7 @@ private[spark] object Utils extends Logging {
         val currentThreadName = Thread.currentThread().getName
         if (sc != null) {
          logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t)
-          sc.stop()
+          sc.stopInNewThread()
         }
         if (!NonFatal(t)) {
           logError(s"throw uncaught fatal error in thread $currentThreadName", t)

http://git-wip-us.apache.org/repos/asf/spark/blob/26432df9/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
index aa07059..acdf21d 100644
--- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala
@@ -870,19 +870,6 @@ abstract class RpcEnvSuite extends SparkFunSuite with BeforeAndAfterAll {
     verify(endpoint, never()).onDisconnected(any())
     verify(endpoint, never()).onNetworkError(any(), any())
   }
-
-  test("isInRPCThread") {
-    val rpcEndpointRef = env.setupEndpoint("isInRPCThread", new RpcEndpoint {
-      override val rpcEnv = env
-
-      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-        case m => context.reply(rpcEnv.isInRPCThread)
-      }
-    })
-    assert(rpcEndpointRef.askWithRetry[Boolean]("hello") === true)
-    assert(env.isInRPCThread === false)
-    env.stop(rpcEndpointRef)
-  }
 }
 
 class UnserializableClass


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message