spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-11872] Prevent the call to SparkContext#stop() in the listener bus's thread
Date Tue, 24 Nov 2015 20:24:02 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 927070d6d -> 0419fd361


[SPARK-11872] Prevent the call to SparkContext#stop() in the listener bus's thread

This is a continuation of SPARK-11761

Andrew suggested adding this protection. See the tail of https://github.com/apache/spark/pull/9741

Author: tedyu <yuzhihong@gmail.com>

Closes #9852 from tedyu/master.

(cherry picked from commit 81012546ee5a80d2576740af0dad067b0f5962c5)
Signed-off-by: Shixiong Zhu <shixiong@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0419fd36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0419fd36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0419fd36

Branch: refs/heads/branch-1.6
Commit: 0419fd3615ec73be1b2774e768238bf3f0cb4403
Parents: 927070d
Author: tedyu <yuzhihong@gmail.com>
Authored: Tue Nov 24 12:22:33 2015 -0800
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Tue Nov 24 12:23:51 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  4 +++
 .../spark/scheduler/SparkListenerSuite.scala    | 31 ++++++++++++++++++++
 2 files changed, 35 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0419fd36/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index af4456c..90480e5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1694,6 +1694,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   // Shut down the SparkContext.
   def stop() {
+    if (AsynchronousListenerBus.withinListenerThread.value) {
+      throw new SparkException("Cannot stop SparkContext within listener thread of" +
+        " AsynchronousListenerBus")
+    }
     // Use the stopping variable to ensure no contention for the stop scenario.
     // Still track the stopped variable for use elsewhere in the code.
     if (!stopped.compareAndSet(false, true)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0419fd36/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 84e5458..f20d5be 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.scalatest.Matchers
 
+import org.apache.spark.SparkException
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.ResetSystemProperties
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
@@ -36,6 +37,21 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
 
   val jobCompletionTime = 1421191296660L
 
+  test("don't call sc.stop in listener") {
+    sc = new SparkContext("local", "SparkListenerSuite")
+    val listener = new SparkContextStoppingListener(sc)
+    val bus = new LiveListenerBus
+    bus.addListener(listener)
+
+    // Starting listener bus should flush all buffered events
+    bus.start(sc)
+    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
+    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+
+    bus.stop()
+    assert(listener.sparkExSeen)
+  }
+
   test("basic creation and shutdown of LiveListenerBus") {
     val counter = new BasicJobCounter
     val bus = new LiveListenerBus
@@ -443,6 +459,21 @@ private class BasicJobCounter extends SparkListener {
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1
 }
 
+/**
+ * A simple listener that tries to stop SparkContext.
+ */
+private class SparkContextStoppingListener(val sc: SparkContext) extends SparkListener {
+  @volatile var sparkExSeen = false
+  override def onJobEnd(job: SparkListenerJobEnd): Unit = {
+    try {
+      sc.stop()
+    } catch {
+      case se: SparkException =>
+        sparkExSeen = true
+    }
+  }
+}
+
 private class ListenerThatAcceptsSparkConf(conf: SparkConf) extends SparkListener {
   var count = 0
   override def onJobEnd(job: SparkListenerJobEnd): Unit = count += 1


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message