spark-commits mailing list archives

From andrewo...@apache.org
Subject spark git commit: [SPARK-12155][SPARK-12253] Fix executor OOM in unified memory management
Date Thu, 10 Dec 2015 23:30:11 GMT
Repository: spark
Updated Branches:
  refs/heads/master 23a9e62ba -> 5030923ea


[SPARK-12155][SPARK-12253] Fix executor OOM in unified memory management

**Problem.** In unified memory management, acquiring execution memory may lead to eviction
of storage memory. However, the space freed from evicting cached blocks is distributed among
all active tasks. Thus, an incorrect upper bound on the execution memory per task can cause
the acquisition to fail, leading to OOMs and premature spills.

**Example.** Suppose total memory is 1000B, cached blocks occupy 900B, `spark.memory.storageFraction`
is 0.4, and there are two active tasks. In this case, the execution pool holds only the
remaining 100B, so the cap on task execution memory is computed as 100B / 2 = 50B. If task A
tries to acquire 200B, it will evict 100B of storage but can only
acquire 50B because of the incorrect cap. For another example, see this [regression test](https://github.com/andrewor14/spark/blob/fix-oom/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala#L233)
that I stole from JoshRosen.

**Solution.** Fix the cap on task execution memory. It should take into account the space
that could have been freed by storage in addition to the current amount of memory available
to execution. In the example above, storage can be evicted down to its 400B region, so the
correct cap should have been (1000B - 400B) / 2 = 600B / 2 = 300B.
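
The arithmetic in the example can be checked with a small standalone sketch. The object and method names below (`TaskCapSketch`, `oldCap`, `newCap`) are hypothetical and exist only for illustration; the new-cap formula mirrors `computeMaxExecutionPoolSize` from this patch, and the storage region size of 400B is assumed to be `spark.memory.storageFraction` (0.4) times the 1000B total.

```scala
// Illustrative only: TaskCapSketch, oldCap and newCap are hypothetical names,
// not part of Spark. Only the formulas follow the commit message and patch.
object TaskCapSketch {
  /** Cap before the fix: the current execution pool size split among active tasks. */
  def oldCap(executionPoolSize: Long, numActiveTasks: Int): Long =
    executionPoolSize / numActiveTasks

  /** Cap after the fix: also counts storage memory that execution could evict. */
  def newCap(
      maxMemory: Long,
      storageMemoryUsed: Long,
      storageRegionSize: Long,
      numActiveTasks: Int): Long =
    (maxMemory - math.min(storageMemoryUsed, storageRegionSize)) / numActiveTasks

  def main(args: Array[String]): Unit = {
    val maxMemory = 1000L
    val storageMemoryUsed = 900L
    val storageRegionSize = 400L // assumed: 0.4 * 1000B
    val numActiveTasks = 2

    // The execution pool only holds what storage has not taken: 1000B - 900B = 100B.
    println(oldCap(maxMemory - storageMemoryUsed, numActiveTasks)) // 50
    println(newCap(maxMemory, storageMemoryUsed, storageRegionSize, numActiveTasks)) // 300
  }
}
```

With the old cap, task A's 200B request is limited to 50B even after eviction; with the new cap of 300B the full request fits.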

This patch also guards against the race condition (SPARK-12253):
(1) Existing tasks collectively occupy all execution memory
(2) New task comes in and blocks while existing tasks spill
(3) After tasks finish spilling, another task jumps in and puts in a large block, stealing
the freed memory
(4) New task still cannot acquire memory and goes back to sleep
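
The four steps above can be replayed in a toy, single-threaded model. This is a rough sketch under stated assumptions, not Spark's implementation: `ToyPools`, `cacheBlock`, `attempt` and `replay` are hypothetical names, per-task caps and locking are left out, and the storage region size is set to 0 so that all cached data is evictable. Only `maybeGrowExecutionPool` loosely follows the callback of the same name in this patch.

```scala
// Toy model for illustration; every name here is hypothetical, not Spark's API.
object RegrowPerAttemptSketch {
  final class ToyPools(total: Long, val storageRegionSize: Long) {
    var executionPoolSize: Long = total
    var executionUsed: Long = 0L
    var storagePoolSize: Long = 0L
    var storageUsed: Long = 0L

    def executionFree: Long = executionPoolSize - executionUsed
    def storageFree: Long = storagePoolSize - storageUsed

    /** Storage borrows free execution memory in order to cache a block. */
    def cacheBlock(size: Long): Boolean = {
      val borrow = math.max(0L, size - storageFree)
      if (borrow > executionFree) {
        false
      } else {
        executionPoolSize -= borrow
        storagePoolSize += borrow
        storageUsed += size
        true
      }
    }

    /** Take back up to `extraNeeded` bytes from storage, evicting blocks if required. */
    def maybeGrowExecutionPool(extraNeeded: Long): Unit = {
      if (extraNeeded > 0) {
        val reclaimable = math.max(storageFree, storagePoolSize - storageRegionSize)
        val reclaimed = math.max(0L, math.min(extraNeeded, reclaimable))
        val toEvict = math.max(0L, reclaimed - storageFree)
        storageUsed -= toEvict
        storagePoolSize -= reclaimed
        executionPoolSize += reclaimed
      }
    }

    /** One iteration of the acquire loop; `regrow` toggles the per-attempt reclaim. */
    def attempt(numBytes: Long, regrow: Boolean): Long = {
      if (regrow) maybeGrowExecutionPool(numBytes - executionFree)
      val granted = math.min(numBytes, executionFree)
      executionUsed += granted
      granted
    }
  }

  /** Replays steps (1) to (4) and returns what the new task gets when it wakes up. */
  def replay(regrow: Boolean): Long = {
    val pools = new ToyPools(total = 1000L, storageRegionSize = 0L)
    pools.executionUsed = 1000L // (1) existing tasks hold all execution memory
    pools.executionUsed -= 400L // (2) they spill while the new task waits
    require(pools.cacheBlock(400L), "block should fit") // (3) a block takes the freed memory
    pools.attempt(300L, regrow) // (4) the new task wakes up and retries
  }

  def main(args: Array[String]): Unit = {
    println(replay(regrow = false)) // 0
    println(replay(regrow = true)) // 300
  }
}
```

Without the per-attempt reclaim, the retry finds no free execution memory and the task would go back to sleep; with it, the retry evicts 300B of the newly cached block and succeeds.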

Author: Andrew Or <andrew@databricks.com>

Closes #10240 from andrewor14/fix-oom.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5030923e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5030923e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5030923e

Branch: refs/heads/master
Commit: 5030923ea8bb94ac8fa8e432de9fc7089aa93986
Parents: 23a9e62
Author: Andrew Or <andrew@databricks.com>
Authored: Thu Dec 10 15:30:08 2015 -0800
Committer: Andrew Or <andrew@databricks.com>
Committed: Thu Dec 10 15:30:08 2015 -0800

----------------------------------------------------------------------
 .../spark/memory/ExecutionMemoryPool.scala      | 57 ++++++++++++++------
 .../spark/memory/UnifiedMemoryManager.scala     | 57 +++++++++++++++-----
 .../scala/org/apache/spark/scheduler/Task.scala |  6 +++
 .../memory/UnifiedMemoryManagerSuite.scala      | 25 +++++++++
 4 files changed, 114 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5030923e/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
index 9023e1a..dbb0ad8 100644
--- a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
@@ -70,11 +70,28 @@ private[memory] class ExecutionMemoryPool(
    * active tasks) before it is forced to spill. This can happen if the number of tasks increase
    * but an older task had a lot of memory already.
    *
+   * @param numBytes number of bytes to acquire
+   * @param taskAttemptId the task attempt acquiring memory
+   * @param maybeGrowPool a callback that potentially grows the size of this pool. It takes in
+   *                      one parameter (Long) that represents the desired amount of memory by
+   *                      which this pool should be expanded.
+   * @param computeMaxPoolSize a callback that returns the maximum allowable size of this pool
+   *                           at this given moment. This is not a field because the max pool
+   *                           size is variable in certain cases. For instance, in unified
+   *                           memory management, the execution pool can be expanded by evicting
+   *                           cached blocks, thereby shrinking the storage pool.
+   *
    * @return the number of bytes granted to the task.
    */
-  def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = lock.synchronized {
+  private[memory] def acquireMemory(
+      numBytes: Long,
+      taskAttemptId: Long,
+      maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
+      computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
     assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
 
+    // TODO: clean up this clunky method signature
+
     // Add this task to the taskMemory map just so we can keep an accurate count of the number
     // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
     if (!memoryForTask.contains(taskAttemptId)) {
@@ -91,25 +108,31 @@ private[memory] class ExecutionMemoryPool(
       val numActiveTasks = memoryForTask.keys.size
       val curMem = memoryForTask(taskAttemptId)
 
-      // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
-      // don't let it be negative
-      val maxToGrant =
-        math.min(numBytes, math.max(0, (poolSize / numActiveTasks) - curMem))
+      // In every iteration of this loop, we should first try to reclaim any borrowed execution
+      // space from storage. This is necessary because of the potential race condition where new
+      // storage blocks may steal the free execution memory that this task was waiting for.
+      maybeGrowPool(numBytes - memoryFree)
+
+      // Maximum size the pool would have after potentially growing the pool.
+      // This is used to compute the upper bound of how much memory each task can occupy. This
+      // must take into account potential free memory as well as the amount this pool currently
+      // occupies. Otherwise, we may run into SPARK-12155 where, in unified memory management,
+      // we did not take into account space that could have been freed by evicting cached blocks.
+      val maxPoolSize = computeMaxPoolSize()
+      val maxMemoryPerTask = maxPoolSize / numActiveTasks
+      val minMemoryPerTask = poolSize / (2 * numActiveTasks)
+
+      // How much we can grant this task; keep its share within 0 <= X <= 1 / numActiveTasks
+      val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
       // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
       val toGrant = math.min(maxToGrant, memoryFree)
 
-      if (curMem < poolSize / (2 * numActiveTasks)) {
-        // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
-        // if we can't give it this much now, wait for other tasks to free up memory
-        // (this happens if older tasks allocated lots of memory before N grew)
-        if (memoryFree >= math.min(maxToGrant, poolSize / (2 * numActiveTasks) - curMem)) {
-          memoryForTask(taskAttemptId) += toGrant
-          return toGrant
-        } else {
-          logInfo(
-            s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
-          lock.wait()
-        }
+      // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
+      // if we can't give it this much now, wait for other tasks to free up memory
+      // (this happens if older tasks allocated lots of memory before N grew)
+      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
+        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
+        lock.wait()
       } else {
         memoryForTask(taskAttemptId) += toGrant
         return toGrant

http://git-wip-us.apache.org/repos/asf/spark/blob/5030923e/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 0b9f6a9..829f054 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -81,22 +81,51 @@ private[spark] class UnifiedMemoryManager private[memory] (
     assert(numBytes >= 0)
     memoryMode match {
       case MemoryMode.ON_HEAP =>
-        if (numBytes > onHeapExecutionMemoryPool.memoryFree) {
-          val extraMemoryNeeded = numBytes - onHeapExecutionMemoryPool.memoryFree
-          // There is not enough free memory in the execution pool, so try to reclaim memory from
-          // storage. We can reclaim any free memory from the storage pool. If the storage pool
-          // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
-          // the memory that storage has borrowed from execution.
-          val memoryReclaimableFromStorage =
-            math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
-          if (memoryReclaimableFromStorage > 0) {
-            // Only reclaim as much space as is necessary and available:
-            val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
-              math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
-            onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+
+        /**
+         * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
+         *
+         * When acquiring memory for a task, the execution pool may need to make multiple
+         * attempts. Each attempt must be able to evict storage in case another task jumps in
+         * and caches a large block between the attempts. This is called once per attempt.
+         */
+        def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
+          if (extraMemoryNeeded > 0) {
+            // There is not enough free memory in the execution pool, so try to reclaim memory from
+            // storage. We can reclaim any free memory from the storage pool. If the storage pool
+            // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
+            // the memory that storage has borrowed from execution.
+            val memoryReclaimableFromStorage =
+              math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
+            if (memoryReclaimableFromStorage > 0) {
+              // Only reclaim as much space as is necessary and available:
+              val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
+                math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
+              onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+            }
           }
         }
-        onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+
+        /**
+         * The size the execution pool would have after evicting storage memory.
+         *
+         * The execution memory pool divides this quantity among the active tasks evenly to cap
+         * the execution memory allocation for each task. It is important to keep this greater
+         * than the execution pool size, which doesn't take into account potential memory that
+         * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
+         *
+         * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
+         * in execution memory allocation across tasks, Otherwise, a task may occupy more than
+         * its fair share of execution memory, mistakenly thinking that other tasks can acquire
+         * the portion of storage memory that cannot be evicted.
+         */
+        def computeMaxExecutionPoolSize(): Long = {
+          maxMemory - math.min(storageMemoryUsed, storageRegionSize)
+        }
+
+        onHeapExecutionMemoryPool.acquireMemory(
+          numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize)
+
       case MemoryMode.OFF_HEAP =>
         // For now, we only support on-heap caching of data, so we do not need to interact with
         // the storage pool when allocating off-heap memory. This will change in the future, though.

http://git-wip-us.apache.org/repos/asf/spark/blob/5030923e/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index d4bc3a5..9f27eed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -92,6 +92,12 @@ private[spark] abstract class Task[T](
         Utils.tryLogNonFatalError {
           // Release memory used by this thread for unrolling blocks
           SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
+          // Notify any tasks waiting for execution memory to be freed to wake up and try to
+          // acquire memory again. This makes impossible the scenario where a task sleeps forever
+          // because there are no other tasks left to notify it. Since this is safe to do but may
+          // not be strictly necessary, we should revisit whether we can remove this in the future.
+          val memoryManager = SparkEnv.get.memoryManager
+          memoryManager.synchronized { memoryManager.notifyAll() }
         }
       } finally {
         TaskContext.unset()

http://git-wip-us.apache.org/repos/asf/spark/blob/5030923e/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index e21a028..6cc4859 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -230,4 +230,29 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
     assert(exception.getMessage.contains("larger heap size"))
   }
 
+  test("execution can evict cached blocks when there are multiple active tasks (SPARK-12155)") {
+    val conf = new SparkConf()
+      .set("spark.memory.fraction", "1")
+      .set("spark.memory.storageFraction", "0")
+      .set("spark.testing.memory", "1000")
+    val mm = UnifiedMemoryManager(conf, numCores = 2)
+    val ms = makeMemoryStore(mm)
+    assert(mm.maxMemory === 1000)
+    // Have two tasks each acquire some execution memory so that the memory pool registers that
+    // there are two active tasks:
+    assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
+    assert(mm.acquireExecutionMemory(100L, 1, MemoryMode.ON_HEAP) === 100L)
+    // Fill up all of the remaining memory with storage.
+    assert(mm.acquireStorageMemory(dummyBlock, 800L, evictedBlocks))
+    assertEvictBlocksToFreeSpaceNotCalled(ms)
+    assert(mm.storageMemoryUsed === 800)
+    assert(mm.executionMemoryUsed === 200)
+    // A task should still be able to allocate 100 bytes execution memory by evicting blocks
+    assert(mm.acquireExecutionMemory(100L, 0, MemoryMode.ON_HEAP) === 100L)
+    assertEvictBlocksToFreeSpaceCalled(ms, 100L)
+    assert(mm.executionMemoryUsed === 300)
+    assert(mm.storageMemoryUsed === 700)
+    assert(evictedBlocks.nonEmpty)
+  }
+
 }



