spark-reviews mailing list archives

From: advancedxy <...@git.apache.org>
Subject: [GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...
Date: Wed, 31 Jan 2018 14:24:55 GMT
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r165066153
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
    @@ -104,9 +104,18 @@ private[spark] class BlockStoreShuffleReader[K, C](
             context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
             context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
             context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    +        // Use completion callback to stop sorter if task was cancelled.
    +        context.addTaskCompletionListener(tc => {
    +          // Note: we only stop sorter if cancelled as sorter.stop wouldn't be called in
    +          // CompletionIterator. Another way would be making sorter.stop idempotent.
    +          if (tc.isInterrupted()) { sorter.stop() }
    --- End diff --
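
    The idempotency alternative mentioned in the note above could look roughly like this (a hypothetical sketch; this is not how Spark's `ExternalSorter.stop` is actually written):

    ```scala
    // Sketch: a sorter whose stop() is idempotent, so it can safely be
    // called both from a CompletionIterator and from a task-completion
    // listener without double-releasing resources.
    class IdempotentSorter {
      private var stopped = false

      def stop(): Unit = synchronized {
        if (!stopped) {
          stopped = true
          releaseResources()
        }
      }

      // Hypothetical cleanup stand-in: in Spark this would free
      // execution memory and delete spill files.
      private def releaseResources(): Unit = ()
    }
    ```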
    
    One advantage of `CompletionIterator` is that the `completionFunction` is called as soon as the wrapped iterator is fully consumed. So for the sorter, memory is released earlier rather than only at task completion.
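
    A minimal sketch of that mechanism, assuming a simplified stand-in for Spark's `org.apache.spark.util.CompletionIterator` (the real class is shaped differently):

    ```scala
    // Sketch: run a cleanup function the first time the wrapped
    // iterator reports exhaustion, so resources are freed as soon
    // as the last element is consumed.
    class EagerCompletionIterator[A](sub: Iterator[A], completionFn: () => Unit)
        extends Iterator[A] {
      private var completed = false

      override def next(): A = sub.next()

      override def hasNext: Boolean = {
        val more = sub.hasNext
        if (!more && !completed) {
          completed = true
          completionFn() // e.g. stopping the sorter, releasing memory early
        }
        more
      }
    }
    ```

    If the task is killed mid-iteration, `hasNext` may never return `false`, so the cleanup never fires; that is the gap the completion listener in the diff above covers.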
    
    As for job cancelling, it's not just `CompletionIterator` that we should consider. The combiner-and-sorter pattern (or similar) is something we should look out for:
    ```scala
    combiner.insertAll(iterator) // or sorter.insertAll(iterator)
    // ...and then return a new iterator over the combined/sorted data
    combiner.iterator // or sorter.iterator
    ```
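
    To make that pattern responsive to cancellation, the iterator fed into `insertAll` can be wrapped so each `hasNext` checks the task's interrupt flag, along the lines of Spark's `InterruptibleIterator` (a rough sketch below; the real class delegates the check to `TaskContext`):

    ```scala
    import org.apache.spark.{TaskContext, TaskKilledException}

    // Sketch: let a long-running insertAll observe task cancellation
    // between records instead of draining the entire input first.
    class InterruptibleIteratorSketch[T](context: TaskContext, delegate: Iterator[T])
        extends Iterator[T] {

      override def hasNext: Boolean = {
        if (context.isInterrupted()) {
          throw new TaskKilledException // aborts the insertAll loop promptly
        }
        delegate.hasNext
      }

      override def next(): T = delegate.next()
    }
    ```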


