spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joshro...@apache.org
Subject spark git commit: [SPARK-4772] Clear local copies of accumulators as soon as we're done with them
Date Wed, 10 Dec 2014 07:57:37 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.1 9b9923744 -> 6dcafa7ce


[SPARK-4772] Clear local copies of accumulators as soon as we're done with them

Accumulators keep thread-local copies of themselves.  These copies were only cleared at the
beginning of a task.  This meant that (a) the memory they used was tied up until the next
task ran on that thread, and (b) if a thread died, the memory it had used for accumulators
was locked up forever on that worker.

This PR clears the thread-local copies of accumulators at the end of each task, in the task's
finally block, to make sure they are cleaned up between tasks.  It also stores them in a ThreadLocal
object, so that if, for some reason, the thread dies, any memory they are using at the time
should be freed up.

Author: Nathan Kronenfeld <nkronenfeld@oculusinfo.com>

Closes #3570 from nkronenfeld/Accumulator-Improvements and squashes the following commits:

a581f3f [Nathan Kronenfeld] Change Accumulators to private[spark] instead of adding mima exclude
to get around false positive in mima tests
b6c2180 [Nathan Kronenfeld] Include MiMa exclude as per build error instructions - this version
incompatibility should be irrelevant, as it will only surface if a master is talking to a
worker running a different version of spark.
537baad [Nathan Kronenfeld] Fuller refactoring as intended, incorporating JR's suggestions
for ThreadLocal localAccums, and keeping clear(), but also calling it in tasks' finally block,
rather than just at the beginning of the task.
39a82f2 [Nathan Kronenfeld] Clear local copies of accumulators as soon as we're done with
them

(cherry picked from commit 94b377f94487109a1cc3e07dd230b1df7a96e28d)
Signed-off-by: Josh Rosen <joshrosen@databricks.com>

Conflicts:
	core/src/main/scala/org/apache/spark/Accumulators.scala
	core/src/main/scala/org/apache/spark/executor/Executor.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6dcafa7c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6dcafa7c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6dcafa7c

Branch: refs/heads/branch-1.1
Commit: 6dcafa7ce53c4850a8d6dda138caf88cc8b640c9
Parents: 9b99237
Author: Nathan Kronenfeld <nkronenfeld@oculusinfo.com>
Authored: Tue Dec 9 23:53:17 2014 -0800
Committer: Josh Rosen <joshrosen@databricks.com>
Committed: Tue Dec 9 23:56:59 2014 -0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/Accumulators.scala    | 14 ++++++++------
 .../scala/org/apache/spark/executor/Executor.scala    |  3 ++-
 2 files changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6dcafa7c/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 2301caa..c9665d9 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -18,6 +18,7 @@
 package org.apache.spark
 
 import java.io.{ObjectInputStream, Serializable}
+import java.lang.ThreadLocal
 
 import scala.collection.generic.Growable
 import scala.collection.mutable.Map
@@ -246,10 +247,12 @@ trait AccumulatorParam[T] extends AccumulableParam[T, T] {
 
 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
-private object Accumulators {
+private[spark] object Accumulators {
   // TODO: Use soft references? => need to make readObject work properly then
   val originals = Map[Long, Accumulable[_, _]]()
-  val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
+  val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
+    override protected def initialValue() = Map[Long, Accumulable[_, _]]()
+  }
   var lastId: Long = 0
 
   def newId: Long = synchronized {
@@ -261,22 +264,21 @@ private object Accumulators {
     if (original) {
       originals(a.id) = a
     } else {
-      val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
-      accums(a.id) = a
+      localAccums.get()(a.id) = a
     }
   }
 
   // Clear the local (non-original) accumulators for the current thread
   def clear() {
     synchronized {
-      localAccums.remove(Thread.currentThread)
+      localAccums.get.clear
     }
   }
 
   // Get the values of the local accumulators for the current thread (by ID)
   def values: Map[Long, Any] = synchronized {
     val ret = Map[Long, Any]()
-    for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) {
+    for ((id, accum) <- localAccums.get) {
       ret(id) = accum.localValue
     }
     return ret

http://git-wip-us.apache.org/repos/asf/spark/blob/6dcafa7c/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 4f49b07..fbdc7ec 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -154,7 +154,6 @@ private[spark] class Executor(
 
       try {
         SparkEnv.set(env)
-        Accumulators.clear()
         val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
         updateDependencies(taskFiles, taskJars)
         task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
@@ -258,6 +257,8 @@ private[spark] class Executor(
         env.shuffleMemoryManager.releaseMemoryForThisThread()
         // Release memory used by this thread for unrolling blocks
         env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
+        // Release memory used by this thread for accumulators
+        Accumulators.clear()
         runningTasks.remove(taskId)
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message