From: zsxwing@apache.org
To: commits@spark.apache.org
Subject: spark git commit: [SPARK-22305] Write HDFSBackedStateStoreProvider.loadMap non-recursively
Date: Tue, 31 Oct 2017 18:53:54 +0000 (UTC)

Repository: spark
Updated Branches:
  refs/heads/master 7986cc09b -> 73231860b


[SPARK-22305] Write HDFSBackedStateStoreProvider.loadMap non-recursively

## What changes were proposed in this pull request?

Rewrite HDFSBackedStateStoreProvider.loadMap non-recursively. This prevents a stack overflow when too many delta files accumulate in a low-memory environment.

## How was this patch tested?

Existing unit tests verify functional equivalence; a new unit test checks for stack overflow.

Author: Jose Torres

Closes #19611 from joseph-torres/SPARK-22305.
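The shape of the fix can be illustrated with a minimal, self-contained Scala sketch. The names below (`cache`, `readSnapshot`, `applyDelta`) and the plain-HashMap `MapType` are illustrative stand-ins for the provider's loadedMaps cache, snapshot files, delta files, and map type, not its actual members; the authoritative change is the diff further down.

```scala
import scala.collection.mutable

// Illustrative sketch only: stand-ins for the provider's cache of loaded maps,
// its snapshot files, and its delta files. None of these names are the real API.
object LoadMapSketch {
  type MapType = mutable.HashMap[String, String]

  private val cache = mutable.HashMap.empty[Long, MapType]

  // Pretend every 10th version has a full snapshot on disk.
  private def readSnapshot(version: Long): Option[MapType] =
    if (version % 10 == 0) Some(mutable.HashMap("snapshot" -> version.toString)) else None

  // Pretend each delta file adds one key.
  private def applyDelta(version: Long, map: MapType): Unit =
    map.put(s"delta-$version", version.toString)

  // Old shape (simplified): one stack frame per version that has neither a cached map
  // nor a snapshot, so a long run of delta files can throw StackOverflowError.
  def loadMapRecursive(version: Long): MapType = {
    if (version <= 0) return new MapType
    cache.getOrElse(version, {
      val map = readSnapshot(version).getOrElse {
        val prev = loadMapRecursive(version - 1)
        val next = new MapType
        next ++= prev
        applyDelta(version, next)
        next
      }
      cache.put(version, map)
      map
    })
  }

  // New shape (simplified): walk back iteratively to the most recent available map,
  // then replay the missing deltas forward. Stack depth stays constant.
  def loadMapIterative(version: Long): MapType = {
    cache.get(version).orElse(readSnapshot(version)).getOrElse {
      var lastAvailableVersion = version
      var lastAvailableMap: Option[MapType] = None
      while (lastAvailableMap.isEmpty) {
        lastAvailableVersion -= 1
        lastAvailableMap =
          if (lastAvailableVersion <= 0) Some(new MapType)
          else cache.get(lastAvailableVersion).orElse(readSnapshot(lastAvailableVersion))
      }
      val result = new MapType
      result ++= lastAvailableMap.get
      for (deltaVersion <- lastAvailableVersion + 1 to version) {
        applyDelta(deltaVersion, result)
      }
      cache.put(version, result)
      result
    }
  }

  def main(args: Array[String]): Unit = {
    // The iterative version handles an arbitrarily long delta chain without deep recursion.
    println(loadMapIterative(12347).size)
  }
}
```

The essential point is that both the backward search for the most recent available map and the forward replay of the missing deltas are loops, so stack depth no longer grows with the number of delta files between snapshots.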
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73231860
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73231860
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73231860

Branch: refs/heads/master
Commit: 73231860baaa40f6001db347e5dcb6b5bb65e032
Parents: 7986cc0
Author: Jose Torres
Authored: Tue Oct 31 11:53:50 2017 -0700
Committer: Shixiong Zhu
Committed: Tue Oct 31 11:53:50 2017 -0700

----------------------------------------------------------------------
 .../state/HDFSBackedStateStoreProvider.scala    | 45 ++++++++++++++++----
 1 file changed, 36 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/73231860/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 36d6569..3f5002a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -297,17 +297,44 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val loadedCurrentVersionMap = synchronized { loadedMaps.get(version) }
+    if (loadedCurrentVersionMap.isDefined) {
+      return loadedCurrentVersionMap.get
+    }
+    val snapshotCurrentVersionMap = readSnapshotFile(version)
+    if (snapshotCurrentVersionMap.isDefined) {
+      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
+      return snapshotCurrentVersionMap.get
+    }
+
+    // Find the most recent map before this version that we can.
+    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+    var lastAvailableVersion = version
+    var lastAvailableMap: Option[MapType] = None
+    while (lastAvailableMap.isEmpty) {
+      lastAvailableVersion -= 1
+
+      if (lastAvailableVersion <= 0) {
+        // Use an empty map for versions 0 or less.
+        lastAvailableMap = Some(new MapType)
+      } else {
+        lastAvailableMap =
+          synchronized { loadedMaps.get(lastAvailableVersion) }
+            .orElse(readSnapshotFile(lastAvailableVersion))
       }
-      loadedMaps.put(version, mapFromFile)
-      mapFromFile
     }
+
+    // Load all the deltas from the version after the last available one up to the target version.
+    // The last available version is the one with a full snapshot, so it doesn't need deltas.
+    val resultMap = new MapType(lastAvailableMap.get)
+    for (deltaVersion <- lastAvailableVersion + 1 to version) {
+      updateFromDeltaFile(deltaVersion, resultMap)
+    }
+
+    synchronized { loadedMaps.put(version, resultMap) }
+    resultMap
   }
 
   private def writeUpdateToDeltaFile(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org