spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pwend...@apache.org
Subject git commit: [SPARK-3139] Made ContextCleaner to not block on shuffles
Date Wed, 27 Aug 2014 07:17:43 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 6f82a4b13 -> 5cf1e4401


[SPARK-3139] Made ContextCleaner to not block on shuffles

As a workaround for SPARK-3015, the ContextCleaner was made "blocking", that is, it cleaned
items one-by-one. But shuffles can take a long time to be deleted. Given that the RC for 1.1
is imminent, this PR makes a narrow change in the context cleaner: it no longer waits for shuffle
cleanups to complete. It also changes the error messages on failure to delete to be milder
warnings, as exceptions in the delete code path for one item do not really stop the actual
functioning of the system.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #2143 from tdas/cleaner-shuffle-fix and squashes the following commits:

9c84202 [Tathagata Das] Restoring default blocking behavior in ContextCleanerSuite, and added
docs to identify that spark.cleaner.referenceTracking.blocking does not control shuffle.
2181329 [Tathagata Das] Mark shuffle cleanup as non-blocking.
e337cc2 [Tathagata Das] Changed semantics based on PR comments.
387b578 [Tathagata Das] Made ContextCleaner to not block on shuffles
(cherry picked from commit 3e2864e40472b32e6a7eec5ba3bc83562d2a1a62)

Signed-off-by: Patrick Wendell <pwendell@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5cf1e440
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5cf1e440
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5cf1e440

Branch: refs/heads/branch-1.1
Commit: 5cf1e440137006eedd6846ac8fa57ccf9fd1958d
Parents: 6f82a4b
Author: Tathagata Das <tathagata.das1565@gmail.com>
Authored: Wed Aug 27 00:13:38 2014 -0700
Committer: Patrick Wendell <pwendell@gmail.com>
Committed: Wed Aug 27 00:17:37 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/ContextCleaner.scala   | 18 ++++++++++++++++--
 .../apache/spark/storage/BlockManagerMaster.scala | 12 +++++++-----
 .../org/apache/spark/ContextCleanerSuite.scala    |  3 +++
 3 files changed, 26 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5cf1e440/core/src/main/scala/org/apache/spark/ContextCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 3848734..ede1e23 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -65,7 +65,8 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private val cleaningThread = new Thread() { override def run() { keepCleaning() }}
 
   /**
-   * Whether the cleaning thread will block on cleanup tasks.
+   * Whether the cleaning thread will block on cleanup tasks (other than shuffle, which
+   * is controlled by the `spark.cleaner.referenceTracking.blocking.shuffle` parameter).
    *
    * Due to SPARK-3015, this is set to true by default. This is intended to be only a temporary
    * workaround for the issue, which is ultimately caused by the way the BlockManager actors
@@ -76,6 +77,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   private val blockOnCleanupTasks = sc.conf.getBoolean(
     "spark.cleaner.referenceTracking.blocking", true)
 
+  /**
+   * Whether the cleaning thread will block on shuffle cleanup tasks.
+   *
+   * When context cleaner is configured to block on every delete request, it can throw timeout
+   * exceptions on cleanup of shuffle blocks, as reported in SPARK-3139. To avoid that, this
+   * parameter by default disables blocking on shuffle cleanups. Note that this does not affect
+   * the cleanup of RDDs and broadcasts. This is intended to be a temporary workaround,
+   * until the real Akka issue (referred to in the comment above `blockOnCleanupTasks`) is
+   * resolved.
+   */
+  private val blockOnShuffleCleanupTasks = sc.conf.getBoolean(
+    "spark.cleaner.referenceTracking.blocking.shuffle", false)
+
   @volatile private var stopped = false
 
   /** Attach a listener object to get information of when objects are cleaned. */
@@ -128,7 +142,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
             case CleanRDD(rddId) =>
               doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
             case CleanShuffle(shuffleId) =>
-              doCleanupShuffle(shuffleId, blocking = blockOnCleanupTasks)
+              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
             case CleanBroadcast(broadcastId) =>
               doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
           }

http://git-wip-us.apache.org/repos/asf/spark/blob/5cf1e440/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index 6693077..e67b3dc 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -101,7 +101,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   def removeRdd(rddId: Int, blocking: Boolean) {
     val future = askDriverWithReply[Future[Seq[Int]]](RemoveRdd(rddId))
     future.onFailure {
-      case e: Throwable => logError("Failed to remove RDD " + rddId, e)
+      case e: Exception =>
+        logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}}")
     }
     if (blocking) {
       Await.result(future, timeout)
@@ -112,7 +113,8 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
   def removeShuffle(shuffleId: Int, blocking: Boolean) {
     val future = askDriverWithReply[Future[Seq[Boolean]]](RemoveShuffle(shuffleId))
     future.onFailure {
-      case e: Throwable => logError("Failed to remove shuffle " + shuffleId, e)
+      case e: Exception =>
+        logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}}")
     }
     if (blocking) {
       Await.result(future, timeout)
@@ -124,9 +126,9 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
     val future = askDriverWithReply[Future[Seq[Int]]](
       RemoveBroadcast(broadcastId, removeFromMaster))
     future.onFailure {
-      case e: Throwable =>
-        logError("Failed to remove broadcast " + broadcastId +
-          " with removeFromMaster = " + removeFromMaster, e)
+      case e: Exception =>
+        logWarning(s"Failed to remove broadcast $broadcastId" +
+          s" with removeFromMaster = $removeFromMaster - ${e.getMessage}}")
     }
     if (blocking) {
       Await.result(future, timeout)

http://git-wip-us.apache.org/repos/asf/spark/blob/5cf1e440/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index 4bc4346..2744894 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -52,6 +52,7 @@ abstract class ContextCleanerSuiteBase(val shuffleManager: Class[_] = classOf[Ha
     .setMaster("local[2]")
     .setAppName("ContextCleanerSuite")
     .set("spark.cleaner.referenceTracking.blocking", "true")
+    .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
     .set("spark.shuffle.manager", shuffleManager.getName)
 
   before {
@@ -243,6 +244,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase {
       .setMaster("local-cluster[2, 1, 512]")
       .setAppName("ContextCleanerSuite")
       .set("spark.cleaner.referenceTracking.blocking", "true")
+      .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
       .set("spark.shuffle.manager", shuffleManager.getName)
     sc = new SparkContext(conf2)
 
@@ -319,6 +321,7 @@ class SortShuffleContextCleanerSuite extends ContextCleanerSuiteBase(classOf[Sor
       .setMaster("local-cluster[2, 1, 512]")
       .setAppName("ContextCleanerSuite")
       .set("spark.cleaner.referenceTracking.blocking", "true")
+      .set("spark.cleaner.referenceTracking.blocking.shuffle", "true")
       .set("spark.shuffle.manager", shuffleManager.getName)
     sc = new SparkContext(conf2)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message