spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andrewor14 <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-13696] Remove BlockStore class & simpli...
Date Thu, 10 Mar 2016 01:02:44 GMT
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11534#discussion_r55619576
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -428,77 +458,46 @@ private[spark] class BlockManager(
           Option(
             shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
         } else {
    -      doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
    -    }
    -  }
    -
    -  private def doGetLocal(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
    -    blockInfoManager.lockForReading(blockId) match {
    -      case None =>
    -        logDebug(s"Block $blockId was not found")
    -        None
    -      case Some(info) =>
    -        doGetLocal(blockId, info, asBlockResult)
    +      blockInfoManager.lockForReading(blockId) match {
    +        case None =>
    +          logDebug(s"Block $blockId was not found")
    +          None
    +        case Some(info) =>
    +          Some(doGetLocalBytes(blockId, info))
    +      }
         }
       }
     
       /**
    -   * Get a local block from the block manager.
    -   * Assumes that the caller holds a read lock on the block.
    +   * Get block from the local block manager as serialized bytes.
    +   *
    +   * Must be called while holding a read lock on the block.
    +   * Releases the read lock upon exception; keeps the read lock upon successful return.
        */
    -  private def doGetLocal(
    -      blockId: BlockId,
    -      info: BlockInfo,
    -      asBlockResult: Boolean): Option[Any] = {
    +  private def doGetLocalBytes(blockId: BlockId, info: BlockInfo): ByteBuffer = {
         val level = info.level
         logDebug(s"Level for block $blockId is $level")
    -
    -    // Look for the block in memory
    -    if (level.useMemory) {
    -      logDebug(s"Getting block $blockId from memory")
    -      val result = if (asBlockResult) {
    -        memoryStore.getValues(blockId).map { iter =>
    -          val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
    -          new BlockResult(ci, DataReadMethod.Memory, info.size)
    -        }
    +    // In order, try to read the serialized bytes from memory, then from disk, then fall back to
    +    // serializing in-memory objects, and, finally, throw an exception if the block does not exist.
    +    if (level.deserialized) {
    +      // Try to avoid expensive serialization by reading a pre-serialized copy from disk:
    +      if (level.useDisk && diskStore.contains(blockId)) {
    +        diskStore.getBytes(blockId)
    --- End diff --
    
    yeah, that's what I meant


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message