flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shixiaogang <...@git.apache.org>
Subject [GitHub] flink pull request #3801: [FLINK-6364] [checkpoints] Implement incremental c...
Date Thu, 04 May 2017 05:58:00 GMT
Github user shixiaogang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3801#discussion_r114703775
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
    @@ -621,6 +692,237 @@ private static void checkInterrupted() throws InterruptedException
{
     		}
     	}
     
    +	private static class RocksDBIncrementalSnapshotOperation {
    +
    +		private final RocksDBKeyedStateBackend<?> stateBackend;
    +
    +		private final CheckpointStreamFactory checkpointStreamFactory;
    +
    +		private final long checkpointId;
    +
    +		private final long checkpointTimestamp;
    +
    +		private Map<String, StreamStateHandle> baseSstFiles;
    +
    +		private List<KeyedBackendSerializationProxy.StateMetaInfo<?, ?>> stateMetaInfos
= new ArrayList<>();
    +
    +		private FileSystem backupFileSystem;
    +		private Path backupPath;
    +
    +		private FSDataInputStream inputStream = null;
    +		private CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
    +
    +		// new sst files since the last completed checkpoint
    +		private Set<String> newSstFileNames = new HashSet<>();
    +
    +		// handles to the sst files in the current snapshot
    +		private Map<String, StreamStateHandle> sstFiles = new HashMap<>();
    +
    +		// handles to the misc files in the current snapshot
    +		private Map<String, StreamStateHandle> miscFiles = new HashMap<>();
    +
    +		private StreamStateHandle metaStateHandle = null;
    +
    +		private RocksDBIncrementalSnapshotOperation(
    +				RocksDBKeyedStateBackend<?> stateBackend,
    +				CheckpointStreamFactory checkpointStreamFactory,
    +				long checkpointId,
    +				long checkpointTimestamp) {
    +
    +			this.stateBackend = stateBackend;
    +			this.checkpointStreamFactory = checkpointStreamFactory;
    +			this.checkpointId = checkpointId;
    +			this.checkpointTimestamp = checkpointTimestamp;
    +		}
    +
    +		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
    +			try {
    +				final byte[] buffer = new byte[1024];
    +
    +				FileSystem backupFileSystem = backupPath.getFileSystem();
    +				inputStream = backupFileSystem.open(filePath);
    +				stateBackend.cancelStreamRegistry.registerClosable(inputStream);
    +
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				while (true) {
    +					int numBytes = inputStream.read(buffer);
    +
    +					if (numBytes == -1) {
    +						break;
    +					}
    +
    +					outputStream.write(buffer, 0, numBytes);
    +				}
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (inputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +					inputStream.close();
    +					inputStream = null;
    +				}
    +
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		private StreamStateHandle materializeMetaData() throws Exception {
    +			try {
    +				outputStream = checkpointStreamFactory
    +					.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
    +				stateBackend.cancelStreamRegistry.registerClosable(outputStream);
    +
    +				KeyedBackendSerializationProxy serializationProxy =
    +					new KeyedBackendSerializationProxy(stateBackend.keySerializer, stateMetaInfos);
    +				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
    +
    +				serializationProxy.write(out);
    +
    +				return outputStream.closeAndGetHandle();
    +			} finally {
    +				if (outputStream != null) {
    +					stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +					outputStream.close();
    +					outputStream = null;
    +				}
    +			}
    +		}
    +
    +		void takeSnapshot() throws Exception {
    +			// use the last completed checkpoint as the comparison base.
    +			baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
    +
    +			// save meta data
    +			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredBackendStateMetaInfo<?,
?>>> stateMetaInfoEntry : stateBackend.kvStateInformation.entrySet()) {
    +
    +				RegisteredBackendStateMetaInfo<?, ?> metaInfo = stateMetaInfoEntry.getValue().f1;
    +
    +				KeyedBackendSerializationProxy.StateMetaInfo<?, ?> metaInfoProxy =
    +					new KeyedBackendSerializationProxy.StateMetaInfo<>(
    +						metaInfo.getStateType(),
    +						metaInfo.getName(),
    +						metaInfo.getNamespaceSerializer(),
    +						metaInfo.getStateSerializer());
    +
    +				stateMetaInfos.add(metaInfoProxy);
    +			}
    +
    +			// save state data
    +			backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
    +			backupFileSystem = backupPath.getFileSystem();
    +			if (backupFileSystem.exists(backupPath)) {
    +				LOG.warn("Deleting an existing local checkpoint directory " +
    +					backupPath + ".");
    +
    +				backupFileSystem.delete(backupPath, true);
    +			}
    +
    +			// create hard links of living files in the checkpoint path
    +			Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
    +			checkpoint.createCheckpoint(backupPath.getPath());
    +		}
    +
    +		KeyedStateHandle materializeSnapshot() throws Exception {
    +			// write meta data
    +			metaStateHandle = materializeMetaData();
    +
    +			// write state data
    +			Preconditions.checkState(backupFileSystem.exists(backupPath));
    +
    +			FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath);
    +			if (fileStatuses != null ) {
    +				for (FileStatus fileStatus : fileStatuses) {
    +					Path filePath = fileStatus.getPath();
    +					String fileName = filePath.getName();
    +
    +					if (fileName.endsWith(SST_FILE_SUFFIX)) {
    +						StreamStateHandle fileHandle =
    +							baseSstFiles == null ? null : baseSstFiles.get(fileName);
    +
    +						if (fileHandle == null) {
    +							newSstFileNames.add(fileName);
    +							fileHandle = materializeStateData(filePath);
    +						}
    +
    +						sstFiles.put(fileName, fileHandle);
    +					} else {
    +						StreamStateHandle fileHandle = materializeStateData(filePath);
    +						miscFiles.put(fileName, fileHandle);
    +					}
    +				}
    +			}
    +
    +			stateBackend.materializedSstFiles.put(checkpointId, sstFiles);
    +
    +			return new RocksDBKeyedStateHandle(stateBackend.jobId,
    +				stateBackend.operatorIdentifier, stateBackend.keyGroupRange,
    +				newSstFileNames, sstFiles, miscFiles, metaStateHandle);
    +		}
    +
    +		void releaseResources(boolean canceled) {
    +
    +			if (inputStream != null) {
    +				stateBackend.cancelStreamRegistry.unregisterClosable(inputStream);
    +				try {
    +					inputStream.close();
    +				} catch (Exception e) {
    +					LOG.warn("Could not properly close the input stream.", e);
    +				}
    +				inputStream = null;
    +			}
    +
    +			if (outputStream != null) {
    +				stateBackend.cancelStreamRegistry.unregisterClosable(outputStream);
    +				try {
    +					outputStream.close();
    +				} catch (Exception e) {
    +					LOG.warn("Could not properly close the output stream.", e);
    +				}
    +				outputStream = null;
    +			}
    +
    +			if (backupPath != null) {
    +				try {
    +					if (backupFileSystem.exists(backupPath)) {
    +						backupFileSystem.delete(backupPath, true);
    +					}
    +				} catch (Exception e) {
    +					LOG.warn("Could not properly delete the checkpoint directory.", e);
    +				}
    +			}
    +
    +			if (canceled) {
    +				List<StateObject> statesToDiscard = new ArrayList<>();
    +
    +				if (metaStateHandle != null) {
    +					statesToDiscard.add(metaStateHandle);
    +				}
    +
    +				statesToDiscard.addAll(miscFiles.values());
    +
    +				for (String newSstFileName : newSstFileNames) {
    +					StreamStateHandle fileHandle = sstFiles.get(newSstFileName);
    +					if (fileHandle != null) {
    +						statesToDiscard.add(fileHandle);
    +					}
    +				}
    +
    +				try {
    +					StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
    --- End diff --
    
    You are absolutely right. Global cleanup hooks are urgently needed here to clean up unused
states. The hook at the job manager, instead of the shared state registry, is supposed to do
the work because unused private states should be cleaned as well. At startup, the hook
will know the checkpoint from which we are restoring and will only retain the data in restored
completed checkpoints.
    
    The local checkpointing directories will be deleted once the checkpoint is completed at the
TM. Since the local checkpoint directories are all under the backend's directory, which is
deleted when the backend is disposed, they will also be deleted if the backend is correctly
closed.
    
    But in cases where the TM fails during the closing of the backend, the local checkpoint
directories will be left on the file system. This problem does not matter in YARN clusters,
but it may be very severe in standalone clusters. What do you think of this problem?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message