flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [2/3] flink git commit: [FLINK-8699][state] Deep copy state info to avoid potential concurrency problem in full checkpoint.
Date Fri, 06 Apr 2018 10:34:25 GMT
[FLINK-8699][state] Deep copy state info to avoid potential concurrency problem in full checkpoint.

(cherry picked from commit 21cf59d)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cbad9cf3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cbad9cf3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cbad9cf3

Branch: refs/heads/release-1.5
Commit: cbad9cf34095ef0d709df9b1521387d8fea38f4a
Parents: 340ee26
Author: sihuazhou <summerleafs@163.com>
Authored: Fri Mar 16 16:07:54 2018 +0100
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Fri Apr 6 12:33:41 2018 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 36 +++++++++++++-------
 1 file changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cbad9cf3/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index cdeb608..31b9d99 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -1836,7 +1836,13 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		private Snapshot snapshot;
 		private ReadOptions readOptions;
-		private List<Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformationCopy;
+
+		/** The state meta data. */
+		private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots;
+
+		/** The copied column handle. */
+		private List<ColumnFamilyHandle> copiedColumnFamilyHandles;
+
 		private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
 
 		private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider;
@@ -1860,7 +1866,19 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 */
 		public void takeDBSnapShot() {
 			Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");
-			this.kvStateInformationCopy = new ArrayList<>(stateBackend.kvStateInformation.values());
+
+			this.stateMetaInfoSnapshots = new ArrayList<>(stateBackend.kvStateInformation.size());
+
+			this.copiedColumnFamilyHandles = new ArrayList<>(stateBackend.kvStateInformation.size());
+
+			for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> tuple2 :
+				stateBackend.kvStateInformation.values()) {
+				// snapshot meta info
+				this.stateMetaInfoSnapshots.add(tuple2.f1.snapshot());
+
+				// copy column family handle
+				this.copiedColumnFamilyHandles.add(tuple2.f0);
+			}
 			this.snapshot = stateBackend.db.getSnapshot();
 		}
 
@@ -1946,10 +1964,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		private void writeKVStateMetaData() throws IOException {
 
-			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots =
-				new ArrayList<>(kvStateInformationCopy.size());
-
-			this.kvStateIterators = new ArrayList<>(kvStateInformationCopy.size());
+			this.kvStateIterators = new ArrayList<>(copiedColumnFamilyHandles.size());
 
 			int kvStateId = 0;
 
@@ -1957,13 +1972,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			readOptions = new ReadOptions();
 			readOptions.setSnapshot(snapshot);
 
-			for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column :
-				kvStateInformationCopy) {
-
-				metaInfoSnapshots.add(column.f1.snapshot());
+			for (ColumnFamilyHandle columnFamilyHandle : copiedColumnFamilyHandles) {
 
 				kvStateIterators.add(
-					new Tuple2<>(stateBackend.db.newIterator(column.f0, readOptions), kvStateId));
+					new Tuple2<>(stateBackend.db.newIterator(columnFamilyHandle, readOptions), kvStateId));
 
 				++kvStateId;
 			}
@@ -1971,7 +1983,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			KeyedBackendSerializationProxy<K> serializationProxy =
 				new KeyedBackendSerializationProxy<>(
 					stateBackend.getKeySerializer(),
-					metaInfoSnapshots,
+					stateMetaInfoSnapshots,
 					!Objects.equals(
 						UncompressedStreamCompressionDecorator.INSTANCE,
 						stateBackend.keyGroupCompressionDecorator));


Mime
View raw message