spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sro...@apache.org
Subject spark git commit: [core] [minor] Make sure ConnectionManager stops.
Date Sat, 18 Apr 2015 09:15:01 GMT
Repository: spark
Updated Branches:
  refs/heads/master 5f095d560 -> 327ebf0cb


[core] [minor] Make sure ConnectionManager stops.

My previous fix (forcing a selector wakeup) did not work: I ran into
the hang again. This change makes the code more explicit about the
condition under which the selector thread should exit.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #5566 from vanzin/conn-mgr-hang and squashes the following commits:

ddb2c03 [Marcelo Vanzin] [core] [minor] Make sure ConnectionManager stops.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/327ebf0c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/327ebf0c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/327ebf0c

Branch: refs/heads/master
Commit: 327ebf0cb5e236579bece057eda27b21aed0e2dc
Parents: 5f095d5
Author: Marcelo Vanzin <vanzin@cloudera.com>
Authored: Sat Apr 18 10:14:56 2015 +0100
Committer: Sean Owen <sowen@cloudera.com>
Committed: Sat Apr 18 10:14:56 2015 +0100

----------------------------------------------------------------------
 .../spark/network/nio/ConnectionManager.scala       | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/327ebf0c/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 5a74c13..1a68e62 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -188,6 +188,7 @@ private[nio] class ConnectionManager(
   private val writeRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()
   private val readRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()
 
+  @volatile private var isActive = true
   private val selectorThread = new Thread("connection-manager-thread") {
     override def run(): Unit = ConnectionManager.this.run()
   }
@@ -342,7 +343,7 @@ private[nio] class ConnectionManager(
 
   def run() {
     try {
-      while(!selectorThread.isInterrupted) {
+      while (isActive) {
         while (!registerRequests.isEmpty) {
           val conn: SendingConnection = registerRequests.dequeue()
           addListeners(conn)
@@ -398,7 +399,7 @@ private[nio] class ConnectionManager(
           } catch {
             // Explicitly only dealing with CancelledKeyException here since other exceptions
             // should be dealt with differently.
-            case e: CancelledKeyException => {
+            case e: CancelledKeyException =>
               // Some keys within the selectors list are invalid/closed. clear them.
               val allKeys = selector.keys().iterator()
 
@@ -420,8 +421,11 @@ private[nio] class ConnectionManager(
                   }
                 }
               }
-            }
-            0
+              0
+
+            case e: ClosedSelectorException =>
+              logDebug("Failed select() as selector is closed.", e)
+              return
           }
 
         if (selectedKeysCount == 0) {
@@ -988,11 +992,11 @@ private[nio] class ConnectionManager(
   }
 
   def stop() {
+    isActive = false
     ackTimeoutMonitor.stop()
-    selector.wakeup()
+    selector.close()
     selectorThread.interrupt()
     selectorThread.join()
-    selector.close()
     val connections = connectionsByKey.values
     connections.foreach(_.close())
     if (connectionsByKey.size != 0) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message