spark-reviews mailing list archives

From: JoshRosen <...@git.apache.org>
Subject: [GitHub] spark pull request: [SPARK-13980] Incrementally serialize blocks w...
Date: Mon, 21 Mar 2016 23:28:20 GMT
Github user JoshRosen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11791#discussion_r56915182
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -244,13 +244,113 @@ private[spark] class MemoryStore(
         }
       }
     
    +  /**
    +   * Attempt to put the given block in the memory store as bytes.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory. To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically checking
    +   * whether there is enough free memory. If the block is successfully materialized, then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated size of the stored data. In case of failure,
    +   *         return a handle which allows the caller to either finish the serialization by
    +   *         spilling to disk or to deserialize the partially-serialized block and reconstruct
    +   *         the original input iterator. The caller must either fully consume this result
    +   *         iterator or call `discard()` on it in order to free the storage memory consumed
    +   *         by the partially-unrolled block.
    +   */
    +  private[storage] def putIteratorAsBytes(
    +      blockId: BlockId,
    +      values: Iterator[Any]): Either[PartiallySerializedBlock, Long] = {
    +
    +    require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
    +
    +    // Whether there is still enough memory for us to continue unrolling this block
    +    var keepUnrolling = true
    +    // Initial per-task memory to request for unrolling blocks (bytes).
    +    val initialMemoryThreshold = unrollMemoryThreshold
    +    // Keep track of unroll memory used by this particular block / putIterator() operation
    +    var unrollMemoryUsedByThisBlock = 0L
    +    // Underlying buffer for unrolling the block
    +    val redirectableStream = new RedirectableOutputStream
    +    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
    +    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
    +    val serializationStream: SerializationStream = {
    +      val ser = blockManager.defaultSerializer.newInstance()
    +      ser.serializeStream(blockManager.wrapForCompression(blockId, redirectableStream))
    +    }
    +
    +    // Request enough memory to begin unrolling
    +    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
    +
    +    if (!keepUnrolling) {
    +      logWarning(s"Failed to reserve initial memory threshold of " +
    +        s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId
in memory.")
    +    } else {
    +      unrollMemoryUsedByThisBlock += initialMemoryThreshold
    +    }
    +
    +    def reserveAdditionalMemoryIfNecessary(): Unit = {
    +      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
    --- End diff --
    
    One important implicit assumption, which I will make explicit in a line comment: we assume
    that we'll always be able to get enough memory to unroll at least one element in between
    size calculations. This is the same assumption that we have in the deserialized case, since
    we only periodically measure memory usage there.
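
    To make this concrete, here is a minimal sketch of the unroll loop that this
    check sits in. Variable and method names follow the diff above, but the loop
    structure is a simplified assumption, not the patch itself:

        // Sketch only: completes reserveAdditionalMemoryIfNecessary() where the
        // diff is cut off, and shows the per-element unroll loop around it.
        def reserveAdditionalMemoryIfNecessary(): Unit = {
          if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
            // Grow the unroll reservation to cover the bytes written so far.
            val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
            keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
            if (keepUnrolling) {
              unrollMemoryUsedByThisBlock += amountToRequest
            }
          }
        }

        while (values.hasNext && keepUnrolling) {
          serializationStream.writeObject(values.next())
          // The output size is measured only *after* an element has been written,
          // so we implicitly assume there is always enough headroom to serialize
          // at least one element between two size checks -- the same periodic
          // measurement assumption as in the deserialized path.
          reserveAdditionalMemoryIfNecessary()
        }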


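    For context on the `Either` contract described in the doc comment, a
    hypothetical caller might look like this. Here `discard()` comes from the doc
    comment, while `canSpill` and `spillToDisk` are made-up stand-ins for the
    spill path, not APIs from this patch:

        memoryStore.putIteratorAsBytes(blockId, values) match {
          case Right(size) =>
            // Success: the block was fully serialized in memory and `size` is
            // the estimated size of the stored bytes.
            logInfo(s"Block $blockId stored as bytes ($size bytes)")
          case Left(partiallySerialized) =>
            if (canSpill) {
              // Hypothetical helper: finish the serialization by spilling to disk.
              spillToDisk(blockId, partiallySerialized)
            } else {
              // Otherwise the caller must free the unroll memory explicitly.
              partiallySerialized.discard()
            }
        }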
