spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t...@apache.org
Subject spark git commit: [SPARK-11019] [STREAMING] [FLUME] Gracefully shutdown Flume receiver th…
Date Fri, 09 Oct 2015 01:50:32 GMT
Repository: spark
Updated Branches:
  refs/heads/master 8e67882b9 -> fa3e4d8f5


[SPARK-11019] [STREAMING] [FLUME] Gracefully shutdown Flume receiver th…

…reads.

Wait for a minute for the receiver threads to shutdown before interrupting them.

Author: Hari Shreedharan <hshreedharan@apache.org>

Closes #9041 from harishreedharan/flume-graceful-shutdown.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa3e4d8f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa3e4d8f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa3e4d8f

Branch: refs/heads/master
Commit: fa3e4d8f52995bf632e7eda60dbb776c9f637546
Parents: 8e67882
Author: Hari Shreedharan <hshreedharan@apache.org>
Authored: Thu Oct 8 18:50:27 2015 -0700
Committer: Tathagata Das <tathagata.das1565@gmail.com>
Committed: Thu Oct 8 18:50:27 2015 -0700

----------------------------------------------------------------------
 .../spark/streaming/flume/FlumePollingInputDStream.scala     | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fa3e4d8f/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
----------------------------------------------------------------------
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
index 3b936d8..6737750 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala
@@ -18,7 +18,7 @@ package org.apache.spark.streaming.flume
 
 
 import java.net.InetSocketAddress
-import java.util.concurrent.{LinkedBlockingQueue, Executors}
+import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
 
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
@@ -93,7 +93,11 @@ private[streaming] class FlumePollingReceiver(
 
   override def onStop(): Unit = {
     logInfo("Shutting down Flume Polling Receiver")
-    receiverExecutor.shutdownNow()
+    receiverExecutor.shutdown()
+    // Wait upto a minute for the threads to die
+    if (!receiverExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
+      receiverExecutor.shutdownNow()
+    }
     connections.asScala.foreach(_.transceiver.close())
     channelFactory.releaseExternalResources()
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message