flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [5/5] flink git commit: [FLINK-5283] Fix closing streams when restoring old savepoint in keyed backends
Date Wed, 14 Dec 2016 16:58:04 GMT
[FLINK-5283] Fix closing streams when restoring old savepoint in keyed backends


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35f4ea78
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35f4ea78
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35f4ea78

Branch: refs/heads/master
Commit: 35f4ea787c55eceede5154fc1ff23c70cdc522b4
Parents: bf2874e
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Wed Dec 7 21:23:35 2016 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Wed Dec 14 17:50:51 2016 +0100

----------------------------------------------------------------------
 .../contrib/streaming/state/RocksDBKeyedStateBackend.java    | 8 +++++---
 .../flink/runtime/state/heap/HeapKeyedStateBackend.java      | 6 ++++--
 2 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35f4ea78/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 8637f6b..5fef5e5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1090,8 +1090,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K>
{
 		}
 
 		Preconditions.checkState(1 == restoreState.size(), "Only one element expected here.");
-		HashMap<String, RocksDBStateBackend.FinalFullyAsyncSnapshot> namedStates =
-				InstantiationUtil.deserializeObject(restoreState.iterator().next().openInputStream(),
userCodeClassLoader);
+		HashMap<String, RocksDBStateBackend.FinalFullyAsyncSnapshot> namedStates;
+		try (FSDataInputStream inputStream = restoreState.iterator().next().openInputStream())
{
+			namedStates = InstantiationUtil.deserializeObject(inputStream, userCodeClassLoader);
+		}
 
 		Preconditions.checkState(1 == namedStates.size(), "Only one element expected here.");
 		DataInputView inputView = namedStates.values().iterator().next().stateHandle.getState(userCodeClassLoader);
@@ -1101,7 +1103,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K>
{
 
 		// first get the column family mapping
 		int numColumns = inputView.readInt();
-		Map<Byte, StateDescriptor> columnFamilyMapping = new HashMap<>(numColumns);
+		Map<Byte, StateDescriptor<?, ?>> columnFamilyMapping = new HashMap<>(numColumns);
 		for (int i = 0; i < numColumns; i++) {
 			byte mappingByte = inputView.readByte();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/35f4ea78/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index aab2ee5..6e85b72 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -378,8 +378,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K>
{
 
 		Preconditions.checkState(1 == stateHandles.size(), "Only one element expected here.");
 
-		HashMap<String, KvStateSnapshot<K, ?, ?, ?>> namedStates =
-				InstantiationUtil.deserializeObject(stateHandles.iterator().next().openInputStream(),
userCodeClassLoader);
+		HashMap<String, KvStateSnapshot<K, ?, ?, ?>> namedStates;
+		try (FSDataInputStream inputStream = stateHandles.iterator().next().openInputStream())
{
+			namedStates = InstantiationUtil.deserializeObject(inputStream, userCodeClassLoader);
+		}
 
 		for (Map.Entry<String, KvStateSnapshot<K, ?, ?, ?>> nameToState : namedStates.entrySet())
{
 


Mime
View raw message