spark-commits mailing list archives

From zsxw...@apache.org
Subject spark git commit: [SPARK-19831][CORE] Reuse the existing cleanupThreadExecutor to clean up the directories of finished applications to avoid the block
Date Sun, 12 Mar 2017 17:29:05 GMT
Repository: spark
Updated Branches:
  refs/heads/master e29a74d5b -> 2f5187bde


[SPARK-19831][CORE] Reuse the existing cleanupThreadExecutor to clean up the directories of
finished applications to avoid the block

Cleaning up a finished application's directories can take a long time on the worker. Because
Worker extends ThreadSafeRpcEndpoint and handles messages one at a time, the cleanup blocks
the worker from sending heartbeats to the master. If a worker's heartbeat is held up behind an
ApplicationFinished message, the master will consider the worker dead, and if the worker is
running a driver, the master will relaunch that driver. It is better to reuse the existing
cleanupThreadExecutor to clean up the directories of finished applications so the cleanup does
not block the RPC thread.

Author: xiaojian.fxj <xiaojian.fxj@alibaba-inc.com>

Closes #17189 from hustfxj/worker-hearbeat.
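
For illustration only (not part of the commit), the sketch below shows the same pattern in a
self-contained form: push slow directory cleanup onto a dedicated single daemon thread through
an explicitly passed ExecutionContext, so the thread that handles RPC messages returns right
away. The object and method names (CleanupSketch, cleanupAppDirs) and the /tmp path are made up
for the example, a plain recursive delete stands in for Spark's Utils.deleteRecursively, and
onComplete is used instead of the patch's onFailure, which newer Scala versions deprecate.

import java.io.File
import java.util.concurrent.{Executors, ThreadFactory}

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.util.Failure

object CleanupSketch {
  // Single daemon thread, analogous to the worker-cleanup-thread in the patch.
  private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
    Executors.newSingleThreadExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "cleanup-sketch-thread")
        t.setDaemon(true)
        t
      }
    }))

  // Plain recursive delete; the real Worker uses Utils.deleteRecursively.
  private def deleteRecursively(f: File): Unit = {
    if (f.isDirectory) {
      Option(f.listFiles()).getOrElse(Array.empty[File]).foreach(deleteRecursively)
    }
    f.delete()
  }

  // Kick off cleanup asynchronously and log failures; the caller does not wait.
  def cleanupAppDirs(appId: String, dirList: Seq[String]): Future[Unit] = {
    val cleanup = Future {
      println(s"Cleaning up local directories for application $appId")
      dirList.foreach(dir => deleteRecursively(new File(dir)))
    }(cleanupThreadExecutor)
    cleanup.onComplete {
      case Failure(e) => println(s"Clean up app dir $dirList failed: ${e.getMessage}")
      case _ => // success: nothing to do
    }(cleanupThreadExecutor)
    cleanup
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical scratch directory; the call itself returns immediately.
    val done = cleanupAppDirs("app-20170312-0001", Seq("/tmp/app-20170312-0001"))
    Await.ready(done, Duration.Inf) // only so this demo exits after the cleanup finishes
  }
}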


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2f5187bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2f5187bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2f5187bd

Branch: refs/heads/master
Commit: 2f5187bde1544c452fe5116a2bd243653332a079
Parents: e29a74d
Author: xiaojian.fxj <xiaojian.fxj@alibaba-inc.com>
Authored: Sun Mar 12 10:29:00 2017 -0700
Committer: Shixiong Zhu <shixiong@databricks.com>
Committed: Sun Mar 12 10:29:00 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/deploy/worker/Worker.scala    | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2f5187bd/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index e48817e..00b9d1a 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -62,8 +62,8 @@ private[deploy] class Worker(
   private val forwordMessageScheduler =
     ThreadUtils.newDaemonSingleThreadScheduledExecutor("worker-forward-message-scheduler")
 
-  // A separated thread to clean up the workDir. Used to provide the implicit parameter of `Future`
-  // methods.
+  // A separated thread to clean up the workDir and the directories of finished applications.
+  // Used to provide the implicit parameter of `Future` methods.
   private val cleanupThreadExecutor = ExecutionContext.fromExecutorService(
     ThreadUtils.newDaemonSingleThreadExecutor("worker-cleanup-thread"))
 
@@ -578,10 +578,15 @@ private[deploy] class Worker(
     if (shouldCleanup) {
       finishedApps -= id
       appDirectories.remove(id).foreach { dirList =>
-        logInfo(s"Cleaning up local directories for application $id")
-        dirList.foreach { dir =>
-          Utils.deleteRecursively(new File(dir))
-        }
+        concurrent.Future {
+          logInfo(s"Cleaning up local directories for application $id")
+          dirList.foreach { dir =>
+            Utils.deleteRecursively(new File(dir))
+          }
+        }(cleanupThreadExecutor).onFailure {
+          case e: Throwable =>
+            logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e)
+        }(cleanupThreadExecutor)
       }
       shuffleService.applicationRemoved(id)
     }
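
Note that with this change the ApplicationFinished handler only schedules the deletion and
returns; the actual file I/O runs later on the single daemon worker-cleanup-thread, the same
executor that already handles the periodic workDir cleanup, so cleanups are serialized there
instead of running on the RPC dispatcher. The onFailure callback only logs the error; a failed
cleanup is not retried.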


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

