From: sewen@apache.org
To: commits@flink.apache.org
Date: Mon, 19 Feb 2018 17:39:43 -0000
Message-Id: <4619b67a28d04269a015f35f6806870c@git.apache.org>
Subject: [7/8] flink git commit: [FLINK-8695] [rocksdb] Move flink-statebackend-rocksdb from 'flink-contrib' to 'flink-state-backends'.

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
deleted file mode 100644
index 5507339..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ /dev/null
@@ -1,2033 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
-import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
-import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
-import org.apache.flink.runtime.state.DoneFuture;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
-import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
-import org.apache.flink.runtime.state.StateHandleID;
-import org.apache.flink.runtime.state.StateObject;
-import org.apache.flink.runtime.state.StateUtil;
-import org.apache.flink.runtime.state.StreamCompressionDecorator;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.apache.flink.runtime.state.internal.InternalFoldingState;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.apache.flink.runtime.state.internal.InternalValueState;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.ResourceGuard;
-import org.apache.flink.util.StateMigrationException;
-
-import org.rocksdb.Checkpoint;
-import org.rocksdb.ColumnFamilyDescriptor;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.Snapshot;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.RunnableFuture;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-/**
- * An {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and serializes state to
- * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
- * checkpointing. This state backend can store very large state that exceeds memory and spills
- * to disk. Except for the snapshotting, this class should be accessed as if it is not threadsafe.
- *
- * <p>
This class follows the rules for closing/releasing native RocksDB resources as described in - + - * this document. - */ -public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { - - private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class); - - /** The name of the merge operator in RocksDB. Do not change except you know exactly what you do. */ - public static final String MERGE_OPERATOR_NAME = "stringappendtest"; - - /** File suffix of sstable files. */ - private static final String SST_FILE_SUFFIX = ".sst"; - - /** Bytes for the name of the column descriptor for the default column family. */ - public static final byte[] DEFAULT_COLUMN_FAMILY_NAME_BYTES = "default".getBytes(ConfigConstants.DEFAULT_CHARSET); - - /** String that identifies the operator that owns this backend. */ - private final String operatorIdentifier; - - /** The column family options from the options factory. */ - private final ColumnFamilyOptions columnOptions; - - /** The DB options from the options factory. */ - private final DBOptions dbOptions; - - /** Path where this configured instance stores its data directory. */ - private final File instanceBasePath; - - /** Path where this configured instance stores its RocksDB database. */ - private final File instanceRocksDBPath; - - /** - * Protects access to RocksDB in other threads, like the checkpointing thread from parallel call that disposes the - * RocksDb object. - */ - private final ResourceGuard rocksDBResourceGuard; - - /** - * Our RocksDB database, this is used by the actual subclasses of {@link AbstractRocksDBState} - * to store state. The different k/v states that we have don't each have their own RocksDB - * instance. They all write to this instance but to their own column family. - */ - protected RocksDB db; - - /** - * We are not using the default column family for Flink state ops, but we still need to remember this handle so that - * we can close it properly when the backend is closed. This is required by RocksDB's native memory management. - */ - private ColumnFamilyHandle defaultColumnFamily; - - /** - * Information about the k/v states as we create them. This is used to retrieve the - * column family that is used for a state and also for sanity checks when restoring. - */ - private final Map>> kvStateInformation; - - /** - * Map of state names to their corresponding restored state meta info. - * - *
- * <p>
TODO this map can be removed when eager-state registration is in place. - * TODO we currently need this cached to check state migration strategies when new serializers are registered. - */ - private final Map> restoredKvStateMetaInfos; - - /** Number of bytes required to prefix the key groups. */ - private final int keyGroupPrefixBytes; - - /** True if incremental checkpointing is enabled. */ - private final boolean enableIncrementalCheckpointing; - - /** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */ - private final SortedMap> materializedSstFiles; - - /** The identifier of the last completed checkpoint. */ - private long lastCompletedCheckpointId = -1L; - - /** Unique ID of this backend. */ - private UUID backendUID; - - public RocksDBKeyedStateBackend( - String operatorIdentifier, - ClassLoader userCodeClassLoader, - File instanceBasePath, - DBOptions dbOptions, - ColumnFamilyOptions columnFamilyOptions, - TaskKvStateRegistry kvStateRegistry, - TypeSerializer keySerializer, - int numberOfKeyGroups, - KeyGroupRange keyGroupRange, - ExecutionConfig executionConfig, - boolean enableIncrementalCheckpointing - ) throws IOException { - - super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig); - - this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier); - - this.enableIncrementalCheckpointing = enableIncrementalCheckpointing; - this.rocksDBResourceGuard = new ResourceGuard(); - - // ensure that we use the right merge operator, because other code relies on this - this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions) - .setMergeOperatorName(MERGE_OPERATOR_NAME); - - this.dbOptions = Preconditions.checkNotNull(dbOptions); - - this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath); - this.instanceRocksDBPath = new File(instanceBasePath, "db"); - - if (instanceBasePath.exists()) { - // Clear the base directory when the backend is created - // in case something crashed and the backend never reached dispose() - cleanInstanceBasePath(); - } - - if (!instanceBasePath.mkdirs()) { - throw new IOException( - String.format("Could not create RocksDB data directory at %s.", instanceBasePath.getAbsolutePath())); - } - - this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1; - this.kvStateInformation = new HashMap<>(); - this.restoredKvStateMetaInfos = new HashMap<>(); - this.materializedSstFiles = new TreeMap<>(); - this.backendUID = UUID.randomUUID(); - LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID); - } - - @Override - public Stream getKeys(String state, N namespace) { - Tuple2 columnInfo = kvStateInformation.get(state); - if (columnInfo == null) { - return Stream.empty(); - } - - RocksIterator iterator = db.newIterator(columnInfo.f0); - iterator.seekToFirst(); - - Iterable iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes); - Stream targetStream = StreamSupport.stream(iterable.spliterator(), false); - return targetStream.onClose(iterator::close); - } - - /** - * Should only be called by one thread, and only after all accesses to the DB happened. - */ - @Override - public void dispose() { - super.dispose(); - - // This call will block until all clients that still acquire access to the RocksDB instance have released it, - // so that we cannot release the native resources while clients are still working with it in parallel. 
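// A minimal sketch of the lease protocol that makes the close() below block
// (hypothetical caller code, not part of this class; the snapshot operations
// in this file acquire their leases via rocksDBResourceGuard.acquireResource()
// and release them with Lease.close()):
//
//     ResourceGuard.Lease lease = rocksDBResourceGuard.acquireResource();
//     try {
//         /* ... read from db through the native handle ... */
//     } finally {
//         lease.close(); // closing the last open lease unblocks close()
//     }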
- rocksDBResourceGuard.close(); - - // IMPORTANT: null reference to signal potential async checkpoint workers that the db was disposed, as - // working on the disposed object results in SEGFAULTS. - if (db != null) { - - // RocksDB's native memory management requires that *all* CFs (including default) are closed before the - // DB is closed. So we start with the ones created by Flink... - for (Tuple2> columnMetaData : - kvStateInformation.values()) { - IOUtils.closeQuietly(columnMetaData.f0); - } - - // ... close the default CF ... - IOUtils.closeQuietly(defaultColumnFamily); - - // ... and finally close the DB instance ... - IOUtils.closeQuietly(db); - - // invalidate the reference before releasing the lock so that other accesses will not cause crashes - db = null; - } - - kvStateInformation.clear(); - restoredKvStateMetaInfos.clear(); - - IOUtils.closeQuietly(dbOptions); - IOUtils.closeQuietly(columnOptions); - - cleanInstanceBasePath(); - } - - private void cleanInstanceBasePath() { - LOG.info("Deleting existing instance base directory {}.", instanceBasePath); - - try { - FileUtils.deleteDirectory(instanceBasePath); - } catch (IOException ex) { - LOG.warn("Could not delete instance base path for RocksDB: " + instanceBasePath, ex); - } - } - - public int getKeyGroupPrefixBytes() { - return keyGroupPrefixBytes; - } - - /** - * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and - * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always - * be called by the same thread. - * - * @param checkpointId The Id of the checkpoint. - * @param timestamp The timestamp of the checkpoint. - * @param streamFactory The factory that we can use for writing our state to streams. - * @param checkpointOptions Options for how to perform this checkpoint. - * @return Future to the state handle of the snapshot data. - * @throws Exception - */ - @Override - public RunnableFuture snapshot( - final long checkpointId, - final long timestamp, - final CheckpointStreamFactory streamFactory, - CheckpointOptions checkpointOptions) throws Exception { - - if (checkpointOptions.getCheckpointType() != CheckpointType.SAVEPOINT && - enableIncrementalCheckpointing) { - return snapshotIncrementally(checkpointId, timestamp, streamFactory); - } else { - return snapshotFully(checkpointId, timestamp, streamFactory); - } - } - - private RunnableFuture snapshotIncrementally( - final long checkpointId, - final long checkpointTimestamp, - final CheckpointStreamFactory checkpointStreamFactory) throws Exception { - - if (db == null) { - throw new IOException("RocksDB closed."); - } - - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. 
Returning null.", - checkpointTimestamp); - } - return DoneFuture.nullValue(); - } - - final RocksDBIncrementalSnapshotOperation snapshotOperation = - new RocksDBIncrementalSnapshotOperation<>( - this, - checkpointStreamFactory, - checkpointId, - checkpointTimestamp); - - try { - snapshotOperation.takeSnapshot(); - } catch (Exception e) { - snapshotOperation.stop(); - snapshotOperation.releaseResources(true); - throw e; - } - - return new FutureTask( - new Callable() { - @Override - public KeyedStateHandle call() throws Exception { - return snapshotOperation.materializeSnapshot(); - } - } - ) { - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - snapshotOperation.stop(); - return super.cancel(mayInterruptIfRunning); - } - - @Override - protected void done() { - snapshotOperation.releaseResources(isCancelled()); - } - }; - } - - private RunnableFuture snapshotFully( - final long checkpointId, - final long timestamp, - final CheckpointStreamFactory streamFactory) throws Exception { - - long startTime = System.currentTimeMillis(); - final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry(); - - final RocksDBFullSnapshotOperation snapshotOperation; - - if (kvStateInformation.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", timestamp); - } - - return DoneFuture.nullValue(); - } - - snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry); - snapshotOperation.takeDBSnapShot(checkpointId, timestamp); - - // implementation of the async IO operation, based on FutureTask - AbstractAsyncCallableWithResources ioCallable = - new AbstractAsyncCallableWithResources() { - - @Override - protected void acquireResources() throws Exception { - cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry); - snapshotOperation.openCheckpointStream(); - } - - @Override - protected void releaseResources() throws Exception { - closeLocalRegistry(); - releaseSnapshotOperationResources(); - } - - private void releaseSnapshotOperationResources() { - // hold the db lock while operation on the db to guard us against async db disposal - snapshotOperation.releaseSnapshotResources(); - } - - @Override - protected void stopOperation() throws Exception { - closeLocalRegistry(); - } - - private void closeLocalRegistry() { - if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) { - try { - snapshotCloseableRegistry.close(); - } catch (Exception ex) { - LOG.warn("Error closing local registry", ex); - } - } - } - - @Override - public KeyGroupsStateHandle performOperation() throws Exception { - long startTime = System.currentTimeMillis(); - - if (isStopped()) { - throw new IOException("RocksDB closed."); - } - - snapshotOperation.writeDBSnapshot(); - - LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.", - streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); - - return snapshotOperation.getSnapshotResultStateHandle(); - } - }; - - LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.", - streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime)); - - return AsyncStoppableTaskWithCallback.from(ioCallable); - } - - /** - * Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend. 
- */ - static final class RocksDBFullSnapshotOperation { - - static final int FIRST_BIT_IN_BYTE_MASK = 0x80; - static final int END_OF_KEY_GROUP_MARK = 0xFFFF; - - private final RocksDBKeyedStateBackend stateBackend; - private final KeyGroupRangeOffsets keyGroupRangeOffsets; - private final CheckpointStreamFactory checkpointStreamFactory; - private final CloseableRegistry snapshotCloseableRegistry; - private final ResourceGuard.Lease dbLease; - - private long checkpointId; - private long checkpointTimeStamp; - - private Snapshot snapshot; - private ReadOptions readOptions; - private List> kvStateIterators; - - private CheckpointStreamFactory.CheckpointStateOutputStream outStream; - private DataOutputView outputView; - - RocksDBFullSnapshotOperation( - RocksDBKeyedStateBackend stateBackend, - CheckpointStreamFactory checkpointStreamFactory, - CloseableRegistry registry) throws IOException { - - this.stateBackend = stateBackend; - this.checkpointStreamFactory = checkpointStreamFactory; - this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange); - this.snapshotCloseableRegistry = registry; - this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource(); - } - - /** - * 1) Create a snapshot object from RocksDB. - * - * @param checkpointId id of the checkpoint for which we take the snapshot - * @param checkpointTimeStamp timestamp of the checkpoint for which we take the snapshot - */ - public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) { - Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!"); - this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size()); - this.checkpointId = checkpointId; - this.checkpointTimeStamp = checkpointTimeStamp; - this.snapshot = stateBackend.db.getSnapshot(); - } - - /** - * 2) Open CheckpointStateOutputStream through the checkpointStreamFactory into which we will write. - * - * @throws Exception - */ - public void openCheckpointStream() throws Exception { - Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set."); - outStream = checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); - snapshotCloseableRegistry.registerCloseable(outStream); - outputView = new DataOutputViewStreamWrapper(outStream); - } - - /** - * 3) Write the actual data from RocksDB from the time we took the snapshot object in (1). - * - * @throws IOException - */ - public void writeDBSnapshot() throws IOException, InterruptedException { - - if (null == snapshot) { - throw new IOException("No snapshot available. Might be released due to cancellation."); - } - - Preconditions.checkNotNull(outStream, "No output stream to write snapshot."); - writeKVStateMetaData(); - writeKVStateData(); - } - - /** - * 4) Returns a state handle to the snapshot after the snapshot procedure is completed and null before. - * - * @return state handle to the completed snapshot - */ - public KeyGroupsStateHandle getSnapshotResultStateHandle() throws IOException { - - if (snapshotCloseableRegistry.unregisterCloseable(outStream)) { - - StreamStateHandle stateHandle = outStream.closeAndGetHandle(); - outStream = null; - - if (stateHandle != null) { - return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle); - } - } - return null; - } - - /** - * 5) Release the snapshot object for RocksDB and clean up. 
- */ - public void releaseSnapshotResources() { - - outStream = null; - - if (null != kvStateIterators) { - for (Tuple2 kvStateIterator : kvStateIterators) { - IOUtils.closeQuietly(kvStateIterator.f0); - } - kvStateIterators = null; - } - - if (null != snapshot) { - if (null != stateBackend.db) { - stateBackend.db.releaseSnapshot(snapshot); - } - IOUtils.closeQuietly(snapshot); - snapshot = null; - } - - if (null != readOptions) { - IOUtils.closeQuietly(readOptions); - readOptions = null; - } - - this.dbLease.close(); - } - - private void writeKVStateMetaData() throws IOException { - - List> metaInfoSnapshots = - new ArrayList<>(stateBackend.kvStateInformation.size()); - - int kvStateId = 0; - for (Map.Entry>> column : - stateBackend.kvStateInformation.entrySet()) { - - metaInfoSnapshots.add(column.getValue().f1.snapshot()); - - //retrieve iterator for this k/v states - readOptions = new ReadOptions(); - readOptions.setSnapshot(snapshot); - - kvStateIterators.add( - new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId)); - - ++kvStateId; - } - - KeyedBackendSerializationProxy serializationProxy = - new KeyedBackendSerializationProxy<>( - stateBackend.getKeySerializer(), - metaInfoSnapshots, - !Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, stateBackend.keyGroupCompressionDecorator)); - - serializationProxy.write(outputView); - } - - private void writeKVStateData() throws IOException, InterruptedException { - - byte[] previousKey = null; - byte[] previousValue = null; - OutputStream kgOutStream = null; - DataOutputView kgOutView = null; - - try { - // Here we transfer ownership of RocksIterators to the RocksDBMergeIterator - try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator( - kvStateIterators, stateBackend.keyGroupPrefixBytes)) { - - // handover complete, null out to prevent double close - kvStateIterators = null; - - //preamble: setup with first key-group as our lookahead - if (mergeIterator.isValid()) { - //begin first key-group by recording the offset - keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); - //write the k/v-state id as metadata - kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream); - kgOutView = new DataOutputViewStreamWrapper(kgOutStream); - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(mergeIterator.kvStateId()); - previousKey = mergeIterator.key(); - previousValue = mergeIterator.value(); - mergeIterator.next(); - } - - //main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets. 
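// A sketch of the per-key-group byte layout this loop produces, as
// reconstructed from the writes below (state ids and the terminator are
// written with writeShort):
//
//     [kvStateId] k/v k/v ... k/v* [kvStateId] k/v ... k/v* [END_OF_KEY_GROUP_MARK]
//
// where k/v* marks a pair whose first key byte has FIRST_BIT_IN_BYTE_MASK
// set, signaling that meta data (the next state id or the 0xFFFF
// end-of-key-group mark) follows instead of another pair.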
- while (mergeIterator.isValid()) { - - assert (!hasMetaDataFollowsFlag(previousKey)); - - //set signal in first key byte that meta data will follow in the stream after this k/v pair - if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) { - - //be cooperative and check for interruption from time to time in the hot loop - checkInterrupted(); - - setMetaDataFollowsFlagInKey(previousKey); - } - - writeKeyValuePair(previousKey, previousValue, kgOutView); - - //write meta data if we have to - if (mergeIterator.isNewKeyGroup()) { - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(END_OF_KEY_GROUP_MARK); - // this will just close the outer stream - kgOutStream.close(); - //begin new key-group - keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos()); - //write the kev-state - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream); - kgOutView = new DataOutputViewStreamWrapper(kgOutStream); - kgOutView.writeShort(mergeIterator.kvStateId()); - } else if (mergeIterator.isNewKeyValueState()) { - //write the k/v-state - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(mergeIterator.kvStateId()); - } - - //request next k/v pair - previousKey = mergeIterator.key(); - previousValue = mergeIterator.value(); - mergeIterator.next(); - } - } - - //epilogue: write last key-group - if (previousKey != null) { - assert (!hasMetaDataFollowsFlag(previousKey)); - setMetaDataFollowsFlagInKey(previousKey); - writeKeyValuePair(previousKey, previousValue, kgOutView); - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kgOutView.writeShort(END_OF_KEY_GROUP_MARK); - // this will just close the outer stream - kgOutStream.close(); - kgOutStream = null; - } - - } finally { - // this will just close the outer stream - IOUtils.closeQuietly(kgOutStream); - } - } - - private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException { - BytePrimitiveArraySerializer.INSTANCE.serialize(key, out); - BytePrimitiveArraySerializer.INSTANCE.serialize(value, out); - } - - static void setMetaDataFollowsFlagInKey(byte[] key) { - key[0] |= FIRST_BIT_IN_BYTE_MASK; - } - - static void clearMetaDataFollowsFlag(byte[] key) { - key[0] &= (~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); - } - - static boolean hasMetaDataFollowsFlag(byte[] key) { - return 0 != (key[0] & RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK); - } - - private static void checkInterrupted() throws InterruptedException { - if (Thread.currentThread().isInterrupted()) { - throw new InterruptedException("RocksDB snapshot interrupted."); - } - } - } - - private static final class RocksDBIncrementalSnapshotOperation { - - /** The backend which we snapshot. */ - private final RocksDBKeyedStateBackend stateBackend; - - /** Stream factory that creates the outpus streams to DFS. */ - private final CheckpointStreamFactory checkpointStreamFactory; - - /** Id for the current checkpoint. */ - private final long checkpointId; - - /** Timestamp for the current checkpoint. */ - private final long checkpointTimestamp; - - /** All sst files that were part of the last previously completed checkpoint. */ - private Set baseSstFiles; - - /** The state meta data. 
*/ - private final List> stateMetaInfoSnapshots = new ArrayList<>(); - - private FileSystem backupFileSystem; - private Path backupPath; - - // Registry for all opened i/o streams - private final CloseableRegistry closeableRegistry = new CloseableRegistry(); - - // new sst files since the last completed checkpoint - private final Map sstFiles = new HashMap<>(); - - // handles to the misc files in the current snapshot - private final Map miscFiles = new HashMap<>(); - - // This lease protects from concurrent disposal of the native rocksdb instance. - private final ResourceGuard.Lease dbLease; - - private StreamStateHandle metaStateHandle = null; - - private RocksDBIncrementalSnapshotOperation( - RocksDBKeyedStateBackend stateBackend, - CheckpointStreamFactory checkpointStreamFactory, - long checkpointId, - long checkpointTimestamp) throws IOException { - - this.stateBackend = stateBackend; - this.checkpointStreamFactory = checkpointStreamFactory; - this.checkpointId = checkpointId; - this.checkpointTimestamp = checkpointTimestamp; - this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource(); - } - - private StreamStateHandle materializeStateData(Path filePath) throws Exception { - FSDataInputStream inputStream = null; - CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; - - try { - final byte[] buffer = new byte[8 * 1024]; - - FileSystem backupFileSystem = backupPath.getFileSystem(); - inputStream = backupFileSystem.open(filePath); - closeableRegistry.registerCloseable(inputStream); - - outputStream = checkpointStreamFactory - .createCheckpointStateOutputStream(CheckpointedStateScope.SHARED); - closeableRegistry.registerCloseable(outputStream); - - while (true) { - int numBytes = inputStream.read(buffer); - - if (numBytes == -1) { - break; - } - - outputStream.write(buffer, 0, numBytes); - } - - StreamStateHandle result = null; - if (closeableRegistry.unregisterCloseable(outputStream)) { - result = outputStream.closeAndGetHandle(); - outputStream = null; - } - return result; - - } finally { - if (inputStream != null && closeableRegistry.unregisterCloseable(inputStream)) { - inputStream.close(); - } - - if (outputStream != null && closeableRegistry.unregisterCloseable(outputStream)) { - outputStream.close(); - } - } - } - - private StreamStateHandle materializeMetaData() throws Exception { - CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null; - - try { - outputStream = checkpointStreamFactory - .createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE); - closeableRegistry.registerCloseable(outputStream); - - //no need for compression scheme support because sst-files are already compressed - KeyedBackendSerializationProxy serializationProxy = - new KeyedBackendSerializationProxy<>( - stateBackend.keySerializer, - stateMetaInfoSnapshots, - false); - - DataOutputView out = new DataOutputViewStreamWrapper(outputStream); - - serializationProxy.write(out); - - StreamStateHandle result = null; - if (closeableRegistry.unregisterCloseable(outputStream)) { - result = outputStream.closeAndGetHandle(); - outputStream = null; - } - return result; - } finally { - if (outputStream != null) { - if (closeableRegistry.unregisterCloseable(outputStream)) { - outputStream.close(); - } - } - } - } - - void takeSnapshot() throws Exception { - - final long lastCompletedCheckpoint; - - // use the last completed checkpoint as the comparison base. 
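// An illustrative example of the resulting dedup (file names hypothetical):
// if the last completed checkpoint materialized {001.sst, 002.sst} and the
// local RocksDB checkpoint now contains {001.sst, 002.sst, 003.sst}, only
// 003.sst is uploaded in materializeSnapshot(); the other two are reported
// as PlaceholderStreamStateHandle and resolved by the shared state registry
// against the files uploaded for the earlier checkpoint.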
- synchronized (stateBackend.materializedSstFiles) { - lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId; - baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint); - } - - LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " + - "assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles); - - // save meta data - for (Map.Entry>> stateMetaInfoEntry - : stateBackend.kvStateInformation.entrySet()) { - stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot()); - } - - // save state data - backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId); - - LOG.trace("Local RocksDB checkpoint goes to backup path {}.", backupPath); - - backupFileSystem = backupPath.getFileSystem(); - if (backupFileSystem.exists(backupPath)) { - throw new IllegalStateException("Unexpected existence of the backup directory."); - } - - // create hard links of living files in the checkpoint path - Checkpoint checkpoint = Checkpoint.create(stateBackend.db); - checkpoint.createCheckpoint(backupPath.getPath()); - } - - KeyedStateHandle materializeSnapshot() throws Exception { - - stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry); - - // write meta data - metaStateHandle = materializeMetaData(); - - // write state data - Preconditions.checkState(backupFileSystem.exists(backupPath)); - - FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath); - if (fileStatuses != null) { - for (FileStatus fileStatus : fileStatuses) { - final Path filePath = fileStatus.getPath(); - final String fileName = filePath.getName(); - final StateHandleID stateHandleID = new StateHandleID(fileName); - - if (fileName.endsWith(SST_FILE_SUFFIX)) { - final boolean existsAlready = - baseSstFiles != null && baseSstFiles.contains(stateHandleID); - - if (existsAlready) { - // we introduce a placeholder state handle, that is replaced with the - // original from the shared state registry (created from a previous checkpoint) - sstFiles.put( - stateHandleID, - new PlaceholderStreamStateHandle()); - } else { - sstFiles.put(stateHandleID, materializeStateData(filePath)); - } - } else { - StreamStateHandle fileHandle = materializeStateData(filePath); - miscFiles.put(stateHandleID, fileHandle); - } - } - } - - synchronized (stateBackend.materializedSstFiles) { - stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet()); - } - - return new IncrementalKeyedStateHandle( - stateBackend.backendUID, - stateBackend.keyGroupRange, - checkpointId, - sstFiles, - miscFiles, - metaStateHandle); - } - - void stop() { - - if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) { - try { - closeableRegistry.close(); - } catch (IOException e) { - LOG.warn("Could not properly close io streams.", e); - } - } - } - - void releaseResources(boolean canceled) { - - dbLease.close(); - - if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) { - try { - closeableRegistry.close(); - } catch (IOException e) { - LOG.warn("Exception on closing registry.", e); - } - } - - if (backupPath != null) { - try { - if (backupFileSystem.exists(backupPath)) { - - LOG.trace("Deleting local RocksDB backup path {}.", backupPath); - backupFileSystem.delete(backupPath, true); - } - } catch (Exception e) { - LOG.warn("Could not properly delete the checkpoint directory.", e); - } - } - - if (canceled) { - Collection statesToDiscard = 
- new ArrayList<>(1 + miscFiles.size() + sstFiles.size()); - - statesToDiscard.add(metaStateHandle); - statesToDiscard.addAll(miscFiles.values()); - statesToDiscard.addAll(sstFiles.values()); - - try { - StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard); - } catch (Exception e) { - LOG.warn("Could not properly discard states.", e); - } - } - } - } - - @Override - public void restore(Collection restoreState) throws Exception { - LOG.info("Initializing RocksDB keyed state backend from snapshot."); - - if (LOG.isDebugEnabled()) { - LOG.debug("Restoring snapshot from state handles: {}.", restoreState); - } - - // clear all meta data - kvStateInformation.clear(); - restoredKvStateMetaInfos.clear(); - - try { - if (restoreState == null || restoreState.isEmpty()) { - createDB(); - } else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) { - RocksDBIncrementalRestoreOperation restoreOperation = new RocksDBIncrementalRestoreOperation<>(this); - restoreOperation.restore(restoreState); - } else { - RocksDBFullRestoreOperation restoreOperation = new RocksDBFullRestoreOperation<>(this); - restoreOperation.doRestore(restoreState); - } - } catch (Exception ex) { - dispose(); - throw ex; - } - } - - @Override - public void notifyCheckpointComplete(long completedCheckpointId) { - synchronized (materializedSstFiles) { - if (completedCheckpointId < lastCompletedCheckpointId) { - return; - } - - materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId); - - lastCompletedCheckpointId = completedCheckpointId; - } - } - - private void createDB() throws IOException { - List columnFamilyHandles = new ArrayList<>(1); - this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles); - this.defaultColumnFamily = columnFamilyHandles.get(0); - } - - private RocksDB openDB( - String path, - List stateColumnFamilyDescriptors, - List stateColumnFamilyHandles) throws IOException { - - List columnFamilyDescriptors = - new ArrayList<>(1 + stateColumnFamilyDescriptors.size()); - - columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors); - - // we add the required descriptor for the default CF in last position. - columnFamilyDescriptors.add(new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions)); - - RocksDB dbRef; - - try { - dbRef = RocksDB.open( - Preconditions.checkNotNull(dbOptions), - Preconditions.checkNotNull(path), - columnFamilyDescriptors, - stateColumnFamilyHandles); - } catch (RocksDBException e) { - throw new IOException("Error while opening RocksDB instance.", e); - } - - // requested + default CF - Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(), - "Not all requested column family handles have been created"); - - return dbRef; - } - - /** - * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a snapshot. - */ - static final class RocksDBFullRestoreOperation { - - private final RocksDBKeyedStateBackend rocksDBKeyedStateBackend; - - /** Current key-groups state handle from which we restore key-groups. */ - private KeyGroupsStateHandle currentKeyGroupsStateHandle; - /** Current input stream we obtained from currentKeyGroupsStateHandle. */ - private FSDataInputStream currentStateHandleInStream; - /** Current data input view that wraps currentStateHandleInStream. 
*/ - private DataInputView currentStateHandleInView; - /** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */ - private List currentStateHandleKVStateColumnFamilies; - /** The compression decorator that was used for writing the state, as determined by the meta data. */ - private StreamCompressionDecorator keygroupStreamCompressionDecorator; - - /** - * Creates a restore operation object for the given state backend instance. - * - * @param rocksDBKeyedStateBackend the state backend into which we restore - */ - public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend rocksDBKeyedStateBackend) { - this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend); - } - - /** - * Restores all key-groups data that is referenced by the passed state handles. - * - * @param keyedStateHandles List of all key groups state handles that shall be restored. - */ - public void doRestore(Collection keyedStateHandles) - throws IOException, StateMigrationException, RocksDBException { - - rocksDBKeyedStateBackend.createDB(); - - for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { - if (keyedStateHandle != null) { - - if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) { - throw new IllegalStateException("Unexpected state handle type, " + - "expected: " + KeyGroupsStateHandle.class + - ", but found: " + keyedStateHandle.getClass()); - } - this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; - restoreKeyGroupsInStateHandle(); - } - } - } - - /** - * Restore one key groups state handle. - */ - private void restoreKeyGroupsInStateHandle() - throws IOException, StateMigrationException, RocksDBException { - try { - currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream(); - rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream); - currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream); - restoreKVStateMetaData(); - restoreKVStateData(); - } finally { - if (currentStateHandleInStream != null - && rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) { - IOUtils.closeQuietly(currentStateHandleInStream); - } - } - } - - /** - * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle. - * - * @throws IOException - * @throws ClassNotFoundException - * @throws RocksDBException - */ - private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException { - - KeyedBackendSerializationProxy serializationProxy = - new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader); - - serializationProxy.read(currentStateHandleInView); - - // check for key serializer compatibility; this also reconfigures the - // key serializer to be compatible, if it is required and is possible - if (CompatibilityUtil.resolveCompatibilityResult( - serializationProxy.getKeySerializer(), - UnloadableDummyTypeSerializer.class, - serializationProxy.getKeySerializerConfigSnapshot(), - rocksDBKeyedStateBackend.keySerializer) - .isRequiresMigration()) { - - // TODO replace with state migration; note that key hash codes need to remain the same after migration - throw new StateMigrationException("The new key serializer is not compatible to read previous keys. 
" + - "Aborting now since state migration is currently not available"); - } - - this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ? - SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE; - - List> restoredMetaInfos = - serializationProxy.getStateMetaInfoSnapshots(); - currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size()); - //rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size()); - - for (RegisteredKeyedBackendStateMetaInfo.Snapshot restoredMetaInfo : restoredMetaInfos) { - - Tuple2> registeredColumn = - rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName()); - - if (registeredColumn == null) { - byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); - - ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( - nameBytes, - rocksDBKeyedStateBackend.columnOptions); - - RegisteredKeyedBackendStateMetaInfo stateMetaInfo = - new RegisteredKeyedBackendStateMetaInfo<>( - restoredMetaInfo.getStateType(), - restoredMetaInfo.getName(), - restoredMetaInfo.getNamespaceSerializer(), - restoredMetaInfo.getStateSerializer()); - - rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo); - - ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor); - - registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo); - rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn); - - } else { - // TODO with eager state registration in place, check here for serializer migration strategies - } - currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0); - } - } - - /** - * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle. - * - * @throws IOException - * @throws RocksDBException - */ - private void restoreKVStateData() throws IOException, RocksDBException { - //for all key-groups in the current state handle... - for (Tuple2 keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) { - int keyGroup = keyGroupOffset.f0; - - // Check that restored key groups all belong to the backend - Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup), - "The key group must belong to the backend"); - - long offset = keyGroupOffset.f1; - //not empty key-group? 
- if (0L != offset) { - currentStateHandleInStream.seek(offset); - try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) { - DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn); - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - int kvStateId = compressedKgInputView.readShort(); - ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); - //insert all k/v pairs into DB - boolean keyGroupHasMoreKeys = true; - while (keyGroupHasMoreKeys) { - byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); - byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView); - if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) { - //clear the signal bit in the key to make it ready for insertion again - RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key); - rocksDBKeyedStateBackend.db.put(handle, key, value); - //TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible - kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK - & compressedKgInputView.readShort(); - if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) { - keyGroupHasMoreKeys = false; - } else { - handle = currentStateHandleKVStateColumnFamilies.get(kvStateId); - } - } else { - rocksDBKeyedStateBackend.db.put(handle, key, value); - } - } - } - } - } - } - } - - private static class RocksDBIncrementalRestoreOperation { - - private final RocksDBKeyedStateBackend stateBackend; - - private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend stateBackend) { - this.stateBackend = stateBackend; - } - - private List> readMetaData( - StreamStateHandle metaStateHandle) throws Exception { - - FSDataInputStream inputStream = null; - - try { - inputStream = metaStateHandle.openInputStream(); - stateBackend.cancelStreamRegistry.registerCloseable(inputStream); - - KeyedBackendSerializationProxy serializationProxy = - new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader); - DataInputView in = new DataInputViewStreamWrapper(inputStream); - serializationProxy.read(in); - - // check for key serializer compatibility; this also reconfigures the - // key serializer to be compatible, if it is required and is possible - if (CompatibilityUtil.resolveCompatibilityResult( - serializationProxy.getKeySerializer(), - UnloadableDummyTypeSerializer.class, - serializationProxy.getKeySerializerConfigSnapshot(), - stateBackend.keySerializer) - .isRequiresMigration()) { - - // TODO replace with state migration; note that key hash codes need to remain the same after migration - throw new StateMigrationException("The new key serializer is not compatible to read previous keys. 
" + - "Aborting now since state migration is currently not available"); - } - - return serializationProxy.getStateMetaInfoSnapshots(); - } finally { - if (inputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) { - inputStream.close(); - } - } - } - - private void readStateData( - Path restoreFilePath, - StreamStateHandle remoteFileHandle) throws IOException { - - FileSystem restoreFileSystem = restoreFilePath.getFileSystem(); - - FSDataInputStream inputStream = null; - FSDataOutputStream outputStream = null; - - try { - inputStream = remoteFileHandle.openInputStream(); - stateBackend.cancelStreamRegistry.registerCloseable(inputStream); - - outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE); - stateBackend.cancelStreamRegistry.registerCloseable(outputStream); - - byte[] buffer = new byte[8 * 1024]; - while (true) { - int numBytes = inputStream.read(buffer); - if (numBytes == -1) { - break; - } - - outputStream.write(buffer, 0, numBytes); - } - } finally { - if (inputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) { - inputStream.close(); - } - - if (outputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) { - outputStream.close(); - } - } - } - - private void restoreInstance( - IncrementalKeyedStateHandle restoreStateHandle, - boolean hasExtraKeys) throws Exception { - - // read state data - Path restoreInstancePath = new Path( - stateBackend.instanceBasePath.getAbsolutePath(), - UUID.randomUUID().toString()); - - try { - final Map sstFiles = - restoreStateHandle.getSharedState(); - final Map miscFiles = - restoreStateHandle.getPrivateState(); - - readAllStateData(sstFiles, restoreInstancePath); - readAllStateData(miscFiles, restoreInstancePath); - - // read meta data - List> stateMetaInfoSnapshots = - readMetaData(restoreStateHandle.getMetaStateHandle()); - - List columnFamilyDescriptors = - new ArrayList<>(1 + stateMetaInfoSnapshots.size()); - - for (RegisteredKeyedBackendStateMetaInfo.Snapshot stateMetaInfoSnapshot : stateMetaInfoSnapshots) { - - ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor( - stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), - stateBackend.columnOptions); - - columnFamilyDescriptors.add(columnFamilyDescriptor); - stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot); - } - - if (hasExtraKeys) { - - List columnFamilyHandles = - new ArrayList<>(1 + columnFamilyDescriptors.size()); - - try (RocksDB restoreDb = stateBackend.openDB( - restoreInstancePath.getPath(), - columnFamilyDescriptors, - columnFamilyHandles)) { - - try { - // iterating only the requested descriptors automatically skips the default column family handle - for (int i = 0; i < columnFamilyDescriptors.size(); ++i) { - ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i); - ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i); - RegisteredKeyedBackendStateMetaInfo.Snapshot stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i); - - Tuple2> registeredStateMetaInfoEntry = - stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName()); - - if (null == registeredStateMetaInfoEntry) { - - RegisteredKeyedBackendStateMetaInfo stateMetaInfo = - new RegisteredKeyedBackendStateMetaInfo<>( - stateMetaInfoSnapshot.getStateType(), - stateMetaInfoSnapshot.getName(), - stateMetaInfoSnapshot.getNamespaceSerializer(), - 
stateMetaInfoSnapshot.getStateSerializer()); - - registeredStateMetaInfoEntry = - new Tuple2<>( - stateBackend.db.createColumnFamily(columnFamilyDescriptor), - stateMetaInfo); - - stateBackend.kvStateInformation.put( - stateMetaInfoSnapshot.getName(), - registeredStateMetaInfoEntry); - } - - ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0; - - try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) { - - int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup(); - byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes]; - for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { - startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE)); - } - - iterator.seek(startKeyGroupPrefixBytes); - - while (iterator.isValid()) { - - int keyGroup = 0; - for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) { - keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j]; - } - - if (stateBackend.keyGroupRange.contains(keyGroup)) { - stateBackend.db.put(targetColumnFamilyHandle, - iterator.key(), iterator.value()); - } - - iterator.next(); - } - } // releases native iterator resources - } - } finally { - //release native tmp db column family resources - for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) { - IOUtils.closeQuietly(columnFamilyHandle); - } - } - } // releases native tmp db resources - } else { - // pick up again the old backend id, so the we can reference existing state - stateBackend.backendUID = restoreStateHandle.getBackendIdentifier(); - - LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.", - stateBackend.operatorIdentifier, stateBackend.backendUID); - - // create hard links in the instance directory - if (!stateBackend.instanceRocksDBPath.mkdirs()) { - throw new IOException("Could not create RocksDB data directory."); - } - - createFileHardLinksInRestorePath(sstFiles, restoreInstancePath); - createFileHardLinksInRestorePath(miscFiles, restoreInstancePath); - - List columnFamilyHandles = - new ArrayList<>(1 + columnFamilyDescriptors.size()); - - stateBackend.db = stateBackend.openDB( - stateBackend.instanceRocksDBPath.getAbsolutePath(), - columnFamilyDescriptors, columnFamilyHandles); - - // extract and store the default column family which is located at the last index - stateBackend.defaultColumnFamily = columnFamilyHandles.remove(columnFamilyHandles.size() - 1); - - for (int i = 0; i < columnFamilyDescriptors.size(); ++i) { - RegisteredKeyedBackendStateMetaInfo.Snapshot stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i); - - ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i); - RegisteredKeyedBackendStateMetaInfo stateMetaInfo = - new RegisteredKeyedBackendStateMetaInfo<>( - stateMetaInfoSnapshot.getStateType(), - stateMetaInfoSnapshot.getName(), - stateMetaInfoSnapshot.getNamespaceSerializer(), - stateMetaInfoSnapshot.getStateSerializer()); - - stateBackend.kvStateInformation.put( - stateMetaInfoSnapshot.getName(), - new Tuple2<>(columnFamilyHandle, stateMetaInfo)); - } - - // use the restore sst files as the base for succeeding checkpoints - synchronized (stateBackend.materializedSstFiles) { - stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet()); - } - - stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId(); - } - } finally { - FileSystem restoreFileSystem = restoreInstancePath.getFileSystem(); - if 
(restoreFileSystem.exists(restoreInstancePath)) { - restoreFileSystem.delete(restoreInstancePath, true); - } - } - } - - private void readAllStateData( - Map stateHandleMap, - Path restoreInstancePath) throws IOException { - - for (Map.Entry entry : stateHandleMap.entrySet()) { - StateHandleID stateHandleID = entry.getKey(); - StreamStateHandle remoteFileHandle = entry.getValue(); - readStateData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle); - } - } - - private void createFileHardLinksInRestorePath( - Map stateHandleMap, - Path restoreInstancePath) throws IOException { - - for (StateHandleID stateHandleID : stateHandleMap.keySet()) { - String newSstFileName = stateHandleID.toString(); - File restoreFile = new File(restoreInstancePath.getPath(), newSstFileName); - File targetFile = new File(stateBackend.instanceRocksDBPath, newSstFileName); - Files.createLink(targetFile.toPath(), restoreFile.toPath()); - } - } - - void restore(Collection restoreStateHandles) throws Exception { - - boolean hasExtraKeys = (restoreStateHandles.size() > 1 || - !Objects.equals(restoreStateHandles.iterator().next().getKeyGroupRange(), stateBackend.keyGroupRange)); - - if (hasExtraKeys) { - stateBackend.createDB(); - } - - for (KeyedStateHandle rawStateHandle : restoreStateHandles) { - - if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) { - throw new IllegalStateException("Unexpected state handle type, " + - "expected " + IncrementalKeyedStateHandle.class + - ", but found " + rawStateHandle.getClass()); - } - - IncrementalKeyedStateHandle keyedStateHandle = (IncrementalKeyedStateHandle) rawStateHandle; - - restoreInstance(keyedStateHandle, hasExtraKeys); - } - } - } - - // ------------------------------------------------------------------------ - // State factories - // ------------------------------------------------------------------------ - - /** - * Creates a column family handle for use with a k/v state. When restoring from a snapshot - * we don't restore the individual k/v states, just the global RocksDB database and the - * list of column families. When a k/v state is first requested we check here whether we - * already have a column family for that and return it or create a new one if it doesn't exist. - * - *
This also checks whether the {@link StateDescriptor} for a state matches the one - * that we checkpointed, i.e. is already in the map of column families. - */ - @SuppressWarnings({"rawtypes", "unchecked"}) - protected ColumnFamilyHandle getColumnFamily( - StateDescriptor descriptor, TypeSerializer namespaceSerializer) throws IOException, StateMigrationException { - - Tuple2> stateInfo = - kvStateInformation.get(descriptor.getName()); - - RegisteredKeyedBackendStateMetaInfo newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>( - descriptor.getType(), - descriptor.getName(), - namespaceSerializer, - descriptor.getSerializer()); - - if (stateInfo != null) { - // TODO with eager registration in place, these checks should be moved to restore() - - RegisteredKeyedBackendStateMetaInfo.Snapshot restoredMetaInfo = - (RegisteredKeyedBackendStateMetaInfo.Snapshot) restoredKvStateMetaInfos.get(descriptor.getName()); - - Preconditions.checkState( - Objects.equals(newMetaInfo.getName(), restoredMetaInfo.getName()), - "Incompatible state names. " + - "Was [" + restoredMetaInfo.getName() + "], " + - "registered with [" + newMetaInfo.getName() + "]."); - - if (!Objects.equals(newMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN) - && !Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) { - - Preconditions.checkState( - newMetaInfo.getStateType() == restoredMetaInfo.getStateType(), - "Incompatible state types. " + - "Was [" + restoredMetaInfo.getStateType() + "], " + - "registered with [" + newMetaInfo.getStateType() + "]."); - } - - // check compatibility results to determine if state migration is required - CompatibilityResult namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult( - restoredMetaInfo.getNamespaceSerializer(), - null, - restoredMetaInfo.getNamespaceSerializerConfigSnapshot(), - newMetaInfo.getNamespaceSerializer()); - - CompatibilityResult stateCompatibility = CompatibilityUtil.resolveCompatibilityResult( - restoredMetaInfo.getStateSerializer(), - UnloadableDummyTypeSerializer.class, - restoredMetaInfo.getStateSerializerConfigSnapshot(), - newMetaInfo.getStateSerializer()); - - if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) { - // TODO state migration currently isn't possible.
- throw new StateMigrationException("State migration isn't supported, yet."); - } else { - stateInfo.f1 = newMetaInfo; - return stateInfo.f0; - } - } - - byte[] nameBytes = descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET); - Preconditions.checkState(!Arrays.equals(DEFAULT_COLUMN_FAMILY_NAME_BYTES, nameBytes), - "The chosen state name 'default' collides with the name of the default column family!"); - - ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, columnOptions); - - final ColumnFamilyHandle columnFamily; - - try { - columnFamily = db.createColumnFamily(columnDescriptor); - } catch (RocksDBException e) { - throw new IOException("Error creating ColumnFamilyHandle.", e); - } - - Tuple2> tuple = - new Tuple2<>(columnFamily, newMetaInfo); - Map rawAccess = kvStateInformation; - rawAccess.put(descriptor.getName(), tuple); - return columnFamily; - } - - @Override - protected InternalValueState createValueState( - TypeSerializer namespaceSerializer, - ValueStateDescriptor stateDesc) throws Exception { - - ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); - - return new RocksDBValueState<>(columnFamily, namespaceSerializer, stateDesc, this); - } - - @Override - protected InternalListState createListState( - TypeSerializer namespaceSerializer, - ListStateDescriptor stateDesc) throws Exception { - - ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); - - return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this); - } - - @Override - protected InternalReducingState createReducingState( - TypeSerializer namespaceSerializer, - ReducingStateDescriptor stateDesc) throws Exception { - - ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); - - return new RocksDBReducingState<>(columnFamily, namespaceSerializer, stateDesc, this); - } - - @Override - protected InternalAggregatingState createAggregatingState( - TypeSerializer namespaceSerializer, - AggregatingStateDescriptor stateDesc) throws Exception { - - ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); - return new RocksDBAggregatingState<>(columnFamily, namespaceSerializer, stateDesc, this); - } - - @Override - protected InternalFoldingState createFoldingState( - TypeSerializer namespaceSerializer, - FoldingStateDescriptor stateDesc) throws Exception { - - ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); - - return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this); - } - - @Override - protected InternalMapState createMapState( - TypeSerializer namespaceSerializer, - MapStateDescriptor stateDesc) throws Exception { - - ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer); - - return new RocksDBMapState<>(columnFamily, namespaceSerializer, stateDesc, this); - } - - /** - * Wraps a RocksDB iterator to cache its current key and assigns an id for the key/value state to the iterator. - * Used by #MergeIterator. - */ - static final class MergeIterator implements AutoCloseable { - - /** - * @param iterator The #RocksIterator to wrap. - * @param kvStateId Id of the K/V state to which this iterator belongs.
- */ - MergeIterator(RocksIterator iterator, int kvStateId) { - this.iterator = Preconditions.checkNotNull(iterator); - this.currentKey = iterator.key(); - this.kvStateId = kvStateId; - } - - private final RocksIterator iterator; - private byte[] currentKey; - private final int kvStateId; - - public byte[] getCurrentKey() { - return currentKey; - } - - public void setCurrentKey(byte[] currentKey) { - this.currentKey = currentKey; - } - - public RocksIterator getIterator() { - return iterator; - } - - public int getKvStateId() { - return kvStateId; - } - - @Override - public void close() { - IOUtils.closeQuietly(iterator); - } - } - - /** - * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups. - * The resulting iteration sequence is ordered by (key-group, kv-state). - */ - static final class RocksDBMergeIterator implements AutoCloseable { - - private final PriorityQueue heap; - private final int keyGroupPrefixByteCount; - private boolean newKeyGroup; - private boolean newKVState; - private boolean valid; - - private MergeIterator currentSubIterator; - - private static final List> COMPARATORS; - - static { - int maxBytes = 4; - COMPARATORS = new ArrayList<>(maxBytes); - for (int i = 0; i < maxBytes; ++i) { - final int currentBytes = i; - COMPARATORS.add(new Comparator() { - @Override - public int compare(MergeIterator o1, MergeIterator o2) { - int arrayCmpRes = compareKeyGroupsForByteArrays( - o1.currentKey, o2.currentKey, currentBytes); - return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes; - } - }); - } - } - - RocksDBMergeIterator(List> kvStateIterators, final int keyGroupPrefixByteCount) { - Preconditions.checkNotNull(kvStateIterators); - this.keyGroupPrefixByteCount = keyGroupPrefixByteCount; - - Comparator iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount); - - if (kvStateIterators.size() > 0) { - PriorityQueue iteratorPriorityQueue = - new PriorityQueue<>(kvStateIterators.size(), iteratorComparator); - - for (Tuple2 rocksIteratorWithKVStateId : kvStateIterators) { - final RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0; - rocksIterator.seekToFirst(); - if (rocksIterator.isValid()) { - iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1)); - } else { - IOUtils.closeQuietly(rocksIterator); - } - } - - kvStateIterators.clear(); - - this.heap = iteratorPriorityQueue; - this.valid = !heap.isEmpty(); - this.currentSubIterator = heap.poll(); - } else { - // creating a PriorityQueue of size 0 results in an exception. - this.heap = null; - this.valid = false; - } - - this.newKeyGroup = true; - this.newKVState = true; - } - - /** - * Advance the iterator. Should only be called if {@link #isValid()} returned true. Validity can only change after - * calls to {@link #next()}.
- */ - public void next() { - newKeyGroup = false; - newKVState = false; - - final RocksIterator rocksIterator = currentSubIterator.getIterator(); - rocksIterator.next(); - - byte[] oldKey = currentSubIterator.getCurrentKey(); - if (rocksIterator.isValid()) { - currentSubIterator.currentKey = rocksIterator.key(); - - if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) { - heap.offer(currentSubIterator); - currentSubIterator = heap.poll(); - newKVState = currentSubIterator.getIterator() != rocksIterator; - detectNewKeyGroup(oldKey); - } - } else { - IOUtils.closeQuietly(rocksIterator); - - if (heap.isEmpty()) { - currentSubIterator = null; - valid = false; - } else { - currentSubIterator = heap.poll(); - newKVState = true; - detectNewKeyGroup(oldKey); - } - } - } - - private boolean isDifferentKeyGroup(byte[] a, byte[] b) { - return 0 != compareKeyGroupsForByteArrays(a, b, keyGroupPrefixByteCount); - } - - private void detectNewKeyGroup(byte[] oldKey) { - if (isDifferentKeyGroup(oldKey, currentSubIterator.currentKey)) { - newKeyGroup = true; - } - } - - /** - * @return key-group for the current key - */ - public int keyGroup() { - int result = 0; - //big endian decode - for (int i = 0; i < keyGroupPrefixByteCount; ++i) { - result <<= 8; - result |= (currentSubIterator.currentKey[i] & 0xFF); - } - return result; - } - - public byte[] key() { - return currentSubIterator.getCurrentKey(); - } - - public byte[] value() { - return currentSubIterator.getIterator().value(); - } - - /** - * @return Id of K/V state to which the current key belongs. - */ - public int kvStateId() { - return currentSubIterator.getKvStateId(); - } - - /** - * Indicates if the current key starts a new k/v-state, i.e. belongs to a different k/v-state than its predecessor. - * @return true iff the current key belongs to a different k/v-state than its predecessor. - */ - public boolean isNewKeyValueState() { - return newKVState; - } - - /** - * Indicates if the current key starts a new key-group, i.e. belongs to a different key-group than its predecessor. - * @return true iff the current key belongs to a different key-group than its predecessor. - */ - public boolean isNewKeyGroup() { - return newKeyGroup; - } - - /** - * Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as well as - * {@link #next()} should only be called if valid returned true. Should be checked after each call to - * {@link #next()} before accessing iterator state. - * @return True iff this iterator is valid. - */ - public boolean isValid() { - return valid; - } - - private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) { - for (int i = 0; i < len; ++i) { - int diff = (a[i] & 0xFF) - (b[i] & 0xFF); - if (diff != 0) { - return diff; - } - } - return 0; - } - - @Override - public void close() { - IOUtils.closeQuietly(currentSubIterator); - currentSubIterator = null; - - // guard against the empty-iterator case, in which no heap was created - if (heap != null) { - IOUtils.closeAllQuietly(heap); - heap.clear(); - } - } - } - - /** - * Only visible for testing, DO NOT USE.
- */ - @VisibleForTesting - public File getInstanceBasePath() { - return instanceBasePath; - } - - @Override - public boolean supportsAsynchronousSnapshots() { - return true; - } - - @VisibleForTesting - @SuppressWarnings("unchecked") - @Override - public int numStateEntries() { - int count = 0; - - for (Tuple2> column : kvStateInformation.values()) { - try (RocksIterator rocksIterator = db.newIterator(column.f0)) { - rocksIterator.seekToFirst(); - - while (rocksIterator.isValid()) { - count++; - rocksIterator.next(); - } - } - } - - return count; - } - - private static class RocksIteratorWrapper implements Iterator { - private final RocksIterator iterator; - private final String state; - private final TypeSerializer keySerializer; - private final int keyGroupPrefixBytes; - - public RocksIteratorWrapper( - RocksIterator iterator, - String state, - TypeSerializer keySerializer, - int keyGroupPrefixBytes) { - this.iterator = Preconditions.checkNotNull(iterator); - this.state = Preconditions.checkNotNull(state); - this.keySerializer = Preconditions.checkNotNull(keySerializer); - this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes); - } - - @Override - public boolean hasNext() { - return iterator.isValid(); - } - - @Override - public K next() { - if (!hasNext()) { - throw new NoSuchElementException("Failed to access state [" + state + "]"); - } - try { - byte[] key = iterator.key(); - DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper( - new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes)); - K value = keySerializer.deserialize(dataInput); - iterator.next(); - return value; - } catch (IOException e) { - throw new FlinkRuntimeException("Failed to access state [" + state + "]", e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java deleted file mode 100644 index f0481ec..0000000 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *
- * http://www.apache.org/licenses/LICENSE-2.0 - *
- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.state.internal.InternalListState; -import org.apache.flink.util.Preconditions; - -import org.rocksdb.ColumnFamilyHandle; -import org.rocksdb.RocksDBException; -import org.rocksdb.WriteOptions; - -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -/** - * {@link ListState} implementation that stores state in RocksDB. - * - *
{@link RocksDBStateBackend} must ensure that we set the - * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since - * we use the {@code merge()} call. - * - * @param The type of the key. - * @param The type of the namespace. - * @param The type of the values in the list state. - */ -public class RocksDBListState - extends AbstractRocksDBState, ListStateDescriptor, List> - implements InternalListState { - - /** Serializer for the values. */ - private final TypeSerializer valueSerializer; - - /** - * We disable writes to the write-ahead-log here. We can't have these in the base class - * because JNI segfaults for some reason if they are. - */ - private final WriteOptions writeOptions; - - /** - * Separator used by the StringAppendOperator in RocksDB. - */ - private static final byte DELIMITER = ','; - - /** - * Creates a new {@code RocksDBListState}. - * - * @param namespaceSerializer The serializer for the namespace. - * @param stateDesc The state identifier for the state. This contains name - * and can create a default state value. - */ - public RocksDBListState(ColumnFamilyHandle columnFamily, - TypeSerializer namespaceSerializer, - ListStateDescriptor stateDesc, - RocksDBKeyedStateBackend backend) { - - super(columnFamily, namespaceSerializer, stateDesc, backend); - this.valueSerializer = stateDesc.getElementSerializer(); - - writeOptions = new WriteOptions(); - writeOptions.setDisableWAL(true); - } - - @Override - public Iterable get() { - try { - writeCurrentKeyWithGroupAndNamespace(); - byte[] key = keySerializationStream.toByteArray(); - byte[] valueBytes = backend.db.get(columnFamily, key); - - if (valueBytes == null) { - return null; - } - - ByteArrayInputStream bais = new ByteArrayInputStream(valueBytes); - DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais); - - List result = new ArrayList<>(); - while (in.available() > 0) { - result.add(valueSerializer.deserialize(in)); - if (in.available() > 0) { - in.readByte(); - } - } - return result; - } catch (IOException | RocksDBException e) { - throw new RuntimeException("Error while retrieving data from RocksDB", e); - } - } - - @Override - public void add(V value) throws IOException { - Preconditions.checkNotNull(value, "You cannot add null to a ListState."); - - try { - writeCurrentKeyWithGroupAndNamespace(); - byte[] key = keySerializationStream.toByteArray(); - keySerializationStream.reset(); - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); - valueSerializer.serialize(value, out); - backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray()); - } catch (Exception e) { - throw new RuntimeException("Error while adding data to RocksDB", e); - } - } - - @Override - public void mergeNamespaces(N target, Collection sources) throws Exception { - if (sources == null || sources.isEmpty()) { - return; - } - - // cache key and namespace - final K key = backend.getCurrentKey(); - final int keyGroup = backend.getCurrentKeyGroupIndex(); - - try { - // create the target full-binary-key - writeKeyWithGroupAndNamespace( - keyGroup, key, target, - keySerializationStream, keySerializationDataOutputView); - final byte[] targetKey = keySerializationStream.toByteArray(); - - // merge the sources to the target - for (N source : sources) { - if (source != null) { - writeKeyWithGroupAndNamespace( - keyGroup, key, source, - keySerializationStream, keySerializationDataOutputView); - - byte[] sourceKey =
keySerializationStream.toByteArray(); - byte[] valueBytes = backend.db.get(columnFamily, sourceKey); - backend.db.delete(columnFamily, sourceKey); - - if (valueBytes != null) { - backend.db.merge(columnFamily, writeOptions, targetKey, valueBytes); - } - } - } - } - catch (Exception e) { - throw new Exception("Error while merging state in RocksDB", e); - } - } - - @Override - public void update(List values) throws Exception { - Preconditions.checkNotNull(values, "List of values to add cannot be null."); - - clear(); - - if (!values.isEmpty()) { - try { - writeCurrentKeyWithGroupAndNamespace(); - byte[] key = keySerializationStream.toByteArray(); - - byte[] premerge = getPreMergedValue(values); - if (premerge != null) { - backend.db.put(columnFamily, writeOptions, key, premerge); - } else { - throw new IOException("Failed to pre-merge values in update()"); - } - } catch (IOException | RocksDBException e) { - throw new RuntimeException("Error while updating data in RocksDB", e); - } - } - } - - @Override - public void addAll(List values) throws Exception { - Preconditions.checkNotNull(values, "List of values to add cannot be null."); - - if (!values.isEmpty()) { - try { - writeCurrentKeyWithGroupAndNamespace(); - byte[] key = keySerializationStream.toByteArray(); - - byte[] premerge = getPreMergedValue(values); - if (premerge != null) { - backend.db.merge(columnFamily, writeOptions, key, premerge); - } else { - throw new IOException("Failed to pre-merge values in addAll()"); - } - } catch (IOException | RocksDBException e) { - throw new RuntimeException("Error while updating data in RocksDB", e); - } - } - } - - private byte[] getPreMergedValue(List values) throws IOException { - DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream); - - keySerializationStream.reset(); - boolean first = true; - for (V value : values) { - Preconditions.checkNotNull(value, "You cannot add null to a ListState."); - if (first) { - first = false; - } else { - keySerializationStream.write(DELIMITER); - } - valueSerializer.serialize(value, out); - } - - return keySerializationStream.toByteArray(); - } -}
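
A note on the list encoding that get(), add() and getPreMergedValue() above share: elements are serialized back to back with a single ',' byte (DELIMITER) between them, and add() leans on RocksDB's merge(), backed by the StringAppendOperator mentioned in the class Javadoc, to append the new element plus delimiter to whatever is already stored. The following is a minimal, self-contained sketch of that round trip; it uses plain java.io streams in place of Flink's TypeSerializer, and the class and method names are illustrative, not Flink API.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public final class ListEncodingSketch {

        private static final byte DELIMITER = ',';

        // mirrors getPreMergedValue(): elements separated by a single delimiter byte
        static byte[] encode(List<Integer> values) throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            boolean first = true;
            for (int value : values) {
                if (first) {
                    first = false;
                } else {
                    out.write(DELIMITER);
                }
                out.writeInt(value); // stand-in for valueSerializer.serialize(value, out)
            }
            return bytes.toByteArray();
        }

        // mirrors get(): deserialize one element, then skip one delimiter byte
        static List<Integer> decode(byte[] stored) throws IOException {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(stored));
            List<Integer> result = new ArrayList<>();
            while (in.available() > 0) {
                result.add(in.readInt()); // stand-in for valueSerializer.deserialize(in)
                if (in.available() > 0) {
                    in.readByte(); // consume the delimiter between elements
                }
            }
            return result;
        }

        public static void main(String[] args) throws IOException {
            System.out.println(decode(encode(Arrays.asList(1, 2, 3)))); // [1, 2, 3]
        }
    }

Note that the decoder never scans for the delimiter: it trusts the serializer to consume exactly one element and then skips one byte, which is why a ',' occurring inside a serialized value is harmless.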
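
Both files also depend on the same key layout: every RocksDB key begins with keyGroupPrefixBytes bytes holding the key-group id in big-endian order, which is what the restore loop writes before seeking and what RocksDBMergeIterator#keyGroup() reads back. A self-contained sketch of that encoding, assuming a prefix of up to four bytes as in the code above (names are illustrative):

    import java.util.Arrays;

    public final class KeyGroupPrefixSketch {

        // writes keyGroup big-endian into the first prefixBytes bytes, as in restoreInstance()
        static byte[] encode(int keyGroup, int prefixBytes) {
            byte[] prefix = new byte[prefixBytes];
            for (int i = 0; i < prefixBytes; ++i) {
                prefix[i] = (byte) (keyGroup >>> ((prefixBytes - i - 1) * Byte.SIZE));
            }
            return prefix;
        }

        // recovers the key-group id from a key, as in RocksDBMergeIterator#keyGroup()
        static int decode(byte[] key, int prefixBytes) {
            int keyGroup = 0;
            for (int i = 0; i < prefixBytes; ++i) {
                // the & 0xFF mask prevents sign extension of the raw byte
                keyGroup = (keyGroup << Byte.SIZE) | (key[i] & 0xFF);
            }
            return keyGroup;
        }

        public static void main(String[] args) {
            byte[] prefix = encode(130, 2);
            System.out.println(Arrays.toString(prefix)); // [0, -126]
            System.out.println(decode(prefix, 2));       // 130
        }
    }

The mask matters: without it, prefix bytes of 0x80 or above would sign-extend and yield negative key-group ids, which is why the restore loop's decode above needs the same & 0xFF.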
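
Finally, the (key-group, kv-state) order that RocksDBMergeIterator's priority queue maintains reduces to an unsigned byte-wise comparison of the key-group prefix, with the k/v-state id as tie-breaker. A compact sketch of that comparator; the Entry type here is illustrative, whereas the real code compares MergeIterator instances directly:

    import java.util.Comparator;

    final class KeyGroupOrderSketch {

        static final class Entry {
            final byte[] key;
            final int kvStateId;

            Entry(byte[] key, int kvStateId) {
                this.key = key;
                this.kvStateId = kvStateId;
            }
        }

        // orders entries by unsigned key-group prefix first, then by kvStateId,
        // mirroring the COMPARATORS built in the static initializer above
        static Comparator<Entry> byKeyGroupThenState(int prefixBytes) {
            return (a, b) -> {
                for (int i = 0; i < prefixBytes; ++i) {
                    int diff = (a.key[i] & 0xFF) - (b.key[i] & 0xFF); // unsigned byte compare
                    if (diff != 0) {
                        return diff;
                    }
                }
                // same key-group: keep keys of the same k/v state contiguous
                return a.kvStateId - b.kvStateId;
            };
        }
    }

Because the heap always yields the smallest remaining (key-group, kv-state) pair, a single pass over the merged stream visits each key-group contiguously, which is exactly what the snapshot code needs to write key-group-partitioned output.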