From issues-return-169822-archive-asf-public=cust-asf.ponee.io@flink.apache.org Fri Jun 1 10:34:44 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 2385018063A for ; Fri, 1 Jun 2018 10:34:43 +0200 (CEST) Received: (qmail 88251 invoked by uid 500); 1 Jun 2018 08:34:43 -0000 Mailing-List: contact issues-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list issues@flink.apache.org Received: (qmail 88239 invoked by uid 99); 1 Jun 2018 08:34:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Jun 2018 08:34:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 12C68E0BBD; Fri, 1 Jun 2018 08:34:43 +0000 (UTC) From: StefanRRichter To: issues@flink.apache.org Reply-To: issues@flink.apache.org References: In-Reply-To: Subject: [GitHub] flink pull request #5582: [FLINK-8790][State] Improve performance for recove... Content-Type: text/plain Message-Id: <20180601083443.12C68E0BBD@git1-us-west.apache.org> Date: Fri, 1 Jun 2018 08:34:43 +0000 (UTC) Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5582#discussion_r192330418 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java --- @@ -795,20 +806,241 @@ private void restoreInstance( } /** - * Recovery from local incremental state. + * Recovery from multi incremental states. + * In case of rescaling, this method creates a temporary RocksDB instance for a key-groups shard. 
All contents + * from the temporary instance are copied into the real restore instance and then the temporary instance is + * discarded. */ - private void restoreInstance(IncrementalLocalKeyedStateHandle localKeyedStateHandle) throws Exception { + void restoreFromMultiHandles(Collection restoreStateHandles) throws Exception { + + KeyGroupRange targetKeyGroupRange = stateBackend.keyGroupRange; + + chooseTheBestStateHandleToInit(restoreStateHandles, targetKeyGroupRange); + + int targetStartKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); + byte[] targetStartKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + targetStartKeyGroupPrefixBytes[j] = (byte) (targetStartKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); + } + + for (KeyedStateHandle rawStateHandle : restoreStateHandles) { + + if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) { + throw new IllegalStateException("Unexpected state handle type, " + + "expected " + IncrementalKeyedStateHandle.class + + ", but found " + rawStateHandle.getClass()); + } + + Path temporaryRestoreInstancePath = new Path(stateBackend.instanceBasePath.getAbsolutePath() + UUID.randomUUID().toString()); + try (RestoredDBInstance tmpRestoreDBInfo = restoreDBFromStateHandle( + (IncrementalKeyedStateHandle) rawStateHandle, + temporaryRestoreInstancePath, + targetKeyGroupRange, + stateBackend.keyGroupPrefixBytes, + false); + RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(stateBackend.db)) { + + List tmpColumnFamilyDescriptors = tmpRestoreDBInfo.columnFamilyDescriptors; + List tmpColumnFamilyHandles = tmpRestoreDBInfo.columnFamilyHandles; + + int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); + byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + startKeyGroupPrefixBytes[j] = (byte) 
(startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); + } + + // iterating only the requested descriptors automatically skips the default column family handle + for (int i = 0; i < tmpColumnFamilyDescriptors.size(); ++i) { + ColumnFamilyHandle tmpColumnFamilyHandle = tmpColumnFamilyHandles.get(i); + ColumnFamilyDescriptor tmpColumnFamilyDescriptor = tmpColumnFamilyDescriptors.get(i); + + ColumnFamilyHandle targetColumnFamilyHandle = getOrRegisterColumnFamilyHandle( + tmpColumnFamilyDescriptor, null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i)); + + try (RocksIteratorWrapper iterator = getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) { + + iterator.seek(startKeyGroupPrefixBytes); + + while (iterator.isValid()) { + + int keyGroup = 0; + for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { + keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j]; + } + + if (stateBackend.keyGroupRange.contains(keyGroup)) { + writeBatchWrapper.put(targetColumnFamilyHandle, iterator.key(), iterator.value()); + } else { + // Since the iterator will visit the record according to the sorted order, + // we can just break here. 
+ break; + } + + iterator.next(); + } + } // releases native iterator resources + } + } finally { + FileSystem restoreFileSystem = temporaryRestoreInstancePath.getFileSystem(); + if (restoreFileSystem.exists(temporaryRestoreInstancePath)) { + restoreFileSystem.delete(temporaryRestoreInstancePath, true); + } + } + } + } + + private class RestoredDBInstance implements AutoCloseable { + + @Nonnull + private final RocksDB db; + + @Nonnull + private final ColumnFamilyHandle defaultColumnFamilyHandle; + + @Nonnull + private final List columnFamilyHandles; + + @Nonnull + private final List columnFamilyDescriptors; + + @Nonnull + private final List> stateMetaInfoSnapshots; + + public RestoredDBInstance(@Nonnull RocksDB db, + @Nonnull List columnFamilyHandles, + @Nonnull List columnFamilyDescriptors, + @Nonnull List> stateMetaInfoSnapshots) { + this.db = db; + this.columnFamilyHandles = columnFamilyHandles; + this.defaultColumnFamilyHandle = this.columnFamilyHandles.remove(0); + this.columnFamilyDescriptors = columnFamilyDescriptors; + this.stateMetaInfoSnapshots = stateMetaInfoSnapshots; + } + + @Override + public void close() throws Exception { + + IOUtils.closeQuietly(defaultColumnFamilyHandle); + + for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { + IOUtils.closeQuietly(columnFamilyHandle); + } + + IOUtils.closeQuietly(db); + } + } + + private RestoredDBInstance restoreDBFromStateHandle( + IncrementalKeyedStateHandle restoreStateHandle, + Path temporaryRestoreInstancePath, + KeyGroupRange targetKeyGroupRange, + int keyGroupPrefixBytes, + boolean needClip) throws Exception { + + transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath); + // read meta data List> stateMetaInfoSnapshots = - readMetaData(localKeyedStateHandle.getMetaDataState()); + readMetaData(restoreStateHandle.getMetaStateHandle()); List columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots); - 
restoreLocalStateIntoFullInstance( - localKeyedStateHandle, + List columnFamilyHandles = + new ArrayList<>(stateMetaInfoSnapshots.size() + 1); + + RocksDB restoreDb = stateBackend.openDB( + temporaryRestoreInstancePath.getPath(), columnFamilyDescriptors, - stateMetaInfoSnapshots); + columnFamilyHandles); + + if (needClip) { + RocksDBIncrementalCheckpointUtils.clipDBWithKeyGroupRange( + restoreDb, + columnFamilyHandles, + targetKeyGroupRange, + restoreStateHandle.getKeyGroupRange(), + keyGroupPrefixBytes); + } + + return new RestoredDBInstance(restoreDb, columnFamilyHandles, columnFamilyDescriptors, stateMetaInfoSnapshots); + } + + private ColumnFamilyHandle getOrRegisterColumnFamilyHandle( + ColumnFamilyDescriptor columnFamilyDescriptor, + ColumnFamilyHandle columnFamilyHandle, + RegisteredKeyedBackendStateMetaInfo.Snapshot stateMetaInfoSnapshot) throws RocksDBException { + + Tuple2> registeredStateMetaInfoEntry = + stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName()); + + if (null == registeredStateMetaInfoEntry) { + RegisteredKeyedBackendStateMetaInfo stateMetaInfo = + new RegisteredKeyedBackendStateMetaInfo<>( + stateMetaInfoSnapshot.getStateType(), + stateMetaInfoSnapshot.getName(), + stateMetaInfoSnapshot.getNamespaceSerializer(), + stateMetaInfoSnapshot.getStateSerializer()); + + registeredStateMetaInfoEntry = + new Tuple2<>( + columnFamilyHandle != null ? columnFamilyHandle : stateBackend.db.createColumnFamily(columnFamilyDescriptor), + stateMetaInfo); + + stateBackend.kvStateInformation.put( + stateMetaInfoSnapshot.getName(), + registeredStateMetaInfoEntry); + } + + return registeredStateMetaInfoEntry.f0; + } + + private void chooseTheBestStateHandleToInit( --- End diff -- I think the name of this method is no longer accurate: it does not only choose the best handle, it already restores a db instance. Maybe we can still break this up into two methods, so that each method only does one thing. 
I think it is not so nice if creating the db is a side effect of a method that claims to only find something. ---