spark-commits mailing list archives

From wenc...@apache.org
Subject spark git commit: [SPARK-23508][CORE] Fix BlockmanagerId in case blockManagerIdCache cause oom
Date Wed, 28 Feb 2018 15:16:54 GMT
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 fe9cb4afe -> dfa43792f


[SPARK-23508][CORE] Fix BlockmanagerId in case blockManagerIdCache cause oom


## What changes were proposed in this pull request?
blockManagerIdCache in BlockManagerId never removes old values, which may cause an OOM.

`val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()`
Whenever we create a new BlockManagerId, it is put into this map and never evicted.
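
For illustration, here is a minimal, self-contained sketch (not the Spark source itself; the object and key names are made up) of why this interning pattern leaks: every distinct key ever interned is retained forever.

```scala
import java.util.concurrent.ConcurrentHashMap

object UnboundedInternSketch {
  // Same pattern as the old blockManagerIdCache: intern values via putIfAbsent.
  private val cache = new ConcurrentHashMap[String, String]()

  def intern(s: String): String = {
    cache.putIfAbsent(s, s)
    cache.get(s)
  }

  def main(args: Array[String]): Unit = {
    // Every distinct key ever interned stays in the map; with enough
    // distinct BlockManagerIds this eventually exhausts the heap.
    (1 to 100000).foreach(i => intern(s"exec-$i"))
    println(s"entries retained: ${cache.size()}") // 100000, nothing is evicted
  }
}
```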

This patch uses a Guava cache for blockManagerIdCache instead.

A heap dump is shown in [SPARK-23508](https://issues.apache.org/jira/browse/SPARK-23508).
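
As a rough, self-contained sketch of the bounded interning the patch switches to (assuming Guava on the classpath; the object and key names here are illustrative, and the real change is in the diff below):

```scala
import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object BoundedInternSketch {
  // Bounded interning cache: once more than maximumSize distinct keys have
  // been loaded, Guava evicts older entries instead of growing without limit.
  private val cache: LoadingCache[String, String] =
    CacheBuilder.newBuilder()
      .maximumSize(10000)
      .build(new CacheLoader[String, String]() {
        override def load(key: String): String = key // identity load, as in the patch
      })

  def intern(s: String): String = cache.get(s)

  def main(args: Array[String]): Unit = {
    (1 to 100000).foreach(i => intern(s"exec-$i"))
    println(s"entries retained: ${cache.size()}") // at most 10000
  }
}
```

With a maximum size of 10000 and a BlockManagerId of roughly 48 bytes, the retained memory stays well under 1 MB, which is the bound the code comment in the diff refers to.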

## How was this patch tested?
Existing tests.

Author: zhoukang <zhoukang199191@gmail.com>

Closes #20667 from caneGuy/zhoukang/fix-history.

(cherry picked from commit 6a8abe29ef3369b387d9bc2ee3459a6611246ab1)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dfa43792
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dfa43792
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dfa43792

Branch: refs/heads/branch-2.3
Commit: dfa43792feb78b4cc3776606b3a13eff3586fbb1
Parents: fe9cb4a
Author: zhoukang <zhoukang199191@gmail.com>
Authored: Wed Feb 28 23:16:29 2018 +0800
Committer: Wenchen Fan <wenchen@databricks.com>
Committed: Wed Feb 28 23:16:47 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManagerId.scala     | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dfa43792/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index 2c3da0e..d4a59c3 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -18,7 +18,8 @@
 package org.apache.spark.storage
 
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
-import java.util.concurrent.ConcurrentHashMap
+
+import com.google.common.cache.{CacheBuilder, CacheLoader}
 
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.DeveloperApi
@@ -132,10 +133,17 @@ private[spark] object BlockManagerId {
     getCachedBlockManagerId(obj)
   }
 
-  val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()
+  /**
+   * The max cache size is hardcoded to 10000, since the size of a BlockManagerId
+   * object is about 48B, the total memory cost should be below 1MB which is feasible.
+   */
+  val blockManagerIdCache = CacheBuilder.newBuilder()
+    .maximumSize(10000)
+    .build(new CacheLoader[BlockManagerId, BlockManagerId]() {
+      override def load(id: BlockManagerId) = id
+    })
 
   def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
-    blockManagerIdCache.putIfAbsent(id, id)
     blockManagerIdCache.get(id)
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org

