flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6364) Implement incremental checkpointing in RocksDBStateBackend
Date Tue, 02 May 2017 17:33:07 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15993354#comment-15993354 ]

ASF GitHub Bot commented on FLINK-6364:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114365154
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---
    @@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException {
     		}
     	}
     
    +	private static class RocksDBIncrementalRestoreOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend) {
    +			this.stateBackend = stateBackend;
    +		}
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> readMetaData(
    +				StreamStateHandle metaStateHandle) throws Exception {
    +
    +			FSDataInputStream inputStream = null;
    +
    +			try {
    +				inputStream = metaStateHandle.openInputStream();
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
    +				DataInputView in = new DataInputViewStreamWrapper(inputStream);
    +				serializationProxy.read(in);
    +
    +				return serializationProxy.getNamedStateSerializationProxies();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +				}
    +			}
    +		}
    +
    +		private void readStateData(
    +				Path restoreFilePath,
    +				StreamStateHandle remoteFileHandle) throws IOException {
    +
    +			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
    +
    +			FSDataInputStream inputStream = null;
    +			FSDataOutputStream outputStream = null;
    +
    +			try {
    +				inputStream = remoteFileHandle.openInputStream();
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				byte[] buffer = new byte[1024];
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +				}
    +			}
    +		}
    +
    +		private void restoreInstance(
    +				RocksDBKeyedStateHandle restoreStateHandle,
    +				boolean hasExtraKeys) throws Exception {
    +
    +			// read state data
    +			Path restoreInstancePath = new Path(
    +				stateBackend.instanceBasePath.getAbsolutePath(),
    +				UUID.randomUUID().toString());
    +
    +			try {
    +				Map<String, StreamStateHandle> sstFiles = restoreStateHandle.getSstFiles();
    +				for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet()) {
    +					String fileName = sstFileEntry.getKey();
    +					StreamStateHandle remoteFileHandle = sstFileEntry.getValue();
    +
    +					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
    +				}
    +
    +				Map<String, StreamStateHandle> miscFiles = restoreStateHandle.getMiscFiles();
    +				for (Map.Entry<String, StreamStateHandle> miscFileEntry : miscFiles.entrySet()) {
    +					String fileName = miscFileEntry.getKey();
    +					StreamStateHandle remoteFileHandle = miscFileEntry.getValue();
    +
    +					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
    +				}
    +
    +				// read meta data
    +				List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoProxies =
    +					readMetaData(restoreStateHandle.getMetaStateHandle());
    +
    +				List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
    +				columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET)));
    +
    +				for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy : stateMetaInfoProxies) {
    +
    +					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
    +						stateMetaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
    +						stateBackend.columnOptions);
    +
    +					columnFamilyDescriptors.add(columnFamilyDescriptor);
    +				}
    +
    +				if (hasExtraKeys) {
    +
    +					List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
    +
    +					RocksDB restoreDb = RocksDB.open(
    +						stateBackend.dbOptions,
    +						restoreInstancePath.getPath(),
    +						columnFamilyDescriptors,
    +						columnFamilyHandles);
    +
    +					for (int i = 1; i < columnFamilyHandles.size(); ++i) {
    +						ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
    +						ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i);
    +						KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy = stateMetaInfoProxies.get(i - 1);
    +
    +						Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
    +							stateBackend.kvStateInformation.get(stateMetaInfoProxy.getStateName());
    +
    +						if (null == registeredStateMetaInfoEntry) {
    +
    +							RegisteredBackendStateMetaInfo<?, ?> stateMetaInfo = new RegisteredBackendStateMetaInfo<>(stateMetaInfoProxy);
    +
    +							registeredStateMetaInfoEntry =
    +								new Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?, ?>>(
    +									stateBackend.db.createColumnFamily(columnFamilyDescriptor),
    +									stateMetaInfo);
    +
    +							stateBackend.kvStateInformation.put(stateMetaInfoProxy.getStateName(), registeredStateMetaInfoEntry);
    +						}
    +
    +						ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
    +
    +						try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) {
    +
    +							iterator.seekToFirst();
    --- End diff ---
    
    Instead of `seekToFirst`, can we not seek to the first key-group in the backend's range (via `seek(keygroupPrefixBytes)`) to potentially save some entries?
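A rough sketch of the suggested optimization, for illustration only (the helper names and the big-endian prefix layout below are assumptions, not necessarily what the PR uses): rather than scanning the restore instance from its very first entry, the iterator is positioned at the first key group owned by this backend, and the copy loop stops as soon as the serialized key-group prefix leaves the backend's range.

    import org.rocksdb.ColumnFamilyHandle;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;
    import org.rocksdb.RocksIterator;

    /**
     * Sketch: seek to the first owned key group instead of calling seekToFirst(),
     * and stop once the key-group prefix leaves the backend's range. The prefix
     * encoding here is an assumed layout, chosen only for illustration.
     */
    class KeyGroupRangeRestoreSketch {

        static void copyOwnedKeyGroups(
                RocksDB restoreDb,
                ColumnFamilyHandle sourceColumnFamily,
                RocksDB targetDb,
                ColumnFamilyHandle targetColumnFamily,
                int startKeyGroup,
                int endKeyGroup,
                int keyGroupPrefixBytes) throws RocksDBException {

            try (RocksIterator iterator = restoreDb.newIterator(sourceColumnFamily)) {

                // Position the iterator at the first entry of the first owned key group,
                // skipping everything below the backend's range.
                iterator.seek(keyGroupPrefix(startKeyGroup, keyGroupPrefixBytes));

                while (iterator.isValid()) {
                    // Keys are ordered by their key-group prefix, so we can stop early.
                    if (keyGroupOf(iterator.key(), keyGroupPrefixBytes) > endKeyGroup) {
                        break;
                    }
                    targetDb.put(targetColumnFamily, iterator.key(), iterator.value());
                    iterator.next();
                }
            }
        }

        // Serializes a key group as a big-endian prefix of the given width (assumed layout).
        static byte[] keyGroupPrefix(int keyGroup, int keyGroupPrefixBytes) {
            byte[] prefix = new byte[keyGroupPrefixBytes];
            for (int i = 0; i < keyGroupPrefixBytes; i++) {
                prefix[i] = (byte) (keyGroup >>> (8 * (keyGroupPrefixBytes - i - 1)));
            }
            return prefix;
        }

        // Reads the key group back out of a serialized key (assumed layout).
        static int keyGroupOf(byte[] key, int keyGroupPrefixBytes) {
            int keyGroup = 0;
            for (int i = 0; i < keyGroupPrefixBytes; i++) {
                keyGroup = (keyGroup << 8) | (key[i] & 0xFF);
            }
            return keyGroup;
        }
    }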


> Implement incremental checkpointing in RocksDBStateBackend
> ----------------------------------------------------------
>
>                 Key: FLINK-6364
>                 URL: https://issues.apache.org/jira/browse/FLINK-6364
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> {{RocksDBStateBackend}} is well suited for incremental checkpointing because RocksDB
> is based on LSM trees, which record updates in new sst files and all sst files are
> immutable. By only materializing those new sst files, we can significantly improve the
> performance of checkpointing.
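
As a minimal illustration of that idea (class and method names here are made up for the sketch, not the PR's actual API): a backend that remembers which sst files were shipped by earlier completed checkpoints only needs to upload files that appeared since then, and can reference the already-uploaded ones instead of re-writing them.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    /** Illustrative sketch of an incremental snapshot over immutable sst files. */
    class IncrementalSnapshotSketch {

        /** Handles for sst files already materialized by earlier checkpoints. */
        private final Map<String, StateHandle> uploadedSstFiles = new HashMap<>();

        Map<String, StateHandle> snapshot(Set<String> currentSstFiles) {
            Map<String, StateHandle> checkpointFiles = new HashMap<>();

            for (String fileName : currentSstFiles) {
                StateHandle handle = uploadedSstFiles.get(fileName);
                if (handle == null) {
                    // New since the last checkpoint: the only data that must be shipped.
                    handle = upload(fileName);
                    uploadedSstFiles.put(fileName, handle);
                }
                // Files from earlier checkpoints are referenced, not re-uploaded.
                checkpointFiles.put(fileName, handle);
            }

            // Drop bookkeeping for files that RocksDB compaction has removed.
            uploadedSstFiles.keySet().retainAll(currentSstFiles);
            return checkpointFiles;
        }

        private StateHandle upload(String fileName) {
            // Placeholder for copying the local sst file to checkpoint storage.
            return new StateHandle(fileName);
        }

        /** Stand-in for a pointer to an uploaded file in durable storage. */
        static class StateHandle {
            final String location;
            StateHandle(String location) { this.location = location; }
        }
    }

The restore path quoted in the diff above is the mirror image of this: the referenced sst and misc files are fetched back into a local directory before the RocksDB instance is reopened.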



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
