Return-Path: X-Original-To: apmail-spark-commits-archive@minotaur.apache.org Delivered-To: apmail-spark-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 3297110579 for ; Thu, 26 Sep 2013 22:02:19 +0000 (UTC) Received: (qmail 61494 invoked by uid 500); 26 Sep 2013 22:02:19 -0000 Delivered-To: apmail-spark-commits-archive@spark.apache.org Received: (qmail 61325 invoked by uid 500); 26 Sep 2013 22:02:18 -0000 Mailing-List: contact commits-help@spark.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@spark.incubator.apache.org Delivered-To: mailing list commits@spark.incubator.apache.org Received: (qmail 61295 invoked by uid 99); 26 Sep 2013 22:02:18 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Sep 2013 22:02:18 +0000 X-ASF-Spam-Status: No, hits=-2002.3 required=5.0 tests=ALL_TRUSTED,RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO mail.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with SMTP; Thu, 26 Sep 2013 22:02:14 +0000 Received: (qmail 59470 invoked by uid 99); 26 Sep 2013 22:01:52 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Sep 2013 22:01:52 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id D9ABD90B41C; Thu, 26 Sep 2013 22:01:51 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rxin@apache.org To: commits@spark.incubator.apache.org Date: Thu, 26 Sep 2013 22:01:56 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [06/11] git commit: Merge pull request #7 from wannabeast/memorystore-fixes X-Virus-Checked: Checked by ClamAV on apache.org Merge pull request #7 from wannabeast/memorystore-fixes some minor fixes to MemoryStore This is a repeat of #5, moved to its own branch in my repo. This makes all updates to on ; it skips on synchronizing the reads where it can get away with it. (cherry picked from commit 560ee5c9bba3f9fde380c831d0c6701343b2fecf) Signed-off-by: Reynold Xin Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/8cbc96bd Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/8cbc96bd Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/8cbc96bd Branch: refs/heads/branch-0.8 Commit: 8cbc96bd49af07953ab921546e56b4e04b8b760e Parents: 240ca93 Author: Reynold Xin Authored: Thu Sep 26 11:27:34 2013 -0700 Committer: Reynold Xin Committed: Thu Sep 26 13:16:05 2013 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/storage/MemoryStore.scala | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8cbc96bd/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 3b3b234..77a39c7 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -30,10 +30,10 @@ import org.apache.spark.util.{SizeEstimator, Utils} private class MemoryStore(blockManager: BlockManager, maxMemory: Long) extends BlockStore(blockManager) { - case class Entry(value: Any, size: Long, deserialized: Boolean, var dropPending: Boolean = false) + case class Entry(value: Any, size: Long, deserialized: Boolean) private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true) - private var currentMemory = 0L + @volatile private var currentMemory = 0L // Object used to ensure that only one thread is putting blocks and if necessary, dropping // blocks from the memory store. private val putLock = new Object() @@ -110,9 +110,8 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def remove(blockId: String): Boolean = { entries.synchronized { - val entry = entries.get(blockId) + val entry = entries.remove(blockId) if (entry != null) { - entries.remove(blockId) currentMemory -= entry.size logInfo("Block %s of size %d dropped from memory (free %d)".format( blockId, entry.size, freeMemory)) @@ -126,6 +125,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) override def clear() { entries.synchronized { entries.clear() + currentMemory = 0 } logInfo("MemoryStore cleared") } @@ -160,8 +160,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long) putLock.synchronized { if (ensureFreeSpace(blockId, size)) { val entry = new Entry(value, size, deserialized) - entries.synchronized { entries.put(blockId, entry) } - currentMemory += size + entries.synchronized { + entries.put(blockId, entry) + currentMemory += size + } if (deserialized) { logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format( blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))