spark-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r...@apache.org
Subject git commit: Merge pull request #139 from aarondav/shuffle-next
Date Tue, 05 Nov 2013 04:48:14 GMT
Updated Branches:
  refs/heads/branch-0.8 518cf22eb -> e80d1cfdd


Merge pull request #139 from aarondav/shuffle-next

Never store shuffle blocks in BlockManager

After the BlockId refactor (PR #114), it became very clear that ShuffleBlocks are of no use
within BlockManager (they had a no-arg constructor!). This patch completely eliminates
them, saving us around 100-150 bytes per shuffle block.
The total, system-wide overhead per shuffle block is now a flat 8 bytes, excluding
state saved by the MapOutputTracker.

Note: This should *not* be merged directly into 0.8.0 -- see #138
(cherry picked from commit 81065321c073fa35028b8aa110704385cba6d5b2)
Signed-off-by: Reynold Xin <rxin@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e80d1cfd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e80d1cfd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e80d1cfd

Branch: refs/heads/branch-0.8
Commit: e80d1cfdd38637f5711c7c796a1675b3f39ff196
Parents: 518cf22
Author: Reynold Xin <rxin@apache.org>
Authored: Mon Nov 4 20:47:14 2013 -0800
Committer: Reynold Xin <rxin@apache.org>
Committed: Mon Nov 4 20:47:44 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/storage/BlockInfo.scala      | 18 +-----------------
 .../org/apache/spark/storage/BlockManager.scala   | 10 ++--------
 .../apache/spark/storage/BlockObjectWriter.scala  | 12 +-----------
 3 files changed, 4 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e80d1cfd/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
index dbe0bda..c8f3976 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockInfo.scala
@@ -19,9 +19,7 @@ package org.apache.spark.storage
 
 import java.util.concurrent.ConcurrentHashMap
 
-private[storage] trait BlockInfo {
-  def level: StorageLevel
-  def tellMaster: Boolean
+private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolean) {
   // To save space, 'pending' and 'failed' are encoded as special sizes:
   @volatile var size: Long = BlockInfo.BLOCK_PENDING
   private def pending: Boolean = size == BlockInfo.BLOCK_PENDING
@@ -81,17 +79,3 @@ private object BlockInfo {
   private val BLOCK_PENDING: Long = -1L
   private val BLOCK_FAILED: Long = -2L
 }
-
-// All shuffle blocks have the same `level` and `tellMaster` properties,
-// so we can save space by not storing them in each instance:
-private[storage] class ShuffleBlockInfo extends BlockInfo {
-  // These need to be defined using 'def' instead of 'val' in order for
-  // the compiler to eliminate the fields:
-  def level: StorageLevel = StorageLevel.DISK_ONLY
-  def tellMaster: Boolean = false
-}
-
-private[storage] class BlockInfoImpl(val level: StorageLevel, val tellMaster: Boolean)
-  extends BlockInfo {
-  // Intentionally left blank
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e80d1cfd/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ccc05f5..16d5358 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -520,13 +520,7 @@ private[spark] class BlockManager(
   def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int)
     : BlockObjectWriter = {
     val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
-    val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
-    writer.registerCloseEventHandler(() => {
-      val myInfo = new ShuffleBlockInfo()
-      blockInfo.put(blockId, myInfo)
-      myInfo.markReady(writer.fileSegment().length)
-    })
-    writer
+    new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
   }
 
   /**
@@ -549,7 +543,7 @@ private[spark] class BlockManager(
     // to be dropped right after it got put into memory. Note, however, that other threads will
     // not be able to get() this block until we call markReady on its BlockInfo.
     val myInfo = {
-      val tinfo = new BlockInfoImpl(level, tellMaster)
+      val tinfo = new BlockInfo(level, tellMaster)
       // Do atomically !
       val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/e80d1cfd/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index e49c191..469e68f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -34,20 +34,12 @@ import org.apache.spark.serializer.{SerializationStream, Serializer}
  */
 abstract class BlockObjectWriter(val blockId: BlockId) {
 
-  var closeEventHandler: () => Unit = _
-
   def open(): BlockObjectWriter
 
-  def close() {
-    closeEventHandler()
-  }
+  def close()
 
   def isOpen: Boolean
 
-  def registerCloseEventHandler(handler: () => Unit) {
-    closeEventHandler = handler
-  }
-
   /**
    * Flush the partial writes and commit them as a single atomic block. Return the
    * number of bytes written for this commit.
@@ -146,8 +138,6 @@ class DiskBlockObjectWriter(
       ts = null
       objOut = null
     }
-    // Invoke the close callback handler.
-    super.close()
   }
 
   override def isOpen: Boolean = objOut != null


Mime
View raw message