spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ngone51 <...@git.apache.org>
Subject [GitHub] spark pull request #19285: [SPARK-22068][CORE]Reduce the duplicate code betw...
Date Fri, 19 Jan 2018 07:10:37 GMT
Github user Ngone51 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19285#discussion_r162549759
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala ---
    @@ -261,37 +263,93 @@ private[spark] class MemoryStore(
               // If this task attempt already owns more unroll memory than is necessary to
store the
               // block, then release the extra memory that will not be used.
               val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
    -          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
    +          releaseUnrollMemoryForThisTask(memoryMode, excessUnrollMemory)
               transferUnrollToStorage(size)
               true
             }
           }
    +
           if (enoughStorageMemory) {
             entries.synchronized {
    -          entries.put(blockId, entry)
    +          entries.put(blockId, createMemoryEntry())
             }
             logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
               blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
             Right(size)
           } else {
             assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
               "released too much unroll memory")
    +        Left(unrollMemoryUsedByThisBlock)
    +      }
    +    } else {
    +      Left(unrollMemoryUsedByThisBlock)
    +    }
    +  }
    +
    +  /**
    +   * Attempt to put the given block in memory store as values.
    +   *
    +   * It's possible that the iterator is too large to materialize and store in memory.
To avoid
    +   * OOM exceptions, this method will gradually unroll the iterator while periodically
checking
    +   * whether there is enough free memory. If the block is successfully materialized,
then the
    +   * temporary unroll memory used during the materialization is "transferred" to storage
memory,
    +   * so we won't acquire more memory than is actually needed to store the block.
    +   *
    +   * @return in case of success, the estimated size of the stored data. In case of failure,
return
    +   *         an iterator containing the values of the block. The returned iterator will
be backed
    +   *         by the combination of the partially-unrolled block and the remaining elements
of the
    +   *         original input iterator. The caller must either fully consume this iterator
or call
    +   *         `close()` on it in order to free the storage memory consumed by the partially-unrolled
    +   *         block.
    +   */
    +  private[storage] def putIteratorAsValues[T](
    +      blockId: BlockId,
    +      values: Iterator[T],
    +      classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
    +
    +    // Underlying vector for unrolling the block
    +    var vector = new SizeTrackingVector[T]()(classTag)
    +    var arrayValues: Array[T] = null
    +    var preciseSize: Long = -1
    +
    +    def storeValue(value: T): Unit = {
    +      vector += value
    +    }
    +
    +    def estimateSize(precise: Boolean): Long = {
    +      if (precise) {
    +        // We only need the precise size after all values are unrolled.
    +        arrayValues = vector.toArray
    +        preciseSize = SizeEstimator.estimate(arrayValues)
    +        preciseSize
    +      } else {
    +        vector.estimateSize()
    +      }
    +    }
    +
    +    def createMemoryEntry(): MemoryEntry[T] = {
    +      // We successfully unrolled the entirety of this block
    +      DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
    +    }
    +
    +    putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, storeValue,
    +      estimateSize, createMemoryEntry) match {
    +      case Right(storedSize) => Right(storedSize)
    +      case Left(unrollMemoryUsedByThisBlock) =>
    +        // We ran out of space while unrolling the values for this block
    +        val (unrolledIterator, size) = if (vector != null) {
    --- End diff --
    
    Under what situation will `vector` be null?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message