spark-commits mailing list archives

From dav...@apache.org
Subject spark git commit: [SPARK-15260] Atomically resize memory pools
Date Wed, 11 May 2016 19:58:59 GMT
Repository: spark
Updated Branches:
  refs/heads/master 81c68eceb -> bb88ad4e0


[SPARK-15260] Atomically resize memory pools

## What changes were proposed in this pull request?

When we acquire execution memory, we do a lot of work between shrinking the storage memory
pool and enlarging the execution memory pool. In particular, we call `memoryStore.evictBlocksToFreeSpace`,
which may do a lot of I/O and can throw exceptions. If an exception is thrown, the pool sizes
on that executor are left in an inconsistent state.

This patch minimizes the work done between the two pool-size updates so that the resize is effectively atomic.
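
To make the ordering concrete, here is a minimal sketch of the pre- and post-patch shapes; `SimplePool`, `evict`, and the method bodies below are illustrative simplifications, not the actual Spark classes:

```scala
// Hypothetical, simplified pools used only to illustrate the ordering issue.
class SimplePool(var poolSize: Long, var memoryUsed: Long) {
  def memoryFree: Long = poolSize - memoryUsed
  def decrementPoolSize(delta: Long): Unit = { poolSize -= delta }
  def incrementPoolSize(delta: Long): Unit = { poolSize += delta }
}

object ResizeSketch {
  // Stand-in for memoryStore.evictBlocksToFreeSpace: does I/O and may throw.
  def evict(bytes: Long): Long = throw new RuntimeException("eviction failed")

  // Pre-patch shape: the storage pool is shrunk both before and after the eviction,
  // so an exception leaves storage shrunk while execution is never enlarged.
  def resizeBefore(storage: SimplePool, execution: SimplePool, needed: Long): Unit = {
    val fromFreeMemory = math.min(needed, storage.memoryFree)
    storage.decrementPoolSize(fromFreeMemory)
    val fromEviction = evict(needed - fromFreeMemory)   // may throw here
    storage.decrementPoolSize(fromEviction)
    execution.incrementPoolSize(fromFreeMemory + fromEviction)
  }

  // Post-patch shape: all fallible work happens first; the two pool-size
  // updates then run back to back, with nothing in between that can throw.
  def resizeAfter(storage: SimplePool, execution: SimplePool, needed: Long): Unit = {
    val fromFreeMemory = math.min(needed, storage.memoryFree)
    val reclaimed = fromFreeMemory +
      (if (needed > fromFreeMemory) evict(needed - fromFreeMemory) else 0L)
    storage.decrementPoolSize(reclaimed)
    execution.incrementPoolSize(reclaimed)
  }
}
```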

## How was this patch tested?

Jenkins.

Author: Andrew Or <andrew@databricks.com>

Closes #13039 from andrewor14/safer-pool.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bb88ad4e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bb88ad4e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bb88ad4e

Branch: refs/heads/master
Commit: bb88ad4e0e870c88d474c71939a19541522a3023
Parents: 81c68ec
Author: Andrew Or <andrew@databricks.com>
Authored: Wed May 11 12:58:57 2016 -0700
Committer: Davies Liu <davies.liu@gmail.com>
Committed: Wed May 11 12:58:57 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/memory/StorageMemoryPool.scala | 11 +++++-----
 .../spark/memory/UnifiedMemoryManager.scala     |  5 +++--
 .../spark/memory/MemoryManagerSuite.scala       | 15 +++++++++++++
 .../memory/UnifiedMemoryManagerSuite.scala      | 23 ++++++++++++++++++++
 4 files changed, 46 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bb88ad4e/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index 0b552ca..4c6b639 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -116,13 +116,13 @@ private[memory] class StorageMemoryPool(
   }
 
   /**
-   * Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number
-   * of bytes removed from the pool's capacity.
+   * Free space to shrink the size of this storage memory pool by `spaceToFree` bytes.
+   * Note: this method doesn't actually reduce the pool size but relies on the caller to do so.
+   *
+   * @return number of bytes to be removed from the pool's capacity.
    */
-  def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
-    // First, shrink the pool by reclaiming free memory:
+  def freeSpaceToShrinkPool(spaceToFree: Long): Long = lock.synchronized {
     val spaceFreedByReleasingUnusedMemory = math.min(spaceToFree, memoryFree)
-    decrementPoolSize(spaceFreedByReleasingUnusedMemory)
     val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
     if (remainingSpaceToFree > 0) {
       // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
@@ -130,7 +130,6 @@ private[memory] class StorageMemoryPool(
         memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
      // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
      // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
-      decrementPoolSize(spaceFreedByEviction)
       spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
     } else {
       spaceFreedByReleasingUnusedMemory

http://git-wip-us.apache.org/repos/asf/spark/blob/bb88ad4e/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index 82023b5..ae747c1 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -113,9 +113,10 @@ private[spark] class UnifiedMemoryManager private[memory] (
           storagePool.poolSize - storageRegionSize)
         if (memoryReclaimableFromStorage > 0) {
           // Only reclaim as much space as is necessary and available:
-          val spaceReclaimed = storagePool.shrinkPoolToFreeSpace(
+          val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
             math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
-          executionPool.incrementPoolSize(spaceReclaimed)
+          storagePool.decrementPoolSize(spaceToReclaim)
+          executionPool.incrementPoolSize(spaceToReclaim)
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/bb88ad4e/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index a128652..2c4928a 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -79,6 +79,21 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAfterEach
   }
 
   /**
+   * Make a mocked [[MemoryStore]] whose [[MemoryStore.evictBlocksToFreeSpace]] method is
+   * stubbed to always throw [[RuntimeException]].
+   */
+  protected def makeBadMemoryStore(mm: MemoryManager): MemoryStore = {
+    val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
+    when(ms.evictBlocksToFreeSpace(any(), anyLong(), any())).thenAnswer(new Answer[Long] {
+      override def answer(invocation: InvocationOnMock): Long = {
+        throw new RuntimeException("bad memory store!")
+      }
+    })
+    mm.setMemoryStore(ms)
+    ms
+  }
+
+  /**
    * Simulate the part of [[MemoryStore.evictBlocksToFreeSpace]] that releases storage memory.
    *
    * This is a significant simplification of the real method, which actually drops existing

http://git-wip-us.apache.org/repos/asf/spark/blob/bb88ad4e/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 1425581..c821054 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -280,4 +280,27 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester
     assert(evictedBlocks.nonEmpty)
   }
 
+  test("SPARK-15260: atomically resize memory pools") {
+    val conf = new SparkConf()
+      .set("spark.memory.fraction", "1")
+      .set("spark.memory.storageFraction", "0")
+      .set("spark.testing.memory", "1000")
+    val mm = UnifiedMemoryManager(conf, numCores = 2)
+    makeBadMemoryStore(mm)
+    val memoryMode = MemoryMode.ON_HEAP
+    // Acquire 1000 then release 600 bytes of storage memory, leaving the
+    // storage memory pool at 1000 bytes but only 400 bytes of which are used.
+    assert(mm.acquireStorageMemory(dummyBlock, 1000L, memoryMode))
+    mm.releaseStorageMemory(600L, memoryMode)
+    // Before the fix for SPARK-15260, we would first shrink the storage pool by the amount of
+    // unused storage memory (600 bytes), try to evict blocks, then enlarge the execution pool
+    // by the same amount. If the eviction threw an exception, then we would shrink one pool
+    // without enlarging the other, resulting in an assertion failure.
+    intercept[RuntimeException] {
+      mm.acquireExecutionMemory(1000L, 0, memoryMode)
+    }
+    val assertInvariants = PrivateMethod[Unit]('assertInvariants)
+    mm.invokePrivate[Unit](assertInvariants())
+  }
+
 }
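
The test's final step invokes UnifiedMemoryManager's private `assertInvariants` method, which (roughly) checks that the execution and storage pool sizes still sum to the manager's maximum memory. A hedged sketch of that kind of check, not the actual Spark implementation:

```scala
// Hedged sketch of the invariant the test relies on (not the exact Spark code).
object PoolInvariantSketch {
  // If eviction throws after the storage pool is shrunk but before the execution
  // pool is grown, the two sizes no longer sum to maxMemory and this assertion fails.
  def assertPoolInvariant(executionPoolSize: Long, storagePoolSize: Long, maxMemory: Long): Unit = {
    assert(executionPoolSize + storagePoolSize == maxMemory,
      s"pool sizes ($executionPoolSize + $storagePoolSize) != max memory ($maxMemory)")
  }
}
```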



