spark-commits mailing list archives

From sro...@apache.org
Subject [1/2] spark git commit: [SPARK-6132] ContextCleaner race condition across SparkContexts
Date Sun, 22 Mar 2015 13:09:02 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 c5836816f -> 39761f515


[SPARK-6132] ContextCleaner race condition across SparkContexts

The problem is that `ContextCleaner` may clean variables that belong to a different `SparkContext`.
This can happen if the `SparkContext` to which the cleaner belongs stops, and a new one is
started immediately afterwards in the same JVM. In that case, if the cleaner is in the middle
of cleaning a broadcast, for instance, it does so through `SparkEnv.get.blockManager`,
which by then may belong to the new `SparkContext`.

JoshRosen and I suspect that this is the cause of many flaky tests, most notably the `JavaAPISuite`.
We were able to reproduce the failure locally, though it is nondeterministic and very hard
to trigger.
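
A minimal sketch of how the race can arise, as hypothetical driver code (not part of this
patch; the `local` master, app name, and jobs are assumptions for illustration):

    import org.apache.spark.{SparkConf, SparkContext}

    object CleanerRaceSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local").setAppName("cleaner-race")

        val sc1 = new SparkContext(conf)
        val bc = sc1.broadcast(Array(1, 2, 3))
        sc1.parallelize(1 to 10).map(_ + bc.value.sum).count()
        sc1.stop() // before this patch, sc1's cleaning thread could still be mid-cleanup here

        // A second context started immediately afterwards replaces the
        // JVM-global SparkEnv that SparkEnv.get returns.
        val sc2 = new SparkContext(conf)

        // If sc1's cleaner now processes a pending CleanBroadcast task, it goes
        // through SparkEnv.get.blockManager, which belongs to sc2, and can
        // remove or fail to find blocks, surfacing as inexplicable
        // block-not-found failures in sc2's jobs.
        sc2.parallelize(1 to 10).count()
        sc2.stop()
      }
    }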

Author: Andrew Or <andrew@databricks.com>

Closes #4869 from andrewor14/cleaner-masquerade and squashes the following commits:

29168c0 [Andrew Or] Synchronize ContextCleaner stop


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e445ce61
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e445ce61
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e445ce61

Branch: refs/heads/branch-1.1
Commit: e445ce61d02778beaa30a2d1867e5b06fe09fb5d
Parents: c583681
Author: Andrew Or <andrew@databricks.com>
Authored: Tue Mar 3 13:44:05 2015 -0800
Committer: Sean Owen <sowen@cloudera.com>
Committed: Sun Mar 22 13:08:14 2015 +0000

----------------------------------------------------------------------
 .../scala/org/apache/spark/ContextCleaner.scala | 35 ++++++++++++++------
 1 file changed, 24 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e445ce61/core/src/main/scala/org/apache/spark/ContextCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index ede1e23..201e5ec 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -104,9 +104,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     cleaningThread.start()
   }
 
-  /** Stop the cleaner. */
+  /**
+   * Stop the cleaning thread and wait until the thread has finished running its current task.
+   */
   def stop() {
     stopped = true
+    // Interrupt the cleaning thread, but wait until the current task has finished before
+    // doing so. This guards against the race condition where a cleaning thread may
+    // potentially clean similarly named variables created by a different SparkContext,
+    // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
+    synchronized {
+      cleaningThread.interrupt()
+    }
+    cleaningThread.join()
   }
 
   /** Register a RDD for cleanup when it is garbage collected. */
@@ -135,16 +145,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
           .map(_.asInstanceOf[CleanupTaskWeakReference])
-        reference.map(_.task).foreach { task =>
-          logDebug("Got cleaning task " + task)
-          referenceBuffer -= reference.get
-          task match {
-            case CleanRDD(rddId) =>
-              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
-            case CleanShuffle(shuffleId) =>
-              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
-            case CleanBroadcast(broadcastId) =>
-              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+        // Synchronize here to avoid being interrupted on stop()
+        synchronized {
+          reference.map(_.task).foreach { task =>
+            logDebug("Got cleaning task " + task)
+            referenceBuffer -= reference.get
+            task match {
+              case CleanRDD(rddId) =>
+                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+              case CleanShuffle(shuffleId) =>
+                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
+              case CleanBroadcast(broadcastId) =>
+                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+            }
           }
         }
       } catch {
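
The shape of the fix, restated as a standalone sketch: the cleaning loop holds a monitor
while it runs a task, and stop() acquires the same monitor before interrupting, so the
interrupt can only land between tasks; join() then blocks until the thread exits. The names
below (`Cleaner`, `submit`) are illustrative, not Spark API:

    import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}

    class Cleaner {
      private val queue: BlockingQueue[Runnable] = new LinkedBlockingQueue[Runnable]()
      @volatile private var stopped = false

      private val cleaningThread = new Thread("cleaning-thread") {
        override def run(): Unit = keepCleaning()
      }

      def start(): Unit = cleaningThread.start()

      def submit(task: Runnable): Unit = queue.put(task)

      private def keepCleaning(): Unit = {
        while (!stopped) {
          try {
            // Poll with a timeout so the loop re-checks `stopped` periodically.
            val task = Option(queue.poll(100, TimeUnit.MILLISECONDS))
            // Hold the Cleaner's monitor (the same one stop() takes) while a
            // task runs, so an interrupt can only arrive between tasks.
            synchronized {
              task.foreach(_.run())
            }
          } catch {
            case _: InterruptedException if stopped => // expected during shutdown
          }
        }
      }

      def stop(): Unit = {
        stopped = true
        // Taking the monitor first waits for any in-flight task to finish;
        // the interrupt then only unblocks a poll() waiting between tasks.
        synchronized {
          cleaningThread.interrupt()
        }
        cleaningThread.join() // wait until the thread has fully exited
      }
    }

Before this patch, stop() only set the flag, so the old context's cleaning thread could keep
running its current task against whatever SparkEnv.get returned next.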


