flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From StefanRRichter <...@git.apache.org>
Subject [GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Date Tue, 02 May 2017 17:32:58 GMT
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114364811
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
    @@ -808,6 +1143,240 @@ private void restoreKVStateData() throws IOException, RocksDBException
{
     		}
     	}
     
    +	private static class RocksDBIncrementalRestoreOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<?> stateBackend)
{
    +			this.stateBackend = stateBackend;
    +		}
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> readMetaData(
    +				StreamStateHandle metaStateHandle) throws Exception {
    +
    +			FSDataInputStream inputStream = null;
    +
    +			try {
    +				inputStream = metaStateHandle.openInputStream();
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.userCodeClassLoader);
    +				DataInputView in = new DataInputViewStreamWrapper(inputStream);
    +				serializationProxy.read(in);
    +
    +				return serializationProxy.getNamedStateSerializationProxies();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +				}
    +			}
    +		}
    +
    +		private void readStateData(
    +				Path restoreFilePath,
    +				StreamStateHandle remoteFileHandle) throws IOException {
    +
    +			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
    +
    +			FSDataInputStream inputStream = null;
    +			FSDataOutputStream outputStream = null;
    +
    +			try {
    +				inputStream = remoteFileHandle.openInputStream();
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				byte[] buffer = new byte[1024];
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +				}
    +			}
    +		}
    +
    +		private void restoreInstance(
    +				RocksDBKeyedStateHandle restoreStateHandle,
    +				boolean hasExtraKeys) throws Exception {
    +
    +			// read state data
    +			Path restoreInstancePath = new Path(
    +				stateBackend.instanceBasePath.getAbsolutePath(),
    +				UUID.randomUUID().toString());
    +
    +			try {
    +				Map<String, StreamStateHandle> sstFiles = restoreStateHandle.getSstFiles();
    +				for (Map.Entry<String, StreamStateHandle> sstFileEntry : sstFiles.entrySet())
{
    +					String fileName = sstFileEntry.getKey();
    +					StreamStateHandle remoteFileHandle = sstFileEntry.getValue();
    +
    +					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
    +				}
    +
    +				Map<String, StreamStateHandle> miscFiles = restoreStateHandle.getMiscFiles();
    +				for (Map.Entry<String, StreamStateHandle> miscFileEntry : miscFiles.entrySet())
{
    +					String fileName = miscFileEntry.getKey();
    +					StreamStateHandle remoteFileHandle = miscFileEntry.getValue();
    +
    +					readStateData(new Path(restoreInstancePath, fileName), remoteFileHandle);
    +				}
    +
    +				// read meta data
    +				List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfoProxies
=
    +					readMetaData(restoreStateHandle.getMetaStateHandle());
    +
    +				List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
    +				columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes(ConfigConstants.DEFAULT_CHARSET)));
    +
    +				for (KeyedBackendSerializationProxy.StateMetaInfo<?, ?> stateMetaInfoProxy
: stateMetaInfoProxies) {
    +
    +					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
    +						stateMetaInfoProxy.getStateName().getBytes(ConfigConstants.DEFAULT_CHARSET),
    +						stateBackend.columnOptions);
    +
    +					columnFamilyDescriptors.add(columnFamilyDescriptor);
    +				}
    +
    +				if (hasExtraKeys) {
    --- End diff --
    
    I wonder if we could prune key-groups based on this: https://github.com/facebook/rocksdb/wiki/Delete-A-Range-Of-Keys.
    
    If not, would it make sense to bulk the inserts using the multi-put feature?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message