spark-commits mailing list archives

From andrewo...@apache.org
Subject spark git commit: [SPARK-6132] ContextCleaner race condition across SparkContexts
Date Tue, 03 Mar 2015 21:44:10 GMT
Repository: spark
Updated Branches:
  refs/heads/master e750a6bfd -> fe63e8229


[SPARK-6132] ContextCleaner race condition across SparkContexts

The problem is that `ContextCleaner` may clean variables that belong to a different `SparkContext`.
This can happen if the `SparkContext` to which the cleaner belongs stops and a new one is
started immediately afterwards in the same JVM. If the cleaner is in the middle of cleaning
a broadcast, for instance, it does so through `SparkEnv.get.blockManager`, which by then may
belong to the new `SparkContext` rather than the one that registered the broadcast.

JoshRosen and I suspect that this is the cause of many flaky tests, most notably the `JavaAPISuite`.
We were able to reproduce the failure locally, though it is nondeterministic and very hard
to trigger.
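
For illustration, here is a minimal sketch of the shape of this race, using hypothetical names rather than Spark's actual classes: the cleaning thread resolves a process-global environment at clean time, so a cleanup already in flight when its context stops can act on the next context's state.

```scala
// Hypothetical, simplified sketch of the race -- not Spark's real classes.
object GlobalEnv {
  // Stands in for SparkEnv.get: a process-global slot that a newly
  // started context overwrites.
  @volatile var current: String = "env-of-context-1"
}

class RacyCleaner {
  private val cleaningThread = new Thread(() => {
    try {
      while (true) {
        Thread.sleep(50)             // stands in for referenceQueue.remove(timeout)
        val env = GlobalEnv.current  // resolved lazily, like SparkEnv.get.blockManager
        println(s"cleaning a broadcast through $env")
      }
    } catch {
      case _: InterruptedException => // an interrupt can land mid-cleanup
    }
  })
  def start(): Unit = cleaningThread.start()
  // Nothing coordinates this interrupt with the loop body: an iteration still
  // in flight after stop() returns can read a replacement environment installed
  // by the next context and clean through it.
  def stop(): Unit = cleaningThread.interrupt()
}
```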

Author: Andrew Or <andrew@databricks.com>

Closes #4869 from andrewor14/cleaner-masquerade and squashes the following commits:

29168c0 [Andrew Or] Synchronize ContextCleaner stop


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe63e822
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe63e822
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe63e822

Branch: refs/heads/master
Commit: fe63e822918a01e1c1d741052b932e9944745fb6
Parents: e750a6b
Author: Andrew Or <andrew@databricks.com>
Authored: Tue Mar 3 13:44:05 2015 -0800
Committer: Andrew Or <andrew@databricks.com>
Committed: Tue Mar 3 13:44:05 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/ContextCleaner.scala | 39 +++++++++++++-------
 1 file changed, 26 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fe63e822/core/src/main/scala/org/apache/spark/ContextCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index 4a9d007..4dab886 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -105,9 +105,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     cleaningThread.start()
   }
 
-  /** Stop the cleaner. */
+  /**
+   * Stop the cleaning thread and wait until the thread has finished running its current task.
+   */
   def stop() {
     stopped = true
+    // Interrupt the cleaning thread, but wait until the current task has finished before
+    // doing so. This guards against the race condition where a cleaning thread may
+    // potentially clean similarly named variables created by a different SparkContext,
+    // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
+    synchronized {
+      cleaningThread.interrupt()
+    }
+    cleaningThread.join()
   }
 
   /** Register a RDD for cleanup when it is garbage collected. */
@@ -140,18 +150,21 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
           .map(_.asInstanceOf[CleanupTaskWeakReference])
-        reference.map(_.task).foreach { task =>
-          logDebug("Got cleaning task " + task)
-          referenceBuffer -= reference.get
-          task match {
-            case CleanRDD(rddId) =>
-              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
-            case CleanShuffle(shuffleId) =>
-              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
-            case CleanBroadcast(broadcastId) =>
-              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
-            case CleanAccum(accId) =>
-              doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+        // Synchronize here to avoid being interrupted on stop()
+        synchronized {
+          reference.map(_.task).foreach { task =>
+            logDebug("Got cleaning task " + task)
+            referenceBuffer -= reference.get
+            task match {
+              case CleanRDD(rddId) =>
+                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+              case CleanShuffle(shuffleId) =>
+                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
+              case CleanBroadcast(broadcastId) =>
+                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+              case CleanAccum(accId) =>
+                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+            }
           }
         }
       } catch {
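
The hand-off the patch establishes is easier to see in isolation. Below is a self-contained sketch of the same idiom with hypothetical names (not the Spark code itself): `stop()` must acquire the same lock the worker holds while running a task, so the interrupt can only land while the worker is blocked waiting for the next task, and `join()` then waits for the thread to exit.

```scala
// Hypothetical sketch of the synchronized-interrupt-then-join idiom.
class GuardedWorker {
  @volatile private var stopped = false

  private val worker = new Thread(() => {
    try {
      while (!stopped) {
        val task = poll() // interruptible wait, like referenceQueue.remove(timeout)
        // Holding the lock while running a task means stop() cannot interrupt
        // us here; the interrupt can only arrive while we are back in poll().
        synchronized {
          task.foreach(run)
        }
      }
    } catch {
      case _: InterruptedException => // expected once stop() is called
    }
  })

  def start(): Unit = worker.start()

  def stop(): Unit = {
    stopped = true
    // Wait for any in-flight task to release the lock, then interrupt the
    // blocking poll() so the worker wakes up and exits.
    synchronized {
      worker.interrupt()
    }
    worker.join() // do not return until the thread is fully gone
  }

  // Toy stand-ins so the sketch compiles on its own.
  private def poll(): Option[String] = { Thread.sleep(100); Some("task") }
  private def run(t: String): Unit = println(s"ran $t")
}
```

The ordering matters: setting `stopped` before taking the lock guarantees the worker runs at most one more task, and interrupting while holding the lock guarantees that task is never the one interrupted.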


