spark-commits mailing list archives

From zsxw...@apache.org
Subject spark git commit: [SPARK-22305] Write HDFSBackedStateStoreProvider.loadMap non-recursively
Date Tue, 31 Oct 2017 18:53:54 GMT
Repository: spark
Updated Branches:
  refs/heads/master 7986cc09b -> 73231860b


[SPARK-22305] Write HDFSBackedStateStoreProvider.loadMap non-recursively

## What changes were proposed in this pull request?
Rewrite HDFSBackedStateStoreProvider.loadMap iteratively instead of recursively. This prevents a
StackOverflowError when too many delta files stack up between snapshots, as can happen in a low-memory environment.
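
For illustration, here is a minimal standalone sketch of the transformation (hypothetical names and simplified types; not the actual Spark source): the recursive form rebuilds version N by calling itself for N - 1, consuming one stack frame per missing version, while the iterative form walks backwards in a loop to find a persisted base and then replays deltas forward.

```scala
// Hypothetical simplified model of the change; not the actual Spark code.
// `load` stands in for the cache/snapshot lookup, `delta` for reading one delta file.

// Before: one stack frame per missing version; deep delta chains overflow the stack.
def loadRecursive(version: Long,
                  load: Long => Option[Map[String, String]],
                  delta: Long => Map[String, String]): Map[String, String] = {
  if (version <= 0) Map.empty
  else load(version).getOrElse(loadRecursive(version - 1, load, delta) ++ delta(version))
}

// After: constant stack depth regardless of how many deltas must be replayed.
def loadIterative(version: Long,
                  load: Long => Option[Map[String, String]],
                  delta: Long => Map[String, String]): Map[String, String] = {
  if (version <= 0) return Map.empty
  load(version).getOrElse {
    // Walk back to the most recent available version, as in the patched loop.
    var v = version
    var base: Option[Map[String, String]] = None
    while (base.isEmpty) {
      v -= 1
      base = if (v <= 0) Some(Map.empty) else load(v)
    }
    // Replay deltas from the version after the base up to the requested one.
    (v + 1 to version).foldLeft(base.get)((m, dv) => m ++ delta(dv))
  }
}
```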

## How was this patch tested?

Existing unit tests verify functional equivalence; a new unit test checks that a deep chain of deltas no longer causes a stack overflow.
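
As a standalone illustration of the failure mode being tested (hypothetical, reusing the sketch above; not the actual Spark test): with no snapshot available, a long chain of deltas overflows the recursive version but not the iterative one.

```scala
// With no persisted snapshots, every version must be rebuilt from deltas.
val noSnapshots: Long => Option[Map[String, String]] = _ => None
val oneKeyDelta: Long => Map[String, String] = v => Map(v.toString -> "value")

// The iterative form replays 100000 deltas with constant stack depth.
assert(loadIterative(100000L, noSnapshots, oneKeyDelta).size == 100000)

// The recursive form would typically die here with a StackOverflowError:
// loadRecursive(100000L, noSnapshots, oneKeyDelta)
```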

Author: Jose Torres <jose@databricks.com>

Closes #19611 from joseph-torres/SPARK-22305.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73231860
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73231860
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73231860

Branch: refs/heads/master
Commit: 73231860baaa40f6001db347e5dcb6b5bb65e032
Parents: 7986cc0
Author: Jose Torres <jose@databricks.com>
Authored: Tue Oct 31 11:53:50 2017 -0700
Committer: Shixiong Zhu <zsxwing@gmail.com>
Committed: Tue Oct 31 11:53:50 2017 -0700

----------------------------------------------------------------------
 .../state/HDFSBackedStateStoreProvider.scala    | 45 ++++++++++++++++----
 1 file changed, 36 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/73231860/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 36d6569..3f5002a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -297,17 +297,44 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
 
   /** Load the required version of the map data from the backing files */
   private def loadMap(version: Long): MapType = {
-    if (version <= 0) return new MapType
-    synchronized { loadedMaps.get(version) }.getOrElse {
-      val mapFromFile = readSnapshotFile(version).getOrElse {
-        val prevMap = loadMap(version - 1)
-        val newMap = new MapType(prevMap)
-        updateFromDeltaFile(version, newMap)
-        newMap
+
+    // Shortcut if the map for this version is already there to avoid a redundant put.
+    val loadedCurrentVersionMap = synchronized { loadedMaps.get(version) }
+    if (loadedCurrentVersionMap.isDefined) {
+      return loadedCurrentVersionMap.get
+    }
+    val snapshotCurrentVersionMap = readSnapshotFile(version)
+    if (snapshotCurrentVersionMap.isDefined) {
+      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
+      return snapshotCurrentVersionMap.get
+    }
+
+    // Find the most recent map before this version that we can.
+    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+    var lastAvailableVersion = version
+    var lastAvailableMap: Option[MapType] = None
+    while (lastAvailableMap.isEmpty) {
+      lastAvailableVersion -= 1
+
+      if (lastAvailableVersion <= 0) {
+        // Use an empty map for versions 0 or less.
+        lastAvailableMap = Some(new MapType)
+      } else {
+        lastAvailableMap =
+          synchronized { loadedMaps.get(lastAvailableVersion) }
+            .orElse(readSnapshotFile(lastAvailableVersion))
       }
-      loadedMaps.put(version, mapFromFile)
-      mapFromFile
     }
+
+    // Load all the deltas from the version after the last available one up to the target version.
+    // The last available version is the one with a full snapshot, so it doesn't need deltas.
+    val resultMap = new MapType(lastAvailableMap.get)
+    for (deltaVersion <- lastAvailableVersion + 1 to version) {
+      updateFromDeltaFile(deltaVersion, resultMap)
+    }
+
+    synchronized { loadedMaps.put(version, resultMap) }
+    resultMap
   }
 
   private def writeUpdateToDeltaFile(


