spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-11104] [STREAMING] Fix a deadlock in StreamingContex.stop
Date Fri, 16 Oct 2015 20:57:29 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.5 9814de0bd -> fa7c3ead7


[SPARK-11104] [STREAMING] Fix a deadlock in StreamingContex.stop

The following deadlock may happen if shutdownHook and StreamingContext.stop are running at
the same time.
```
Java stack information for the threads listed above:
===================================================
"Thread-2":
	at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:699)
	- waiting to lock <0x00000005405a1680> (a org.apache.spark.streaming.StreamingContext)
	at org.apache.spark.streaming.StreamingContext.org$apache$spark$streaming$StreamingContext$$stopOnShutdown(StreamingContext.scala:729)
	at org.apache.spark.streaming.StreamingContext$$anonfun$start$1.apply$mcV$sp(StreamingContext.scala:625)
	at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:266)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:236)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:236)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:236)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1697)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:236)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:236)
	at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:236)
	at scala.util.Try$.apply(Try.scala:161)
	at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:236)
	- locked <0x00000005405b6a00> (a org.apache.spark.util.SparkShutdownHookManager)
	at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:216)
	at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
"main":
	at org.apache.spark.util.SparkShutdownHookManager.remove(ShutdownHookManager.scala:248)
	- waiting to lock <0x00000005405b6a00> (a org.apache.spark.util.SparkShutdownHookManager)
	at org.apache.spark.util.ShutdownHookManager$.removeShutdownHook(ShutdownHookManager.scala:199)
	at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:712)
	- locked <0x00000005405a1680> (a org.apache.spark.streaming.StreamingContext)
	at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:684)
	- locked <0x00000005405a1680> (a org.apache.spark.streaming.StreamingContext)
	at org.apache.spark.streaming.SessionByKeyBenchmark$.main(SessionByKeyBenchmark.scala:108)
	at org.apache.spark.streaming.SessionByKeyBenchmark.main(SessionByKeyBenchmark.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:680)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

This PR just moved `ShutdownHookManager.removeShutdownHook` out of `synchronized` to avoid
deadlock.

Author: zsxwing <zsxwing@gmail.com>

Closes #9116 from zsxwing/stop-deadlock.

(cherry picked from commit e1eef248f13f6c334fe4eea8a29a1de5470a2e62)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa7c3ead
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa7c3ead
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa7c3ead

Branch: refs/heads/branch-1.5
Commit: fa7c3ead7cf964785691e8d1d8cd4bd5f41b1cec
Parents: 9814de0
Author: zsxwing <zsxwing@gmail.com>
Authored: Fri Oct 16 13:56:51 2015 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Fri Oct 16 13:57:25 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/StreamingContext.scala      | 55 +++++++++++---------
 1 file changed, 31 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fa7c3ead/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index bcd98ea..e97dd4c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -700,32 +700,39 @@ class StreamingContext private[streaming] (
    * @param stopGracefully if true, stops gracefully by waiting for the processing of all
    *                       received data to be completed
    */
-  def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = synchronized {
-    try {
-      state match {
-        case INITIALIZED =>
-          logWarning("StreamingContext has not been started yet")
-        case STOPPED =>
-          logWarning("StreamingContext has already been stopped")
-        case ACTIVE =>
-          scheduler.stop(stopGracefully)
-          // Removing the streamingSource to de-register the metrics on stop()
-          env.metricsSystem.removeSource(streamingSource)
-          uiTab.foreach(_.detach())
-          StreamingContext.setActiveContext(null)
-          waiter.notifyStop()
-          if (shutdownHookRef != null) {
-            ShutdownHookManager.removeShutdownHook(shutdownHookRef)
-          }
-          logInfo("StreamingContext stopped successfully")
+  def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {
+    var shutdownHookRefToRemove: AnyRef = null
+    synchronized {
+      try {
+        state match {
+          case INITIALIZED =>
+            logWarning("StreamingContext has not been started yet")
+          case STOPPED =>
+            logWarning("StreamingContext has already been stopped")
+          case ACTIVE =>
+            scheduler.stop(stopGracefully)
+            // Removing the streamingSource to de-register the metrics on stop()
+            env.metricsSystem.removeSource(streamingSource)
+            uiTab.foreach(_.detach())
+            StreamingContext.setActiveContext(null)
+            waiter.notifyStop()
+            if (shutdownHookRef != null) {
+              shutdownHookRefToRemove = shutdownHookRef
+              shutdownHookRef = null
+            }
+            logInfo("StreamingContext stopped successfully")
+        }
+      } finally {
+        // The state should always be Stopped after calling `stop()`, even if we haven't
started yet
+        state = STOPPED
       }
-      // Even if we have already stopped, we still need to attempt to stop the SparkContext
because
-      // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext
= true).
-      if (stopSparkContext) sc.stop()
-    } finally {
-      // The state should always be Stopped after calling `stop()`, even if we haven't started
yet
-      state = STOPPED
     }
+    if (shutdownHookRefToRemove != null) {
+      ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)
+    }
+    // Even if we have already stopped, we still need to attempt to stop the SparkContext
because
+    // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext =
true).
+    if (stopSparkContext) sc.stop()
   }
 
   private def stopOnShutdown(): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message