From: advancedxy
To: reviews@spark.apache.org
Reply-To: reviews@spark.apache.org
Subject: [GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...
Date: Wed, 31 Jan 2018 14:24:55 +0000 (UTC)

Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r165066153

    --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
    @@ -104,9 +104,18 @@ private[spark] class BlockStoreShuffleReader[K, C](
             context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
             context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
             context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    +        // Use completion callback to stop sorter if task was cancelled.
    +        context.addTaskCompletionListener(tc => {
    +          // Note: we only stop sorter if cancelled as sorter.stop wouldn't be called in
    +          // CompletionIterator. Another way would be making sorter.stop idempotent.
    +          if (tc.isInterrupted()) { sorter.stop() }
    --- End diff --

    One advantage of `CompletionIterator` is that the `completionFunction` is called as soon as the wrapped iterator is fully consumed. So for the sorter, it will release memory earlier rather than at task completion.

    As for job cancellation, it's not just `CompletionIterator` that we should consider. The combiner-and-sorter pattern (or anything similar) is something we should look for:

    ```scala
    combiner.insertAll(iterator) // or sorter.insertAll(iterator)
    // then returns a new iterator
    combiner.iterator // or sorter.iterator
    ```
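    To make the interaction concrete, here is a minimal, self-contained sketch of the two cleanup paths being discussed. `ToySorter` and `ToyCompletionIterator` are hypothetical stand-ins (not Spark's actual `ExternalSorter` or `CompletionIterator`): the wrapper fires its completion callback as soon as the underlying iterator is exhausted, releasing resources early, while an idempotent `stop()` makes it safe for a cancellation-time listener to call `stop()` as well, whichever path runs first.

    ```scala
    import java.util.concurrent.atomic.AtomicBoolean

    // Hypothetical stand-in for a sorter/combiner holding releasable resources.
    class ToySorter {
      private val stopped = new AtomicBoolean(false)
      // Idempotent stop: safe to invoke from both the completion callback
      // (normal exhaustion) and a task-completion listener (cancellation).
      def stop(): Unit = {
        if (stopped.compareAndSet(false, true)) {
          // release spill files / in-memory buffers here
        }
      }
      def isStopped: Boolean = stopped.get()
    }

    // Simplified analogue of CompletionIterator: runs `completion` exactly once,
    // as soon as the wrapped iterator reports it has no more elements.
    class ToyCompletionIterator[A](sub: Iterator[A], completion: () => Unit)
        extends Iterator[A] {
      private var completed = false
      override def hasNext: Boolean = {
        val r = sub.hasNext
        if (!r && !completed) { completed = true; completion() }
        r
      }
      override def next(): A = sub.next()
    }

    object Demo extends App {
      val sorter = new ToySorter
      val it = new ToyCompletionIterator(Iterator(1, 2, 3), () => sorter.stop())
      val out = it.toList // fully consumed, so stop() fires here, before task end
      assert(out == List(1, 2, 3))
      assert(sorter.isStopped)
      sorter.stop() // second call is a no-op thanks to idempotency
    }
    ```

    Note that if the iterator is abandoned mid-consumption (as on cancellation), the completion callback never fires, which is exactly why the diff above adds a separate task-completion listener as a fallback.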