spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zsxw...@apache.org
Subject spark git commit: [SPARK-12021][STREAMING][TESTS] Fix the potential dead-lock in StreamingListenerSuite
Date Fri, 27 Nov 2015 19:50:23 GMT
Repository: spark
Updated Branches:
  refs/heads/master ba02f6cb5 -> f57e6c9ef


[SPARK-12021][STREAMING][TESTS] Fix the potential dead-lock in StreamingListenerSuite

In the StreamingListenerSuite test "don't call ssc.stop in listener", after the main thread calls `ssc.stop()`,
`StreamingContextStoppingCollector` may also call `ssc.stop()` from the listener bus thread, causing
a deadlock. This PR updates `StreamingContextStoppingCollector` to call `ssc.stop()` only for the
first completed batch, which avoids the deadlock.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10011 from zsxwing/fix-test-deadlock.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f57e6c9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f57e6c9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f57e6c9e

Branch: refs/heads/master
Commit: f57e6c9effdb9e282fc8ae66dc30fe053fed5272
Parents: ba02f6c
Author: Shixiong Zhu <shixiong@databricks.com>
Authored: Fri Nov 27 11:50:18 2015 -0800
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Fri Nov 27 11:50:18 2015 -0800

----------------------------------------------------------------------
 .../streaming/StreamingListenerSuite.scala      | 25 +++++++++++++++-----
 1 file changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f57e6c9e/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index df4575a..04cd5bd 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -222,7 +222,11 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     val batchCounter = new BatchCounter(_ssc)
     _ssc.start()
     // Make sure running at least one batch
-    batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
+    if (!batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)) {
+      fail("The first batch cannot complete in 10 seconds")
+    }
+    // When reaching here, we can make sure `StreamingContextStoppingCollector` won't call
+    // `ssc.stop()`, so it's safe to call `_ssc.stop()` now.
     _ssc.stop()
     assert(contextStoppingCollector.sparkExSeen)
   }
@@ -345,12 +349,21 @@ class FailureReasonsCollector extends StreamingListener {
  */
 class StreamingContextStoppingCollector(val ssc: StreamingContext) extends StreamingListener {
   @volatile var sparkExSeen = false
+
+  private var isFirstBatch = true
+
   override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
-    try {
-      ssc.stop()
-    } catch {
-      case se: SparkException =>
-        sparkExSeen = true
+    if (isFirstBatch) {
+      // We should only call `ssc.stop()` in the first batch. Otherwise, it's possible that the main
+      // thread is calling `ssc.stop()`, while StreamingContextStoppingCollector is also calling
+      // `ssc.stop()` in the listener thread, which becomes a dead-lock.
+      isFirstBatch = false
+      try {
+        ssc.stop()
+      } catch {
+        case se: SparkException =>
+          sparkExSeen = true
+      }
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message