From: wenchen@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-23129][CORE] Make deserializeStream of DiskMapIterator init lazily
Date: Thu, 25 Jan 2018 07:25:59 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/branch-2.3 a857ad566 -> 012695256


[SPARK-23129][CORE] Make deserializeStream of DiskMapIterator init lazily

## What changes were proposed in this pull request?

Currently, the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator is initialized as soon as the DiskMapIterator instance is created. This causes memory overhead when ExternalAppendOnlyMap spills many times, because every spilled file then holds an open deserialization stream before any of it is actually read. We can avoid this by initializing deserializeStream the first time it is used. This patch makes deserializeStream initialize lazily.

## How was this patch tested?

Existing tests.

Author: zhoukang

Closes #20292 from caneGuy/zhoukang/lay-diskmapiterator.
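To make the pattern concrete, here is a minimal, self-contained Scala sketch of the same lazy-initialization idea. The names (LazyBatchIterator, openNextBatch, the Int payloads) are hypothetical stand-ins for illustration, not Spark's actual DiskMapIterator API; before the patch, the stream field would have been assigned openNextBatch() at construction time instead of null.

```scala
import java.io.{ByteArrayInputStream, DataInputStream}

// Hypothetical illustration of the lazy-init pattern; not Spark's real class.
class LazyBatchIterator(batches: Seq[Array[Byte]]) extends Iterator[Int] {
  private var batchIndex = 0
  // Before the patch this would be `private var stream = openNextBatch()`,
  // paying for an open stream per iterator even if it is never consumed.
  private var stream: DataInputStream = null
  private var nextItem: Option[Int] = None

  // Returns null when no batches remain, mirroring nextBatchStream().
  private def openNextBatch(): DataInputStream = {
    if (batchIndex >= batches.length) return null
    val s = new DataInputStream(new ByteArrayInputStream(batches(batchIndex)))
    batchIndex += 1
    s
  }

  // Reads one big-endian Int, rolling over to the next batch when the
  // current one is exhausted; None once every batch has been drained.
  private def readNextItem(): Option[Int] = {
    if (stream.available() >= 4) {
      Some(stream.readInt())
    } else {
      stream.close()
      stream = openNextBatch()
      if (stream == null) None else readNextItem()
    }
  }

  override def hasNext: Boolean = {
    if (nextItem.isEmpty) {
      if (stream == null) {
        stream = openNextBatch()          // lazy: first use, not construction
        if (stream == null) return false  // nothing was ever spilled
      }
      nextItem = readNextItem()
    }
    nextItem.isDefined
  }

  override def next(): Int = {
    if (!hasNext) throw new NoSuchElementException
    val item = nextItem.get
    nextItem = None
    item
  }
}
```

The payoff of the pattern: an iterator over a spilled file costs nothing until someone actually calls hasNext, so many outstanding spill files no longer mean many simultaneously open streams.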
(cherry picked from commit 45b4bbfddc18a77011c3bc1bfd71b2cd3466443c)
Signed-off-by: Wenchen Fan

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01269525
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01269525
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01269525

Branch: refs/heads/branch-2.3
Commit: 012695256a61f1830ff02780611d4aada00a88a0
Parents: a857ad5
Author: zhoukang
Authored: Thu Jan 25 15:24:52 2018 +0800
Committer: Wenchen Fan
Committed: Thu Jan 25 15:25:46 2018 +0800

----------------------------------------------------------------------
 .../util/collection/ExternalAppendOnlyMap.scala | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/01269525/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 375f4a6..5c6dd45 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -463,7 +463,7 @@ class ExternalAppendOnlyMap[K, V, C](
 
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    private var deserializeStream = nextBatchStream()
+    private var deserializeStream: DeserializationStream = null
     private var nextItem: (K, C) = null
     private var objectsRead = 0
 
@@ -528,7 +528,11 @@ class ExternalAppendOnlyMap[K, V, C](
     override def hasNext: Boolean = {
       if (nextItem == null) {
         if (deserializeStream == null) {
-          return false
+          // In case of deserializeStream has not been initialized
+          deserializeStream = nextBatchStream()
+          if (deserializeStream == null) {
+            return false
+          }
         }
         nextItem = readNextItem()
       }
@@ -536,19 +540,18 @@ class ExternalAppendOnlyMap[K, V, C](
     }
 
     override def next(): (K, C) = {
-      val item = if (nextItem == null) readNextItem() else nextItem
-      if (item == null) {
+      if (!hasNext) {
         throw new NoSuchElementException
       }
+      val item = nextItem
       nextItem = null
       item
     }
 
     private def cleanup() {
       batchIndex = batchOffsets.length  // Prevent reading any other batch
-      val ds = deserializeStream
-      if (ds != null) {
-        ds.close()
+      if (deserializeStream != null) {
+        deserializeStream.close()
         deserializeStream = null
       }
       if (fileStream != null) {
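One subtlety the second hunk above addresses: once the stream is created lazily, next() can no longer call readNextItem() directly, because on the first call the stream may not exist yet. Routing next() through hasNext keeps stream creation on a single code path. A quick behavioral check, using the hypothetical LazyBatchIterator sketch from earlier (not Spark's real API):

```scala
object LazinessDemo {
  def main(args: Array[String]): Unit = {
    // Two batches of big-endian Ints; nothing is opened at construction time.
    val it = new LazyBatchIterator(Seq(
      Array[Byte](0, 0, 0, 1, 0, 0, 0, 2),  // batch 0: items 1 and 2
      Array[Byte](0, 0, 0, 3)               // batch 1: item 3
    ))
    // The first hasNext call (inside toList) opens batch 0.
    assert(it.toList == List(1, 2, 3))
    // The iterator is now exhausted: a further next() would throw
    // NoSuchElementException, per the standard Iterator contract.
    assert(!it.hasNext)
  }
}
```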