spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From joshro...@apache.org
Subject spark git commit: [SPARK-17465][SPARK CORE] Inappropriate memory management in `org.apache.spark.storage.MemoryStore` may lead to memory leak
Date Wed, 14 Sep 2016 20:47:03 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 bf3f6d2f1 -> a447cd888


[SPARK-17465][SPARK CORE] Inappropriate memory management in `org.apache.spark.storage.MemoryStore`
may lead to memory leak

## What changes were proposed in this pull request?

The expression like `if (memoryMap(taskAttemptId) == 0) memoryMap.remove(taskAttemptId)` in
method `releaseUnrollMemoryForThisTask` and `releasePendingUnrollMemoryForThisTask` should
be called after release memory operation, whatever `memoryToRelease` is > 0 or not.

If the memory of a task has been set to 0 when calling a `releaseUnrollMemoryForThisTask`
or a `releasePendingUnrollMemoryForThisTask` method, the key in the memory map corresponding
to that task will never be removed from the hash map.

See the details in [SPARK-17465](https://issues.apache.org/jira/browse/SPARK-17465).

Author: Xing SHI <shi-kou@indetail.co.jp>

Closes #15022 from saturday-shi/SPARK-17465.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a447cd88
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a447cd88
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a447cd88

Branch: refs/heads/branch-1.6
Commit: a447cd88897bc3d76eee0e8757e6545019704f30
Parents: bf3f6d2
Author: Xing SHI <shi-kou@indetail.co.jp>
Authored: Wed Sep 14 13:46:46 2016 -0700
Committer: Josh Rosen <joshrosen@databricks.com>
Committed: Wed Sep 14 13:46:46 2016 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/scheduler/Task.scala    |  1 +
 .../scala/org/apache/spark/storage/MemoryStore.scala    | 12 ++++++------
 2 files changed, 7 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a447cd88/core/src/main/scala/org/apache/spark/scheduler/Task.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index c7b1199..2f4225e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -104,6 +104,7 @@ private[spark] abstract class Task[T](
         Utils.tryLogNonFatalError {
           // Release memory used by this thread for unrolling blocks
           SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
+          SparkEnv.get.blockManager.memoryStore.releasePendingUnrollMemoryForThisTask()
           // Notify any tasks waiting for execution memory to be freed to wake up and try
to
           // acquire memory again. This makes impossible the scenario where a task sleeps
forever
           // because there are no other tasks left to notify it. Since this is safe to do
but may

http://git-wip-us.apache.org/repos/asf/spark/blob/a447cd88/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index aed0da9..1113160 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -511,11 +511,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager:
Memo
         val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
         if (memoryToRelease > 0) {
           unrollMemoryMap(taskAttemptId) -= memoryToRelease
-          if (unrollMemoryMap(taskAttemptId) == 0) {
-            unrollMemoryMap.remove(taskAttemptId)
-          }
           memoryManager.releaseUnrollMemory(memoryToRelease)
         }
+        if (unrollMemoryMap(taskAttemptId) == 0) {
+          unrollMemoryMap.remove(taskAttemptId)
+        }
       }
     }
   }
@@ -530,11 +530,11 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager:
Memo
         val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId))
         if (memoryToRelease > 0) {
           pendingUnrollMemoryMap(taskAttemptId) -= memoryToRelease
-          if (pendingUnrollMemoryMap(taskAttemptId) == 0) {
-            pendingUnrollMemoryMap.remove(taskAttemptId)
-          }
           memoryManager.releaseUnrollMemory(memoryToRelease)
         }
+        if (pendingUnrollMemoryMap(taskAttemptId) == 0) {
+          pendingUnrollMemoryMap.remove(taskAttemptId)
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


Mime
View raw message