spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [STREAMING] SPARK-4986 Wait for receivers to deregister and receiver job to terminate
Date Tue, 03 Feb 2015 22:54:00 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 d644bd96a -> 092d4ba57


[STREAMING] SPARK-4986 Wait for receivers to deregister and receiver job to terminate

A slow receiver might not have enough time to shutdown cleanly even when graceful shutdown
is used. This PR extends graceful waiting to make sure all receivers have deregistered and
that the receiver job has terminated.

Author: Jesper Lundgren <jesper.lundgren@vpon.com>

Closes #4338 from cleaton/stopreceivers and squashes the following commits:

a9cf223 [Jesper Lundgren] remove cleaner.ttl config
f969b6e [Jesper Lundgren] fix inversed logic in unit test
3d0bd35 [Jesper Lundgren] switch boleans to match running status instead of terminated
9a9ff88 [Jesper Lundgren] wait for receivers to shutdown and receiver job to terminate
d179372 [Jesper Lundgren] Add graceful shutdown unit test covering slow receiver onStop

(cherry picked from commit 1e8b5394b44a0d3b36f64f10576c3ae3b977810c)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/092d4ba5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/092d4ba5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/092d4ba5

Branch: refs/heads/branch-1.3
Commit: 092d4ba5706687200c005d4b275f7ec4a86daf19
Parents: d644bd9
Author: Jesper Lundgren <jesper.lundgren@vpon.com>
Authored: Tue Feb 3 14:53:39 2015 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Tue Feb 3 14:53:54 2015 -0800

----------------------------------------------------------------------
 .../streaming/scheduler/JobScheduler.scala      |  2 +-
 .../streaming/scheduler/ReceiverTracker.scala   | 19 ++++++-
 .../spark/streaming/StreamingContextSuite.scala | 58 ++++++++++++++++++++
 3 files changed, 75 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/092d4ba5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 0e0f5bd..b3ffc71 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -73,7 +73,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     logDebug("Stopping JobScheduler")
 
     // First, stop receiving
-    receiverTracker.stop()
+    receiverTracker.stop(processAllReceivedData)
 
     // Second, stop generating jobs. If it has to process all received data,
     // then this will wait for all the processing through JobScheduler to be over.

http://git-wip-us.apache.org/repos/asf/spark/blob/092d4ba5/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 4f99886..00456ab 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -86,10 +86,10 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean
= false
   }
 
   /** Stop the receiver execution thread. */
-  def stop() = synchronized {
+  def stop(graceful: Boolean) = synchronized {
     if (!receiverInputStreams.isEmpty && actor != null) {
       // First, stop the receivers
-      if (!skipReceiverLaunch) receiverExecutor.stop()
+      if (!skipReceiverLaunch) receiverExecutor.stop(graceful)
 
       // Finally, stop the actor
       ssc.env.actorSystem.stop(actor)
@@ -218,6 +218,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean
= false
   /** This thread class runs all the receivers on the cluster.  */
   class ReceiverLauncher {
     @transient val env = ssc.env
+    @volatile @transient private var running = false
     @transient val thread  = new Thread() {
       override def run() {
         try {
@@ -233,7 +234,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean
= false
       thread.start()
     }
 
-    def stop() {
+    def stop(graceful: Boolean) {
       // Send the stop signal to all the receivers
       stopReceivers()
 
@@ -241,6 +242,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean
= false
       // That is, for the receivers to quit gracefully.
       thread.join(10000)
 
+      if (graceful) {
+        val pollTime = 100
+        def done = { receiverInfo.isEmpty && !running }
+        logInfo("Waiting for receiver job to terminate gracefully")
+        while(!done) {
+          Thread.sleep(pollTime)
+        }
+        logInfo("Waited for receiver job to terminate gracefully")
+      }
+
       // Check if all the receivers have been deregistered or not
       if (!receiverInfo.isEmpty) {
         logWarning("All of the receivers have not deregistered, " + receiverInfo)
@@ -295,7 +306,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean
= false
 
       // Distribute the receivers and start them
       logInfo("Starting " + receivers.length + " receivers")
+      running = true
       ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver))
+      running = false
       logInfo("All of the receivers have been terminated")
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/092d4ba5/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 9f352bd..0b5af25 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -205,6 +205,32 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with
Timeouts w
     }
   }
 
+  test("stop slow receiver gracefully") {
+    val conf = new SparkConf().setMaster(master).setAppName(appName)
+    conf.set("spark.streaming.gracefulStopTimeout", "20000")
+    sc = new SparkContext(conf)
+    logInfo("==================================\n\n\n")
+    ssc = new StreamingContext(sc, Milliseconds(100))
+    var runningCount = 0
+    SlowTestReceiver.receivedAllRecords = false
+    //Create test receiver that sleeps in onStop()
+    val totalNumRecords = 15
+    val recordsPerSecond = 1
+    val input = ssc.receiverStream(new SlowTestReceiver(totalNumRecords, recordsPerSecond))
+    input.count().foreachRDD { rdd =>
+      val count = rdd.first()
+      runningCount += count.toInt
+      logInfo("Count = " + count + ", Running count = " + runningCount)
+    }
+    ssc.start()
+    ssc.awaitTermination(500)
+    ssc.stop(stopSparkContext = false, stopGracefully = true)
+    logInfo("Running count = " + runningCount)
+    assert(runningCount > 0)
+    assert(runningCount == totalNumRecords)
+    Thread.sleep(100)
+  }
+
   test("awaitTermination") {
     ssc = new StreamingContext(master, appName, batchDuration)
     val inputStream = addInputStream(ssc)
@@ -319,6 +345,38 @@ object TestReceiver {
   val counter = new AtomicInteger(1)
 }
 
+/** Custom receiver for testing whether a slow receiver can be shutdown gracefully or not
*/
+class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int) extends Receiver[Int](StorageLevel.MEMORY_ONLY)
with Logging {
+
+  var receivingThreadOption: Option[Thread] = None
+
+  def onStart() {
+    val thread = new Thread() {
+      override def run() {
+        logInfo("Receiving started")
+        for(i <- 1 to totalRecords) {
+          Thread.sleep(1000 / recordsPerSecond)
+          store(i)
+        }
+        SlowTestReceiver.receivedAllRecords = true
+        logInfo(s"Received all $totalRecords records")
+      }
+    }
+    receivingThreadOption = Some(thread)
+    thread.start()
+  }
+
+  def onStop() {
+    // Simulate slow receiver by waiting for all records to be produced
+    while(!SlowTestReceiver.receivedAllRecords) Thread.sleep(100)
+    // no cleanup to be done, the receiving thread should stop on it own
+  }
+}
+
+object SlowTestReceiver {
+  var receivedAllRecords = false
+}
+
 /** Streaming application for testing DStream and RDD creation sites */
 package object testPackage extends Assertions {
   def test() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message