spark-commits mailing list archives

From: a...@apache.org
Subject: git commit: SPARK-1587 Fix thread leak
Date: Thu, 24 Apr 2014 06:21:07 GMT
Repository: spark
Updated Branches:
  refs/heads/master bb68f4774 -> dd681f502


SPARK-1587 Fix thread leak

mvn test fails (intermittently) due to a thread leak, since scalatest runs all tests in the
same VM.
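
For context on the failure mode: a non-daemon timer thread started by one suite keeps
running after that suite finishes, and because scalatest executes every suite in the same
JVM, the stray thread is still alive when later suites run. A minimal sketch of the
pattern (illustrative only, not code from this patch; the class and thread names are
hypothetical):

    import java.util.{Timer, TimerTask}

    class LeakyComponent {
      // Timer(String) starts a non-daemon worker thread, so this thread
      // alone is enough to keep activity alive in the shared test JVM.
      private val timer = new Timer("leaky-timer")

      timer.scheduleAtFixedRate(new TimerTask {
        override def run(): Unit = { /* periodic work */ }
      }, 0L, 1000L)

      // The fixes below follow this shape: cancel owned timers in stop(),
      // e.g. starvationTimer.cancel() and metadataCleaner.cancel().
      def stop(): Unit = timer.cancel()
    }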

Author: Mridul Muralidharan <mridulm80@apache.org>

Closes #504 from mridulm/resource_leak_fixes and squashes the following commits:

a5d10d0 [Mridul Muralidharan] Prevent thread leaks while running tests: clean up all threads when SparkContext.stop is invoked (leaked threads cause tests to fail)
7b5e19c [Mridul Muralidharan] Prevent NPE while running tests
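
The test-side counterpart (see the DiskBlockManagerSuite hunk at the end of this diff) is
to release thread-owning resources in afterEach. A minimal scalatest sketch, assuming the
FunSuite/BeforeAndAfterEach API of the ScalaTest version in use here; the suite and timer
names are hypothetical:

    import java.util.{Timer, TimerTask}
    import org.scalatest.{BeforeAndAfterEach, FunSuite}

    class TimerOwnerSuite extends FunSuite with BeforeAndAfterEach {
      private var timer: Timer = _

      override def beforeEach() {
        timer = new Timer("suite-timer") // non-daemon thread starts here
      }

      override def afterEach() {
        timer.cancel() // without this, the thread survives into the next suite
      }

      test("timer can schedule work") {
        timer.schedule(new TimerTask { override def run(): Unit = () }, 100L)
      }
    }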


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd681f50
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd681f50
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd681f50

Branch: refs/heads/master
Commit: dd681f502eafe39cfb8a5a62ea2d28016ac6013d
Parents: bb68f47
Author: Mridul Muralidharan <mridulm80@apache.org>
Authored: Wed Apr 23 23:20:55 2014 -0700
Committer: Aaron Davidson <aaron@databricks.com>
Committed: Wed Apr 23 23:20:55 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/metrics/MetricsSystem.scala    | 22 ++++++++-------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  1 +
 .../org/apache/spark/storage/BlockManager.scala |  2 ++
 .../apache/spark/storage/DiskBlockManager.scala | 28 ++++++++++++--------
 .../spark/storage/ShuffleBlockManager.scala     |  4 +++
 .../scala/org/apache/spark/ui/JettyUtils.scala  |  1 +
 .../spark/storage/DiskBlockManagerSuite.scala   |  5 ++++
 7 files changed, 42 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index c5bda20..651511d 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -129,17 +129,19 @@ private[spark] class MetricsSystem private (val instance: String,
 
     sinkConfigs.foreach { kv =>
       val classPath = kv._2.getProperty("class")
-      try {
-        val sink = Class.forName(classPath)
-          .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
-          .newInstance(kv._2, registry, securityMgr)
-        if (kv._1 == "servlet") {
-           metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
-        } else {
-          sinks += sink.asInstanceOf[Sink]
+      if (null != classPath) {
+        try {
+          val sink = Class.forName(classPath)
+            .getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
+            .newInstance(kv._2, registry, securityMgr)
+          if (kv._1 == "servlet") {
+            metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
+          } else {
+            sinks += sink.asInstanceOf[Sink]
+          }
+        } catch {
+          case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
         }
-      } catch {
-        case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index be19d9b..5a68f38 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -356,6 +356,7 @@ private[spark] class TaskSchedulerImpl(
     if (taskResultGetter != null) {
       taskResultGetter.stop()
     }
+    starvationTimer.cancel()
 
     // sleeping for an arbitrary 1 seconds to ensure that messages are sent out.
     Thread.sleep(1000L)

http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index f15fa4d..ccd5c53 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -1021,6 +1021,8 @@ private[spark] class BlockManager(
       heartBeatTask.cancel()
     }
     connectionManager.stop()
+    shuffleBlockManager.stop()
+    diskBlockManager.stop()
     actorSystem.stop(slaveActor)
     blockInfo.clear()
     memoryStore.clear()

http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 7a24c8f..054f66a 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -150,20 +150,26 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
       override def run() {
         logDebug("Shutdown hook called")
-        localDirs.foreach { localDir =>
-          try {
-            if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
-          } catch {
-            case t: Throwable =>
-              logError("Exception while deleting local spark dir: " + localDir, t)
-          }
-        }
+        stop()
+      }
+    })
+  }
 
-        if (shuffleSender != null) {
-          shuffleSender.stop()
+  private[spark] def stop() {
+    localDirs.foreach { localDir =>
+      if (localDir.isDirectory() && localDir.exists()) {
+        try {
+          if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
+        } catch {
+          case t: Throwable =>
+            logError("Exception while deleting local spark dir: " + localDir, t)
         }
       }
-    })
+    }
+
+    if (shuffleSender != null) {
+      shuffleSender.stop()
+    }
   }
 
   private[storage] def startShuffleBlockSender(port: Int): Int = {

http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
index 4cd4cdb..35910e5 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala
@@ -207,6 +207,10 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
   private def cleanup(cleanupTime: Long) {
     shuffleStates.clearOldValues(cleanupTime, (shuffleId, state) => removeShuffleBlocks(shuffleId))
   }
+
+  def stop() {
+    metadataCleaner.cancel()
+  }
 }
 
 private[spark]

http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 750f5a5..fdeb15b 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -195,6 +195,7 @@ private[spark] object JettyUtils extends Logging {
           (server, server.getConnectors.head.getLocalPort)
         case f: Failure[_] =>
           server.stop()
+          pool.stop()
           logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))
           logInfo("Error was: " + f.toString)
           connect((currentPort + 1) % 65536)

http://git-wip-us.apache.org/repos/asf/spark/blob/dd681f50/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 9b29e2a..42bfbf1 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -53,6 +53,11 @@ class DiskBlockManagerSuite extends FunSuite with BeforeAndAfterEach {
     shuffleBlockManager.idToSegmentMap.clear()
   }
 
+  override def afterEach() {
+    diskBlockManager.stop()
+    shuffleBlockManager.idToSegmentMap.clear()
+  }
+
   test("basic block creation") {
     val blockId = new TestBlockId("test")
     assertSegmentEquals(blockId, blockId.name, 0, 0)

