spark-commits mailing list archives

From pwend...@apache.org
Subject spark git commit: [SPARK-7930] [CORE] [STREAMING] Fixed shutdown hook priorities
Date Fri, 29 May 2015 05:29:01 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 aee046dfa -> f7cb272b7


[SPARK-7930] [CORE] [STREAMING] Fixed shutdown hook priorities

The shutdown hook for temp directories had priority 100 while SparkContext's was 50. Since
hooks with a higher priority value run earlier, the local root directory was deleted before
the SparkContext was shut down, producing alarming errors from still-running jobs at shutdown
time. This is especially a problem when running streaming examples, where Ctrl-C is the only
way to shut down.

The fix in this PR is to make the temp directory shutdown priority lower than the SparkContext
priority, so that the temp dirs are the last thing to get deleted, after the SparkContext has
been shut down. Also, the DiskBlockManager shutdown priority is changed from the default 100 to
TEMP_DIR_SHUTDOWN_PRIORITY + 1, so that it gets invoked just before all temp dirs are cleared.
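For context, Utils.addShutdownHook runs registered hooks in descending priority order: a
higher priority value fires earlier (which is why the priority-100 temp-dir hook ran before
the priority-50 SparkContext hook). A minimal, self-contained sketch of such a scheme follows;
the names are hypothetical and this is not Spark's actual implementation:

    import java.util.PriorityQueue

    // Sketch of a priority-ordered shutdown-hook manager, assuming the
    // convention above: a HIGHER priority value runs EARLIER.
    // (Hypothetical names; not Spark's actual code.)
    object PriorityShutdownHooks {
      private case class HookEntry(priority: Int, hook: () => Unit)
          extends Comparable[HookEntry] {
        // Reverse the natural Int ordering so poll() yields the
        // highest-priority entry first.
        override def compareTo(other: HookEntry): Int =
          java.lang.Integer.compare(other.priority, this.priority)
      }

      private val hooks = new PriorityQueue[HookEntry]()

      // Register one real JVM hook that drains the queue in priority order.
      Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
        override def run(): Unit = PriorityShutdownHooks.synchronized {
          while (!hooks.isEmpty) hooks.poll().hook()
        }
      }))

      def addShutdownHook(priority: Int)(hook: () => Unit): Unit = synchronized {
        hooks.add(HookEntry(priority, hook))
      }
    }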

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6482 from tdas/SPARK-7930 and squashes the following commits:

d7cbeb5 [Tathagata Das] Removed unnecessary line
1514d0b [Tathagata Das] Fixed shutdown hook priorities

(cherry picked from commit cd3d9a5c0c3e77098a72c85dffe4a27737009ae7)
Signed-off-by: Patrick Wendell <patrick@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7cb272b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7cb272b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7cb272b

Branch: refs/heads/branch-1.4
Commit: f7cb272b7c77de42681287925922d41248efca46
Parents: aee046d
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Thu May 28 22:28:13 2015 -0700
Committer: Patrick Wendell <patrick@databricks.com>
Committed: Thu May 28 22:28:31 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/storage/DiskBlockManager.scala     |  4 ++--
 core/src/main/scala/org/apache/spark/util/Utils.scala   | 12 ++++++++++--
 2 files changed, 12 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f7cb272b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 2a44477..d441a4d 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -139,8 +139,8 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
   }
 
   private def addShutdownHook(): AnyRef = {
-    Utils.addShutdownHook { () =>
-      logDebug("Shutdown hook called")
+    Utils.addShutdownHook(Utils.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
+      logInfo("Shutdown hook called")
       DiskBlockManager.this.doStop()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f7cb272b/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 763d4db..693e1a0 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -73,6 +73,13 @@ private[spark] object Utils extends Logging {
    */
   val SPARK_CONTEXT_SHUTDOWN_PRIORITY = 50
 
+  /**
+   * The shutdown priority of temp directory must be lower than the SparkContext shutdown
+   * priority. Otherwise cleaning the temp directories while Spark jobs are running can
+   * throw undesirable errors at the time of shutdown.
+   */
+  val TEMP_DIR_SHUTDOWN_PRIORITY = 25
+
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   @volatile private var localRootDirs: Array[String] = null
 
@@ -189,10 +196,11 @@ private[spark] object Utils extends Logging {
   private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
 
   // Add a shutdown hook to delete the temp dirs when the JVM exits
-  addShutdownHook { () =>
-    logDebug("Shutdown hook called")
+  addShutdownHook(TEMP_DIR_SHUTDOWN_PRIORITY) { () =>
+    logInfo("Shutdown hook called")
     shutdownDeletePaths.foreach { dirPath =>
       try {
+        logInfo("Deleting directory " + dirPath)
         Utils.deleteRecursively(new File(dirPath))
       } catch {
        case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
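
Under a scheme like the sketch above, the priorities from this patch would drain the hooks in
the intended order. This is illustrative only; the DiskBlockManager hook registers at
TEMP_DIR_SHUTDOWN_PRIORITY + 1 = 26:

    // Higher value fires first: 50, then 26, then 25.
    PriorityShutdownHooks.addShutdownHook(50) { () => println("1. stop SparkContext") }
    PriorityShutdownHooks.addShutdownHook(26) { () => println("2. stop DiskBlockManager") }
    PriorityShutdownHooks.addShutdownHook(25) { () => println("3. delete temp dirs") }
    // On JVM exit (e.g. Ctrl-C), the messages print in the numbered order.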


