flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From StefanRRichter <...@git.apache.org>
Subject [GitHub] flink pull request #5582: [FLINK-8790][State] Improve performance for recove...
Date Fri, 01 Jun 2018 08:34:43 GMT
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5582#discussion_r192330418
  
    --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
---
    @@ -795,20 +806,241 @@ private void restoreInstance(
     		}
     
     		/**
    -		 * Recovery from local incremental state.
    +		 * Recovery from multi incremental states.
    +		 * In case of rescaling, this method creates a temporary RocksDB instance for a key-groups
shard. All contents
    +		 * from the temporary instance are copied into the real restore instance and then the
temporary instance is
    +		 * discarded.
     		 */
    -		private void restoreInstance(IncrementalLocalKeyedStateHandle localKeyedStateHandle)
throws Exception {
    +		void restoreFromMultiHandles(Collection<KeyedStateHandle> restoreStateHandles)
throws Exception {
    +
    +			KeyGroupRange targetKeyGroupRange = stateBackend.keyGroupRange;
    +
    +			chooseTheBestStateHandleToInit(restoreStateHandles, targetKeyGroupRange);
    +
    +			int targetStartKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
    +			byte[] targetStartKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
    +			for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
    +				targetStartKeyGroupPrefixBytes[j] = (byte) (targetStartKeyGroup >>> ((stateBackend.keyGroupPrefixBytes
- j - 1) * Byte.SIZE));
    +			}
    +
    +			for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
    +
    +				if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) {
    +					throw new IllegalStateException("Unexpected state handle type, " +
    +						"expected " + IncrementalKeyedStateHandle.class +
    +						", but found " + rawStateHandle.getClass());
    +				}
    +
    +				Path temporaryRestoreInstancePath = new Path(stateBackend.instanceBasePath.getAbsolutePath()
+ UUID.randomUUID().toString());
    +				try (RestoredDBInstance tmpRestoreDBInfo = restoreDBFromStateHandle(
    +						(IncrementalKeyedStateHandle) rawStateHandle,
    +						temporaryRestoreInstancePath,
    +						targetKeyGroupRange,
    +						stateBackend.keyGroupPrefixBytes,
    +						false);
    +					RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db))
{
    +
    +					List<ColumnFamilyDescriptor> tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors;
    +					List<ColumnFamilyHandle> tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles;
    +
    +					int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
    +					byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
    +					for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
    +						startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes
- j - 1) * Byte.SIZE));
    +					}
    +
    +					// iterating only the requested descriptors automatically skips the default column
family handle
    +					for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) {
    +						ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i);
    +						ColumnFamilyDescriptor tmpColumnFamilyDescriptor = tmpColumnFamilyDescriptors.get(i);
    +
    +						ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterColumnFamilyHandle(
    +							tmpColumnFamilyDescriptor, null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i));
    +
    +						try (RocksIteratorWrapper iterator = getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle))
{
    +
    +							iterator.seek(startKeyGroupPrefixBytes);
    +
    +							while (iterator.isValid()) {
    +
    +								int keyGroup = 0;
    +								for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
    +									keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
    +								}
    +
    +								if (stateBackend.keyGroupRange.contains(keyGroup)) {
    +									writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value());
    +								} else {
    +									// Since the iterator will visit the record according to the sorted order,
    +									// we can just break here.
    +									break;
    +								}
    +
    +								iterator.next();
    +							}
    +						} // releases native iterator resources
    +					}
    +				} finally {
    +					FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem();
    +					if (restoreFileSystem.exists(temporaryRestoreInstancePath)) {
    +						restoreFileSystem.delete(temporaryRestoreInstancePath, true);
    +					}
    +				}
    +			}
    +		}
    +
    +		private class RestoredDBInstance implements AutoCloseable {
    +
    +			@Nonnull
    +			private final RocksDB db;
    +
    +			@Nonnull
    +			private final ColumnFamilyHandle defaultColumnFamilyHandle;
    +
    +			@Nonnull
    +			private final List<ColumnFamilyHandle> columnFamilyHandles;
    +
    +			@Nonnull
    +			private final List<ColumnFamilyDescriptor> columnFamilyDescriptors;
    +
    +			@Nonnull
    +			private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>>
stateMetaInfoSnapshots;
    +
    +			public RestoredDBInstance(@Nonnull RocksDB db,
    +								@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
    +								@Nonnull List<ColumnFamilyDescriptor> columnFamilyDescriptors,
    +								@Nonnull List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>>
stateMetaInfoSnapshots) {
    +				this.db = db;
    +				this.columnFamilyHandles = columnFamilyHandles;
    +				this.defaultColumnFamilyHandle = this.columnFamilyHandles.remove(0);
    +				this.columnFamilyDescriptors = columnFamilyDescriptors;
    +				this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
    +			}
    +
    +			@Override
    +			public void close() throws Exception {
    +
    +				IOUtils.closeQuietly(defaultColumnFamilyHandle);
    +
    +				for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
    +					IOUtils.closeQuietly(columnFamilyHandle);
    +				}
    +
    +				IOUtils.closeQuietly(db);
    +			}
    +		}
    +
    +		private RestoredDBInstance restoreDBFromStateHandle(
    +			IncrementalKeyedStateHandle restoreStateHandle,
    +			Path temporaryRestoreInstancePath,
    +			KeyGroupRange targetKeyGroupRange,
    +			int keyGroupPrefixBytes,
    +			boolean needClip) throws Exception {
    +
    +			transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath);
    +
     			// read meta data
     			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots
=
    -				readMetaData(localKeyedStateHandle.getMetaDataState());
    +				readMetaData(restoreStateHandle.getMetaStateHandle());
     
     			List<ColumnFamilyDescriptor> columnFamilyDescriptors =
     				createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
     
    -			restoreLocalStateIntoFullInstance(
    -				localKeyedStateHandle,
    +			List<ColumnFamilyHandle> columnFamilyHandles =
    +				new ArrayList<>(stateMetaInfoSnapshots.size() + 1);
    +
    +			RocksDB restoreDb = stateBackend.openDB(
    +				temporaryRestoreInstancePath.getPath(),
     				columnFamilyDescriptors,
    -				stateMetaInfoSnapshots);
    +				columnFamilyHandles);
    +
    +			if (needClip) {
    +				RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange(
    +					restoreDb,
    +					columnFamilyHandles,
    +					targetKeyGroupRange,
    +					restoreStateHandle.getKeyGroupRange(),
    +					keyGroupPrefixBytes);
    +			}
    +
    +			return new RestoredDBInstance(restoreDb, columnFamilyHandles, columnFamilyDescriptors,
stateMetaInfoSnapshots);
    +		}
    +
    +		private ColumnFamilyHandle getOrRegisterColumnFamilyHandle(
    +			ColumnFamilyDescriptor columnFamilyDescriptor,
    +			ColumnFamilyHandle columnFamilyHandle,
    +			RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot) throws
RocksDBException {
    +
    +			Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>
registeredStateMetaInfoEntry =
    +				stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
    +
    +			if (null == registeredStateMetaInfoEntry) {
    +				RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
    +					new RegisteredKeyedBackendStateMetaInfo<>(
    +						stateMetaInfoSnapshot.getStateType(),
    +						stateMetaInfoSnapshot.getName(),
    +						stateMetaInfoSnapshot.getNamespaceSerializer(),
    +						stateMetaInfoSnapshot.getStateSerializer());
    +
    +				registeredStateMetaInfoEntry =
    +					new Tuple2<>(
    +						columnFamilyHandle != null ? columnFamilyHandle : stateBackend.db.createColumnFamily(columnFamilyDescriptor),
    +						stateMetaInfo);
    +
    +				stateBackend.kvStateInformation.put(
    +					stateMetaInfoSnapshot.getName(),
    +					registeredStateMetaInfoEntry);
    +			}
    +
    +			return registeredStateMetaInfoEntry.f0;
    +		}
    +
    +		private void chooseTheBestStateHandleToInit(
    --- End diff --
    
    I think the name of this method is no longer accurate: it does not only choose the best
handle, it also already restores a DB instance from it. Maybe we can still break this up into two methods,
so that each method only does one thing. I don't think it is nice that creating the DB
is a side effect of a method that claims to only find something.


---

Mime
View raw message