flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From StefanRRichter <...@git.apache.org>
Subject [GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery
Date Wed, 14 Feb 2018 14:12:44 GMT
GitHub user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5239#discussion_r168184825
  
    --- Diff: flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
    @@ -363,1691 +372,1780 @@ public int getKeyGroupPrefixBytes() {
     		final CheckpointStreamFactory streamFactory,
     		CheckpointOptions checkpointOptions) throws Exception {
     
    -		if (checkpointOptions.getCheckpointType() != CheckpointOptions.CheckpointType.SAVEPOINT
&&
    -			enableIncrementalCheckpointing) {
    -			return snapshotIncrementally(checkpointId, timestamp, streamFactory);
    -		} else {
    -			return snapshotFully(checkpointId, timestamp, streamFactory);
    -		}
    +		return snapshotStrategy.performSnapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
     	}
     
    -	private RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotIncrementally(
    -		final long checkpointId,
    -		final long checkpointTimestamp,
    -		final CheckpointStreamFactory checkpointStreamFactory) throws Exception {
    -
    -		if (db == null) {
    -			throw new IOException("RocksDB closed.");
    -		}
    +	@Override
    +	public void restore(StateObjectCollection<KeyedStateHandle> restoreState) throws
Exception {
    +		LOG.info("Initializing RocksDB keyed state backend from snapshot.");
     
    -		if (kvStateInformation.isEmpty()) {
    -			if (LOG.isDebugEnabled()) {
    -				LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " +
    -					checkpointTimestamp + " . Returning null.");
    -			}
    -			return DoneFuture.nullValue();
    +		if (LOG.isDebugEnabled()) {
    +			LOG.debug("Restoring snapshot from state handles: {}.", restoreState);
     		}
     
    -		final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
    -			new RocksDBIncrementalSnapshotOperation<>(
    -				this,
    -				checkpointStreamFactory,
    -				checkpointId,
    -				checkpointTimestamp);
    -
    -		snapshotOperation.takeSnapshot();
    -
    -		return new FutureTask<SnapshotResult<KeyedStateHandle>>(
    -			() -> snapshotOperation.materializeSnapshot()
    -		) {
    -			@Override
    -			public boolean cancel(boolean mayInterruptIfRunning) {
    -				snapshotOperation.stop();
    -				return super.cancel(mayInterruptIfRunning);
    -			}
    +		// clear all meta data
    +		kvStateInformation.clear();
    +		restoredKvStateMetaInfos.clear();
     
    -			@Override
    -			protected void done() {
    -				snapshotOperation.releaseResources(isCancelled());
    +		try {
    +			if (restoreState == null || restoreState.isEmpty()) {
    +				createDB();
    +			} else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle)
{
    +				RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
    +				restoreOperation.restore(restoreState);
    +			} else {
    +				RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
    +				restoreOperation.doRestore(restoreState);
     			}
    -		};
    +		} catch (Exception ex) {
    +			dispose();
    +			throw ex;
    +		}
     	}
     
    -	private RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotFully(
    -		final long checkpointId,
    -		final long timestamp,
    -		final CheckpointStreamFactory streamFactory) throws Exception {
    -
    -		long startTime = System.currentTimeMillis();
    -		final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
    -
    -		final RocksDBFullSnapshotOperation<K> snapshotOperation;
    -
    -		if (kvStateInformation.isEmpty()) {
    -			if (LOG.isDebugEnabled()) {
    -				LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp
+
    -					" . Returning null.");
    -			}
    +	@Override
    +	public void notifyCheckpointComplete(long completedCheckpointId) {
     
    -			return DoneFuture.nullValue();
    +		if (!enableIncrementalCheckpointing) {
    +			return;
     		}
     
    -		snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry);
    -		snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
    -
    -		// implementation of the async IO operation, based on FutureTask
    -		AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>> ioCallable
=
    -			new AbstractAsyncCallableWithResources<SnapshotResult<KeyedStateHandle>>()
{
    +		synchronized (materializedSstFiles) {
     
    -				@Override
    -				protected void acquireResources() throws Exception {
    -					cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
    -					snapshotOperation.openCheckpointStream();
    -				}
    +			if (completedCheckpointId < lastCompletedCheckpointId) {
    +				return;
    +			}
     
    -				@Override
    -				protected void releaseResources() throws Exception {
    -					closeLocalRegistry();
    -					releaseSnapshotOperationResources();
    -				}
    +			materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);
     
    -				private void releaseSnapshotOperationResources() {
    -					// hold the db lock while operation on the db to guard us against async db disposal
    -					snapshotOperation.releaseSnapshotResources();
    -				}
    +			lastCompletedCheckpointId = completedCheckpointId;
    +		}
    +	}
     
    -				@Override
    -				protected void stopOperation() throws Exception {
    -					closeLocalRegistry();
    -				}
    +	private void createDB() throws IOException {
    +		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
    +		this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
    +		this.defaultColumnFamily = columnFamilyHandles.get(0);
    +	}
     
    -				private void closeLocalRegistry() {
    -					if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
    -						try {
    -							snapshotCloseableRegistry.close();
    -						} catch (Exception ex) {
    -							LOG.warn("Error closing local registry", ex);
    -						}
    -					}
    -				}
    +	private RocksDB openDB(
    +		String path,
    +		List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
    +		List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {
     
    -				@Override
    -				public SnapshotResult<KeyedStateHandle> performOperation() throws Exception
{
    -					long startTime = System.currentTimeMillis();
    +		List<ColumnFamilyDescriptor> columnFamilyDescriptors =
    +			new ArrayList<>(1 + stateColumnFamilyDescriptors.size());
     
    -					if (isStopped()) {
    -						throw new IOException("RocksDB closed.");
    -					}
    +		columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
     
    -					snapshotOperation.writeDBSnapshot();
    +		// we add the required descriptor for the default CF in last position.
    +		columnFamilyDescriptors.add(new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES,
columnOptions));
     
    -					LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took
{} ms.",
    -						streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
    +		RocksDB dbRef;
     
    -					KeyGroupsStateHandle stateHandle = snapshotOperation.getSnapshotResultStateHandle();
    -					return new SnapshotResult<>(stateHandle, null);
    -				}
    -			};
    +		try {
    +			dbRef = RocksDB.open(
    +				Preconditions.checkNotNull(dbOptions),
    +				Preconditions.checkNotNull(path),
    +				columnFamilyDescriptors,
    +				stateColumnFamilyHandles);
    +		} catch (RocksDBException e) {
    +			throw new IOException("Error while opening RocksDB instance.", e);
    +		}
     
    -		LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", synchronous part) in
thread " +
    -			Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms.");
    +		// requested + default CF
    +		Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
    +			"Not all requested column family handles have been created");
     
    -		return AsyncStoppableTaskWithCallback.from(ioCallable);
    +		return dbRef;
     	}
     
     	/**
    -	 * Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend.
    +	 * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a full snapshot.
     	 */
    -	static final class RocksDBFullSnapshotOperation<K> {
    +	private static final class RocksDBFullRestoreOperation<K> {
     
    -		static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
    -		static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
    -
    -		private final RocksDBKeyedStateBackend<K> stateBackend;
    -		private final KeyGroupRangeOffsets keyGroupRangeOffsets;
    -		private final CheckpointStreamFactory checkpointStreamFactory;
    -		private final CloseableRegistry snapshotCloseableRegistry;
    -		private final ResourceGuard.Lease dbLease;
    -
    -		private long checkpointId;
    -		private long checkpointTimeStamp;
    -
    -		private Snapshot snapshot;
    -		private ReadOptions readOptions;
    -		private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
    -
    -		private CheckpointStreamFactory.CheckpointStateOutputStream outStream;
    -		private DataOutputView outputView;
    -
    -		RocksDBFullSnapshotOperation(
    -			RocksDBKeyedStateBackend<K> stateBackend,
    -			CheckpointStreamFactory checkpointStreamFactory,
    -			CloseableRegistry registry) throws IOException {
    +		private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;
     
    -			this.stateBackend = stateBackend;
    -			this.checkpointStreamFactory = checkpointStreamFactory;
    -			this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange);
    -			this.snapshotCloseableRegistry = registry;
    -			this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource();
    -		}
    +		/** Current key-groups state handle from which we restore key-groups. */
    +		private KeyGroupsStateHandle currentKeyGroupsStateHandle;
    +		/** Current input stream we obtained from currentKeyGroupsStateHandle. */
    +		private FSDataInputStream currentStateHandleInStream;
    +		/** Current data input view that wraps currentStateHandleInStream. */
    +		private DataInputView currentStateHandleInView;
    +		/** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle.
*/
    +		private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
    +		/** The compression decorator that was used for writing the state, as determined by
the meta data. */
    +		private StreamCompressionDecorator keygroupStreamCompressionDecorator;
     
     		/**
    -		 * 1) Create a snapshot object from RocksDB.
    +		 * Creates a restore operation object for the given state backend instance.
     		 *
    -		 * @param checkpointId id of the checkpoint for which we take the snapshot
    -		 * @param checkpointTimeStamp timestamp of the checkpoint for which we take the snapshot
    +		 * @param rocksDBKeyedStateBackend the state backend into which we restore
     		 */
    -		public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) {
    -			Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");
    -			this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size());
    -			this.checkpointId = checkpointId;
    -			this.checkpointTimeStamp = checkpointTimeStamp;
    -			this.snapshot = stateBackend.db.getSnapshot();
    +		public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend)
{
    +			this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend);
     		}
     
     		/**
    -		 * 2) Open CheckpointStateOutputStream through the checkpointStreamFactory into which
we will write.
    +		 * Restores all key-groups data that is referenced by the passed state handles.
     		 *
    -		 * @throws Exception
    +		 * @param keyedStateHandles List of all key groups state handles that shall be restored.
     		 */
    -		public void openCheckpointStream() throws Exception {
    -			Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already
set.");
    -			outStream = checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId,
checkpointTimeStamp);
    -			snapshotCloseableRegistry.registerCloseable(outStream);
    -			outputView = new DataOutputViewStreamWrapper(outStream);
    -		}
    +		public void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
    +			throws IOException, StateMigrationException, RocksDBException {
     
    -		/**
    -		 * 3) Write the actual data from RocksDB from the time we took the snapshot object
in (1).
    -		 *
    -		 * @throws IOException
    -		 */
    -		public void writeDBSnapshot() throws IOException, InterruptedException {
    +			rocksDBKeyedStateBackend.createDB();
     
    -			if (null == snapshot) {
    -				throw new IOException("No snapshot available. Might be released due to cancellation.");
    -			}
    +			for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
    +				if (keyedStateHandle != null) {
     
    -			Preconditions.checkNotNull(outStream, "No output stream to write snapshot.");
    -			writeKVStateMetaData();
    -			writeKVStateData();
    +					if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
    +						throw new IllegalStateException("Unexpected state handle type, " +
    +							"expected: " + KeyGroupsStateHandle.class +
    +							", but found: " + keyedStateHandle.getClass());
    +					}
    +					this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
    +					restoreKeyGroupsInStateHandle();
    +				}
    +			}
     		}
     
     		/**
    -		 * 4) Returns a state handle to the snapshot after the snapshot procedure is completed
and null before.
    -		 *
    -		 * @return state handle to the completed snapshot
    +		 * Restore one key groups state handle.
     		 */
    -		public KeyGroupsStateHandle getSnapshotResultStateHandle() throws IOException {
    -
    -			if (snapshotCloseableRegistry.unregisterCloseable(outStream)) {
    -
    -				StreamStateHandle stateHandle = outStream.closeAndGetHandle();
    -				outStream = null;
    -
    -				if (stateHandle != null) {
    -					return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
    +		private void restoreKeyGroupsInStateHandle()
    +			throws IOException, StateMigrationException, RocksDBException {
    +			try {
    +				currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
    +				rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
    +				currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
    +				restoreKVStateMetaData();
    +				restoreKVStateData();
    +			} finally {
    +				if (rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream))
{
    +					IOUtils.closeQuietly(currentStateHandleInStream);
     				}
     			}
    -			return null;
     		}
     
     		/**
    -		 * 5) Release the snapshot object for RocksDB and clean up.
    +		 * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the
current state handle.
    +		 *
    +		 * @throws IOException
    +		 * @throws ClassNotFoundException
    +		 * @throws RocksDBException
     		 */
    -		public void releaseSnapshotResources() {
    +		private void restoreKVStateMetaData() throws IOException, StateMigrationException,
RocksDBException {
     
    -			outStream = null;
    +			KeyedBackendSerializationProxy<K> serializationProxy =
    +				new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
     
    -			if (null != kvStateIterators) {
    -				for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) {
    -					IOUtils.closeQuietly(kvStateIterator.f0);
    -				}
    -				kvStateIterators = null;
    -			}
    +			serializationProxy.read(currentStateHandleInView);
     
    -			if (null != snapshot) {
    -				if (null != stateBackend.db) {
    -					stateBackend.db.releaseSnapshot(snapshot);
    -				}
    -				IOUtils.closeQuietly(snapshot);
    -				snapshot = null;
    -			}
    +			// check for key serializer compatibility; this also reconfigures the
    +			// key serializer to be compatible, if it is required and is possible
    +			if (CompatibilityUtil.resolveCompatibilityResult(
    +				serializationProxy.getKeySerializer(),
    +				UnloadableDummyTypeSerializer.class,
    +				serializationProxy.getKeySerializerConfigSnapshot(),
    +				rocksDBKeyedStateBackend.keySerializer)
    +				.isRequiresMigration()) {
     
    -			if (null != readOptions) {
    -				IOUtils.closeQuietly(readOptions);
    -				readOptions = null;
    +				// TODO replace with state migration; note that key hash codes need to remain the
same after migration
    +				throw new StateMigrationException("The new key serializer is not compatible to read
previous keys. " +
    +					"Aborting now since state migration is currently not available");
     			}
     
    -			this.dbLease.close();
    -		}
    -
    -		private void writeKVStateMetaData() throws IOException {
    +			this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression()
?
    +				SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
     
    -			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots
=
    -				new ArrayList<>(stateBackend.kvStateInformation.size());
    +			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos
=
    +				serializationProxy.getStateMetaInfoSnapshots();
    +			currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
    +			//rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size());
     
    -			int kvStateId = 0;
    -			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?,
?>>> column :
    -				stateBackend.kvStateInformation.entrySet()) {
    +			for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos)
{
     
    -				metaInfoSnapshots.add(column.getValue().f1.snapshot());
    +				Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>
registeredColumn =
    +					rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
     
    -				//retrieve iterator for this k/v states
    -				readOptions = new ReadOptions();
    -				readOptions.setSnapshot(snapshot);
    +				if (registeredColumn == null) {
    +					byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
     
    -				kvStateIterators.add(
    -					new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions),
kvStateId));
    +					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
    +						nameBytes,
    +						rocksDBKeyedStateBackend.columnOptions);
     
    -				++kvStateId;
    -			}
    +					RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
    +						new RegisteredKeyedBackendStateMetaInfo<>(
    +							restoredMetaInfo.getStateType(),
    +							restoredMetaInfo.getName(),
    +							restoredMetaInfo.getNamespaceSerializer(),
    +							restoredMetaInfo.getStateSerializer());
     
    -			KeyedBackendSerializationProxy<K> serializationProxy =
    -				new KeyedBackendSerializationProxy<>(
    -					stateBackend.getKeySerializer(),
    -					metaInfoSnapshots,
    -					!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, stateBackend.keyGroupCompressionDecorator));
    +					rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(),
restoredMetaInfo);
     
    -			serializationProxy.write(outputView);
    -		}
    +					ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
     
    -		private void writeKVStateData() throws IOException, InterruptedException {
    +					registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo);
    +					rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn);
     
    -			byte[] previousKey = null;
    -			byte[] previousValue = null;
    -			OutputStream kgOutStream = null;
    -			DataOutputView kgOutView = null;
    +				} else {
    +					// TODO with eager state registration in place, check here for serializer migration
strategies
    +				}
    +				currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
    +			}
    +		}
     
    -			try {
    -				// Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
    -				try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
    -					kvStateIterators, stateBackend.keyGroupPrefixBytes)) {
    +		/**
    +		 * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current
state handle.
    +		 *
    +		 * @throws IOException
    +		 * @throws RocksDBException
    +		 */
    +		private void restoreKVStateData() throws IOException, RocksDBException {
    +			//for all key-groups in the current state handle...
    +			for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets())
{
    +				int keyGroup = keyGroupOffset.f0;
     
    -					// handover complete, null out to prevent double close
    -					kvStateIterators = null;
    +				// Check that restored key groups all belong to the backend
    +				Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
    +					"The key group must belong to the backend");
     
    -					//preamble: setup with first key-group as our lookahead
    -					if (mergeIterator.isValid()) {
    -						//begin first key-group by recording the offset
    -						keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
    -						//write the k/v-state id as metadata
    -						kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
    -						kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
    +				long offset = keyGroupOffset.f1;
    +				//not empty key-group?
    +				if (0L != offset) {
    +					currentStateHandleInStream.seek(offset);
    +					try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream))
{
    +						DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
     						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
    -						kgOutView.writeShort(mergeIterator.kvStateId());
    -						previousKey = mergeIterator.key();
    -						previousValue = mergeIterator.value();
    -						mergeIterator.next();
    +						int kvStateId = compressedKgInputView.readShort();
    +						ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
    +						//insert all k/v pairs into DB
    +						boolean keyGroupHasMoreKeys = true;
    +						while (keyGroupHasMoreKeys) {
    +							byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
    +							byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
    +							if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
    +								//clear the signal bit in the key to make it ready for insertion again
    +								RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
    +								rocksDBKeyedStateBackend.db.put(handle, key, value);
    +								//TODO this could be aware of keyGroupPrefixBytes and write only one byte if
possible
    +								kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
    +									& compressedKgInputView.readShort();
    +								if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
    +									keyGroupHasMoreKeys = false;
    +								} else {
    +									handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
    +								}
    +							} else {
    +								rocksDBKeyedStateBackend.db.put(handle, key, value);
    +							}
    +						}
     					}
    +				}
    +			}
    +		}
    +	}
     
    -					//main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking
key-group offsets.
    -					while (mergeIterator.isValid()) {
    +	/**
    +	 * Encapsulates the process of restoring a RocksDBKeyedStateBackend from an incremental
snapshot.
    +	 */
    +	private static class RocksDBIncrementalRestoreOperation<T> {
    --- End diff --
    
    In general, I think that makes sense by now, but I would make this change in a separate PR.


---

Mime
View raw message