spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject [1/4] git commit: Restructure BlockInfo fields to reduce memory use.
Date Wed, 30 Oct 2013 06:47:15 GMT
Updated Branches:
  refs/heads/master f0e23a023 -> 745dc4290


Restructure BlockInfo fields to reduce memory use.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2d7cf6a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2d7cf6a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2d7cf6a2

Branch: refs/heads/master
Commit: 2d7cf6a271dbd494f1d351e6db7db8568733edc3
Parents: aec9bf9
Author: Josh Rosen <joshrosen@apache.org>
Authored: Sun Oct 27 12:51:54 2013 -0700
Committer: Josh Rosen <joshrosen@apache.org>
Committed: Sun Oct 27 23:01:03 2013 -0700

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockManager.scala | 32 +++++++++++---------
 1 file changed, 17 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2d7cf6a2/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index dbe573d..285cf02 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{InputStream, OutputStream}
 import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.util.concurrent.ConcurrentHashMap
 
 import scala.collection.mutable.{HashMap, ArrayBuffer}
 import scala.util.Random
@@ -46,11 +47,17 @@ private[spark] class BlockManager(
     maxMemory: Long)
   extends Logging {
 
+
+  // initThread is logically a BlockInfo field, but we store it here because
+  // it's only needed while this block is in the 'pending' state and we want
+  // to minimize BlockInfo's memory footprint.
+  private val blockInfoInitThreads = new ConcurrentHashMap[BlockInfo, Thread]
+
   private class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
-    @volatile var pending: Boolean = true
-    @volatile var size: Long = -1L
-    @volatile var initThread: Thread = null
-    @volatile var failed = false
+    @volatile var size: Long = -1L  // also encodes 'pending' and 'failed' to save space
+    private def pending: Boolean = size == -1L
+    private def failed: Boolean = size == -2L
+    private def initThread: Thread = blockInfoInitThreads.get(this)
 
     setInitThread()
 
@@ -58,7 +65,7 @@ private[spark] class BlockManager(
       // Set current thread as init thread - waitForReady will not block this thread
      // (in case there is non trivial initialization which ends up calling waitForReady as part of
       // initialization itself)
-      this.initThread = Thread.currentThread()
+      blockInfoInitThreads.put(this, Thread.currentThread())
     }
 
     /**
@@ -66,7 +73,7 @@ private[spark] class BlockManager(
      * Return true if the block is available, false otherwise.
      */
     def waitForReady(): Boolean = {
-      if (initThread != Thread.currentThread() && pending) {
+      if (pending && initThread != Thread.currentThread()) {
         synchronized {
           while (pending) this.wait()
         }
@@ -76,12 +83,10 @@ private[spark] class BlockManager(
 
     /** Mark this BlockInfo as ready (i.e. block is finished writing) */
     def markReady(sizeInBytes: Long) {
+      require (sizeInBytes >= 0, "sizeInBytes was negative: " + sizeInBytes)
       assert (pending)
       size = sizeInBytes
-      initThread = null
-      failed = false
-      initThread = null
-      pending = false
+      blockInfoInitThreads.remove(this)
       synchronized {
         this.notifyAll()
       }
@@ -90,11 +95,8 @@ private[spark] class BlockManager(
     /** Mark this BlockInfo as ready but failed */
     def markFailure() {
       assert (pending)
-      size = 0
-      initThread = null
-      failed = true
-      initThread = null
-      pending = false
+      size = -2L
+      blockInfoInitThreads.remove(this)
       synchronized {
         this.notifyAll()
       }


Mime
View raw message