flink-commits mailing list archives

From: se...@apache.org
Subject: [7/8] flink git commit: [FLINK-8695] [rocksdb] Move flink-statebackend-rocksdb from 'flink-contrib' to 'flink-state-backends'.
Date: Mon, 19 Feb 2018 17:39:43 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
deleted file mode 100644
index 5507339..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ /dev/null
@@ -1,2033 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.AggregatingStateDescriptor;
-import org.apache.flink.api.common.state.FoldingStateDescriptor;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.MapStateDescriptor;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.state.StateDescriptor;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.CompatibilityResult;
-import org.apache.flink.api.common.typeutils.CompatibilityUtil;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
-import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.core.fs.CloseableRegistry;
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
-import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
-import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
-import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-import org.apache.flink.runtime.state.CheckpointedStateScope;
-import org.apache.flink.runtime.state.DoneFuture;
-import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.KeyedBackendSerializationProxy;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
-import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
-import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator;
-import org.apache.flink.runtime.state.StateHandleID;
-import org.apache.flink.runtime.state.StateObject;
-import org.apache.flink.runtime.state.StateUtil;
-import org.apache.flink.runtime.state.StreamCompressionDecorator;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
-import org.apache.flink.runtime.state.internal.InternalAggregatingState;
-import org.apache.flink.runtime.state.internal.InternalFoldingState;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.runtime.state.internal.InternalMapState;
-import org.apache.flink.runtime.state.internal.InternalReducingState;
-import org.apache.flink.runtime.state.internal.InternalValueState;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.FlinkRuntimeException;
-import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.ResourceGuard;
-import org.apache.flink.util.StateMigrationException;
-
-import org.rocksdb.Checkpoint;
-import org.rocksdb.ColumnFamilyDescriptor;
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.DBOptions;
-import org.rocksdb.ReadOptions;
-import org.rocksdb.RocksDB;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.RocksIterator;
-import org.rocksdb.Snapshot;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Objects;
-import java.util.PriorityQueue;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.RunnableFuture;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-/**
- * An {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and serializes state to
- * streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
- * checkpointing. This state backend can store very large state that exceeds memory and spills
- * to disk. Except for the snapshotting, this class should be accessed as if it is not thread-safe.
- *
- * <p>This class follows the rules for closing/releasing native RocksDB resources as described in
- * <a href="https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families">
- * this document</a>.
- */
-public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
-
-	private static final Logger LOG = LoggerFactory.getLogger(RocksDBKeyedStateBackend.class);
-
-	/** The name of the merge operator in RocksDB. Do not change unless you know exactly what you are doing. */
-	public static final String MERGE_OPERATOR_NAME = "stringappendtest";
-
-	/** File suffix of sstable files. */
-	private static final String SST_FILE_SUFFIX = ".sst";
-
-	/** Bytes for the name of the column descriptor for the default column family. */
-	public static final byte[] DEFAULT_COLUMN_FAMILY_NAME_BYTES = "default".getBytes(ConfigConstants.DEFAULT_CHARSET);
-
-	/** String that identifies the operator that owns this backend. */
-	private final String operatorIdentifier;
-
-	/** The column family options from the options factory. */
-	private final ColumnFamilyOptions columnOptions;
-
-	/** The DB options from the options factory. */
-	private final DBOptions dbOptions;
-
-	/** Path where this configured instance stores its data directory. */
-	private final File instanceBasePath;
-
-	/** Path where this configured instance stores its RocksDB database. */
-	private final File instanceRocksDBPath;
-
-	/**
-	 * Protects access to RocksDB from other threads, like the checkpointing thread, against a parallel call that
-	 * disposes the RocksDB object.
-	 */
-	private final ResourceGuard rocksDBResourceGuard;
-
-	/**
-	 * Our RocksDB database, this is used by the actual subclasses of {@link AbstractRocksDBState}
-	 * to store state. The different k/v states that we have don't each have their own RocksDB
-	 * instance. They all write to this instance but to their own column family.
-	 */
-	protected RocksDB db;
-
-	/**
-	 * We are not using the default column family for Flink state ops, but we still need to remember this handle so that
-	 * we can close it properly when the backend is closed. This is required by RocksDB's native memory management.
-	 */
-	private ColumnFamilyHandle defaultColumnFamily;
-
-	/**
-	 * Information about the k/v states as we create them. This is used to retrieve the
-	 * column family that is used for a state and also for sanity checks when restoring.
-	 */
-	private final Map<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformation;
-
-	/**
-	 * Map of state names to their corresponding restored state meta info.
-	 *
-	 * <p>TODO this map can be removed when eager-state registration is in place.
-	 * TODO we currently need this cached to check state migration strategies when new serializers are registered.
-	 */
-	private final Map<String, RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredKvStateMetaInfos;
-
-	/** Number of bytes required to prefix the key groups. */
-	private final int keyGroupPrefixBytes;
-
-	/** True if incremental checkpointing is enabled. */
-	private final boolean enableIncrementalCheckpointing;
-
-	/** The state handle ids of all sst files materialized in snapshots for previous checkpoints. */
-	private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
-
-	/** The identifier of the last completed checkpoint. */
-	private long lastCompletedCheckpointId = -1L;
-
-	/** Unique ID of this backend. */
-	private UUID backendUID;
-
-	public RocksDBKeyedStateBackend(
-		String operatorIdentifier,
-		ClassLoader userCodeClassLoader,
-		File instanceBasePath,
-		DBOptions dbOptions,
-		ColumnFamilyOptions columnFamilyOptions,
-		TaskKvStateRegistry kvStateRegistry,
-		TypeSerializer<K> keySerializer,
-		int numberOfKeyGroups,
-		KeyGroupRange keyGroupRange,
-		ExecutionConfig executionConfig,
-		boolean enableIncrementalCheckpointing
-	) throws IOException {
-
-		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
-
-		this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);
-
-		this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
-		this.rocksDBResourceGuard = new ResourceGuard();
-
-		// ensure that we use the right merge operator, because other code relies on this
-		this.columnOptions = Preconditions.checkNotNull(columnFamilyOptions)
-			.setMergeOperatorName(MERGE_OPERATOR_NAME);
-
-		this.dbOptions = Preconditions.checkNotNull(dbOptions);
-
-		this.instanceBasePath = Preconditions.checkNotNull(instanceBasePath);
-		this.instanceRocksDBPath = new File(instanceBasePath, "db");
-
-		if (instanceBasePath.exists()) {
-			// Clear the base directory when the backend is created
-			// in case something crashed and the backend never reached dispose()
-			cleanInstanceBasePath();
-		}
-
-		if (!instanceBasePath.mkdirs()) {
-			throw new IOException(
-					String.format("Could not create RocksDB data directory at %s.", instanceBasePath.getAbsolutePath()));
-		}
-
-		this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1;
-		this.kvStateInformation = new HashMap<>();
-		this.restoredKvStateMetaInfos = new HashMap<>();
-		this.materializedSstFiles = new TreeMap<>();
-		this.backendUID = UUID.randomUUID();
-		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
-	}
-
-	@Override
-	public <N> Stream<K> getKeys(String state, N namespace) {
-		Tuple2<ColumnFamilyHandle, ?> columnInfo = kvStateInformation.get(state);
-		if (columnInfo == null) {
-			return Stream.empty();
-		}
-
-		RocksIterator iterator = db.newIterator(columnInfo.f0);
-		iterator.seekToFirst();
-
-		Iterable<K> iterable = () -> new RocksIteratorWrapper<>(iterator, state, keySerializer, keyGroupPrefixBytes);
-		Stream<K> targetStream = StreamSupport.stream(iterable.spliterator(), false);
-		return targetStream.onClose(iterator::close);
-	}
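
	// Illustrative only (not part of the original file): a hedged caller-side sketch.
	// The stream returned by getKeys() above is backed by a native RocksIterator that is
	// only released through the onClose hook, so callers should consume it in
	// try-with-resources. VoidNamespace (org.apache.flink.runtime.state.VoidNamespace)
	// is Flink's namespace type for non-windowed state; "my-state" is a made-up name.
	private static void logActiveKeysSketch(RocksDBKeyedStateBackend<String> backend) {
		try (Stream<String> keys = backend.getKeys("my-state", VoidNamespace.INSTANCE)) {
			keys.forEach(key -> LOG.info("Active key: {}", key)); // closing the stream frees the native iterator
		}
	}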
-
-	/**
-	 * Should only be called by one thread, and only after all accesses to the DB have happened.
-	 */
-	@Override
-	public void dispose() {
-		super.dispose();
-
-		// This call will block until all clients that still hold access to the RocksDB instance have released it,
-		// so that we cannot release the native resources while clients are still working with it in parallel.
-		rocksDBResourceGuard.close();
-
-		// IMPORTANT: we null the reference to signal potential async checkpoint workers that the db was disposed, as
-		// working on the disposed object results in SEGFAULTS.
-		if (db != null) {
-
-			// RocksDB's native memory management requires that *all* CFs (including default) are closed before the
-			// DB is closed. So we start with the ones created by Flink...
-			for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData :
-				kvStateInformation.values()) {
-				IOUtils.closeQuietly(columnMetaData.f0);
-			}
-
-			// ... close the default CF ...
-			IOUtils.closeQuietly(defaultColumnFamily);
-
-			// ... and finally close the DB instance ...
-			IOUtils.closeQuietly(db);
-
-			// invalidate the reference before releasing the lock so that other accesses will not cause crashes
-			db = null;
-		}
-
-		kvStateInformation.clear();
-		restoredKvStateMetaInfos.clear();
-
-		IOUtils.closeQuietly(dbOptions);
-		IOUtils.closeQuietly(columnOptions);
-
-		cleanInstanceBasePath();
-	}
-
-	private void cleanInstanceBasePath() {
-		LOG.info("Deleting existing instance base directory {}.", instanceBasePath);
-
-		try {
-			FileUtils.deleteDirectory(instanceBasePath);
-		} catch (IOException ex) {
-			LOG.warn("Could not delete instance base path for RocksDB: " + instanceBasePath, ex);
-		}
-	}
-
-	public int getKeyGroupPrefixBytes() {
-		return keyGroupPrefixBytes;
-	}
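
	// Illustrative only (not part of the original file): a hedged sketch of how the
	// prefix length computed above is used. A key-group id is written big-endian into
	// the first keyGroupPrefixBytes bytes of every RocksDB key, and decoded the same
	// way during restore (see RocksDBIncrementalRestoreOperation further down).
	private static int encodeThenDecodeKeyGroupSketch(int keyGroup, int keyGroupPrefixBytes) {
		byte[] prefix = new byte[keyGroupPrefixBytes];
		for (int i = 0; i < keyGroupPrefixBytes; ++i) {
			// most significant byte first
			prefix[i] = (byte) (keyGroup >>> ((keyGroupPrefixBytes - i - 1) * Byte.SIZE));
		}
		int decoded = 0;
		for (int i = 0; i < keyGroupPrefixBytes; ++i) {
			decoded = (decoded << Byte.SIZE) + (prefix[i] & 0xFF); // mask against sign extension
		}
		return decoded; // == keyGroup for key groups that fit into the prefix
	}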
-
-	/**
-	 * Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and
-	 * is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always
-	 * be called by the same thread.
-	 *
-	 * @param checkpointId  The Id of the checkpoint.
-	 * @param timestamp     The timestamp of the checkpoint.
-	 * @param streamFactory The factory that we can use for writing our state to streams.
-	 * @param checkpointOptions Options for how to perform this checkpoint.
-	 * @return Future to the state handle of the snapshot data.
-	 * @throws Exception
-	 */
-	@Override
-	public RunnableFuture<KeyedStateHandle> snapshot(
-		final long checkpointId,
-		final long timestamp,
-		final CheckpointStreamFactory streamFactory,
-		CheckpointOptions checkpointOptions) throws Exception {
-
-		if (checkpointOptions.getCheckpointType() != CheckpointType.SAVEPOINT &&
-			enableIncrementalCheckpointing) {
-			return snapshotIncrementally(checkpointId, timestamp, streamFactory);
-		} else {
-			return snapshotFully(checkpointId, timestamp, streamFactory);
-		}
-	}
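
	// Illustrative only (not part of the original file): a hedged sketch of driving the
	// future returned by snapshot(). The asynchronous part only runs once the future is
	// executed; cancelling the future stops the snapshot. 'executor' is an assumed
	// java.util.concurrent.Executor supplied by the caller.
	private static KeyedStateHandle runSnapshotSketch(
			RocksDBKeyedStateBackend<?> backend,
			java.util.concurrent.Executor executor,
			long checkpointId,
			long timestamp,
			CheckpointStreamFactory streamFactory,
			CheckpointOptions options) throws Exception {
		RunnableFuture<KeyedStateHandle> future =
			backend.snapshot(checkpointId, timestamp, streamFactory, options);
		executor.execute(future);      // runs the asynchronous materialization part
		return future.get();           // may be null if the keyed state was empty
	}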
-
-	private RunnableFuture<KeyedStateHandle> snapshotIncrementally(
-		final long checkpointId,
-		final long checkpointTimestamp,
-		final CheckpointStreamFactory checkpointStreamFactory) throws Exception {
-
-		if (db == null) {
-			throw new IOException("RocksDB closed.");
-		}
-
-		if (kvStateInformation.isEmpty()) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
-						checkpointTimestamp);
-			}
-			return DoneFuture.nullValue();
-		}
-
-		final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
-			new RocksDBIncrementalSnapshotOperation<>(
-				this,
-				checkpointStreamFactory,
-				checkpointId,
-				checkpointTimestamp);
-
-		try {
-			snapshotOperation.takeSnapshot();
-		} catch (Exception e) {
-			snapshotOperation.stop();
-			snapshotOperation.releaseResources(true);
-			throw e;
-		}
-
-		return new FutureTask<KeyedStateHandle>(
-			new Callable<KeyedStateHandle>() {
-				@Override
-				public KeyedStateHandle call() throws Exception {
-					return snapshotOperation.materializeSnapshot();
-				}
-			}
-		) {
-			@Override
-			public boolean cancel(boolean mayInterruptIfRunning) {
-				snapshotOperation.stop();
-				return super.cancel(mayInterruptIfRunning);
-			}
-
-			@Override
-			protected void done() {
-				snapshotOperation.releaseResources(isCancelled());
-			}
-		};
-	}
-
-	private RunnableFuture<KeyedStateHandle> snapshotFully(
-		final long checkpointId,
-		final long timestamp,
-		final CheckpointStreamFactory streamFactory) throws Exception {
-
-		long startTime = System.currentTimeMillis();
-		final CloseableRegistry snapshotCloseableRegistry = new CloseableRegistry();
-
-		final RocksDBFullSnapshotOperation<K> snapshotOperation;
-
-		if (kvStateInformation.isEmpty()) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", timestamp);
-			}
-
-			return DoneFuture.nullValue();
-		}
-
-		snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry);
-		snapshotOperation.takeDBSnapShot(checkpointId, timestamp);
-
-		// implementation of the async IO operation, based on FutureTask
-		AbstractAsyncCallableWithResources<KeyedStateHandle> ioCallable =
-			new AbstractAsyncCallableWithResources<KeyedStateHandle>() {
-
-				@Override
-				protected void acquireResources() throws Exception {
-					cancelStreamRegistry.registerCloseable(snapshotCloseableRegistry);
-					snapshotOperation.openCheckpointStream();
-				}
-
-				@Override
-				protected void releaseResources() throws Exception {
-					closeLocalRegistry();
-					releaseSnapshotOperationResources();
-				}
-
-				private void releaseSnapshotOperationResources() {
-					// hold the db lock while operating on the db to guard us against async db disposal
-					snapshotOperation.releaseSnapshotResources();
-				}
-
-				@Override
-				protected void stopOperation() throws Exception {
-					closeLocalRegistry();
-				}
-
-				private void closeLocalRegistry() {
-					if (cancelStreamRegistry.unregisterCloseable(snapshotCloseableRegistry)) {
-						try {
-							snapshotCloseableRegistry.close();
-						} catch (Exception ex) {
-							LOG.warn("Error closing local registry", ex);
-						}
-					}
-				}
-
-				@Override
-				public KeyGroupsStateHandle performOperation() throws Exception {
-					long startTime = System.currentTimeMillis();
-
-					if (isStopped()) {
-						throw new IOException("RocksDB closed.");
-					}
-
-					snapshotOperation.writeDBSnapshot();
-
-					LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
-						streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
-
-					return snapshotOperation.getSnapshotResultStateHandle();
-				}
-			};
-
-		LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.",
-				streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
-
-		return AsyncStoppableTaskWithCallback.from(ioCallable);
-	}
-
-	/**
-	 * Encapsulates the process to perform a snapshot of a RocksDBKeyedStateBackend.
-	 */
-	static final class RocksDBFullSnapshotOperation<K> {
-
-		static final int FIRST_BIT_IN_BYTE_MASK = 0x80;
-		static final int END_OF_KEY_GROUP_MARK = 0xFFFF;
-
-		private final RocksDBKeyedStateBackend<K> stateBackend;
-		private final KeyGroupRangeOffsets keyGroupRangeOffsets;
-		private final CheckpointStreamFactory checkpointStreamFactory;
-		private final CloseableRegistry snapshotCloseableRegistry;
-		private final ResourceGuard.Lease dbLease;
-
-		private long checkpointId;
-		private long checkpointTimeStamp;
-
-		private Snapshot snapshot;
-		private ReadOptions readOptions;
-		private List<Tuple2<RocksIterator, Integer>> kvStateIterators;
-
-		private CheckpointStreamFactory.CheckpointStateOutputStream outStream;
-		private DataOutputView outputView;
-
-		RocksDBFullSnapshotOperation(
-			RocksDBKeyedStateBackend<K> stateBackend,
-			CheckpointStreamFactory checkpointStreamFactory,
-			CloseableRegistry registry) throws IOException {
-
-			this.stateBackend = stateBackend;
-			this.checkpointStreamFactory = checkpointStreamFactory;
-			this.keyGroupRangeOffsets = new KeyGroupRangeOffsets(stateBackend.keyGroupRange);
-			this.snapshotCloseableRegistry = registry;
-			this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource();
-		}
-
-		/**
-		 * 1) Create a snapshot object from RocksDB.
-		 *
-		 * @param checkpointId id of the checkpoint for which we take the snapshot
-		 * @param checkpointTimeStamp timestamp of the checkpoint for which we take the snapshot
-		 */
-		public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) {
-			Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");
-			this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size());
-			this.checkpointId = checkpointId;
-			this.checkpointTimeStamp = checkpointTimeStamp;
-			this.snapshot = stateBackend.db.getSnapshot();
-		}
-
-		/**
-		 * 2) Open CheckpointStateOutputStream through the checkpointStreamFactory into which we will write.
-		 *
-		 * @throws Exception
-		 */
-		public void openCheckpointStream() throws Exception {
-			Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set.");
-			outStream = checkpointStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
-			snapshotCloseableRegistry.registerCloseable(outStream);
-			outputView = new DataOutputViewStreamWrapper(outStream);
-		}
-
-		/**
-		 * 3) Write the actual data from RocksDB as of the time we took the snapshot object in (1).
-		 *
-		 * @throws IOException
-		 */
-		public void writeDBSnapshot() throws IOException, InterruptedException {
-
-			if (null == snapshot) {
-				throw new IOException("No snapshot available. Might be released due to cancellation.");
-			}
-
-			Preconditions.checkNotNull(outStream, "No output stream to write snapshot.");
-			writeKVStateMetaData();
-			writeKVStateData();
-		}
-
-		/**
-		 * 4) Returns a state handle to the snapshot after the snapshot procedure is completed, and null before that.
-		 *
-		 * @return state handle to the completed snapshot
-		 */
-		public KeyGroupsStateHandle getSnapshotResultStateHandle() throws IOException {
-
-			if (snapshotCloseableRegistry.unregisterCloseable(outStream)) {
-
-				StreamStateHandle stateHandle = outStream.closeAndGetHandle();
-				outStream = null;
-
-				if (stateHandle != null) {
-					return new KeyGroupsStateHandle(keyGroupRangeOffsets, stateHandle);
-				}
-			}
-			return null;
-		}
-
-		/**
-		 * 5) Release the snapshot object for RocksDB and clean up.
-		 */
-		public void releaseSnapshotResources() {
-
-			outStream = null;
-
-			if (null != kvStateIterators) {
-				for (Tuple2<RocksIterator, Integer> kvStateIterator : kvStateIterators) {
-					IOUtils.closeQuietly(kvStateIterator.f0);
-				}
-				kvStateIterators = null;
-			}
-
-			if (null != snapshot) {
-				if (null != stateBackend.db) {
-					stateBackend.db.releaseSnapshot(snapshot);
-				}
-				IOUtils.closeQuietly(snapshot);
-				snapshot = null;
-			}
-
-			if (null != readOptions) {
-				IOUtils.closeQuietly(readOptions);
-				readOptions = null;
-			}
-
-			this.dbLease.close();
-		}
-
-		private void writeKVStateMetaData() throws IOException {
-
-			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots =
-				new ArrayList<>(stateBackend.kvStateInformation.size());
-
-			// a single ReadOptions instance pinned to the snapshot is shared by all iterators;
-			// creating one per state would leak the earlier instances, because only the last
-			// one is closed in releaseSnapshotResources()
-			readOptions = new ReadOptions();
-			readOptions.setSnapshot(snapshot);
-
-			int kvStateId = 0;
-			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
-				stateBackend.kvStateInformation.entrySet()) {
-
-				metaInfoSnapshots.add(column.getValue().f1.snapshot());
-
-				//retrieve an iterator for this k/v state
-				kvStateIterators.add(
-					new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId));
-
-				++kvStateId;
-			}
-
-			KeyedBackendSerializationProxy<K> serializationProxy =
-				new KeyedBackendSerializationProxy<>(
-					stateBackend.getKeySerializer(),
-					metaInfoSnapshots,
-					!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, stateBackend.keyGroupCompressionDecorator));
-
-			serializationProxy.write(outputView);
-		}
-
-		private void writeKVStateData() throws IOException, InterruptedException {
-
-			byte[] previousKey = null;
-			byte[] previousValue = null;
-			OutputStream kgOutStream = null;
-			DataOutputView kgOutView = null;
-
-			try {
-				// Here we transfer ownership of RocksIterators to the RocksDBMergeIterator
-				try (RocksDBMergeIterator mergeIterator = new RocksDBMergeIterator(
-					kvStateIterators, stateBackend.keyGroupPrefixBytes)) {
-
-					// handover complete, null out to prevent double close
-					kvStateIterators = null;
-
-					//preamble: setup with first key-group as our lookahead
-					if (mergeIterator.isValid()) {
-						//begin first key-group by recording the offset
-						keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
-						//write the k/v-state id as metadata
-						kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
-						kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
-						//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-						kgOutView.writeShort(mergeIterator.kvStateId());
-						previousKey = mergeIterator.key();
-						previousValue = mergeIterator.value();
-						mergeIterator.next();
-					}
-
-					//main loop: write k/v pairs ordered by (key-group, kv-state), thereby tracking key-group offsets.
-					while (mergeIterator.isValid()) {
-
-						assert (!hasMetaDataFollowsFlag(previousKey));
-
-						//set signal in first key byte that meta data will follow in the stream after this k/v pair
-						if (mergeIterator.isNewKeyGroup() || mergeIterator.isNewKeyValueState()) {
-
-							//be cooperative and check for interruption from time to time in the hot loop
-							checkInterrupted();
-
-							setMetaDataFollowsFlagInKey(previousKey);
-						}
-
-						writeKeyValuePair(previousKey, previousValue, kgOutView);
-
-						//write meta data if we have to
-						if (mergeIterator.isNewKeyGroup()) {
-							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-							kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
-							// this will just close the outer stream
-							kgOutStream.close();
-							//begin new key-group
-							keyGroupRangeOffsets.setKeyGroupOffset(mergeIterator.keyGroup(), outStream.getPos());
-							//write the k/v-state id as metadata
-							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-							kgOutStream = stateBackend.keyGroupCompressionDecorator.decorateWithCompression(outStream);
-							kgOutView = new DataOutputViewStreamWrapper(kgOutStream);
-							kgOutView.writeShort(mergeIterator.kvStateId());
-						} else if (mergeIterator.isNewKeyValueState()) {
-							//write the k/v-state
-							//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-							kgOutView.writeShort(mergeIterator.kvStateId());
-						}
-
-						//request next k/v pair
-						previousKey = mergeIterator.key();
-						previousValue = mergeIterator.value();
-						mergeIterator.next();
-					}
-				}
-
-				//epilogue: write last key-group
-				if (previousKey != null) {
-					assert (!hasMetaDataFollowsFlag(previousKey));
-					setMetaDataFollowsFlagInKey(previousKey);
-					writeKeyValuePair(previousKey, previousValue, kgOutView);
-					//TODO this could be aware of keyGroupPrefixBytes and write only one byte if possible
-					kgOutView.writeShort(END_OF_KEY_GROUP_MARK);
-					// this will just close the outer stream
-					kgOutStream.close();
-					kgOutStream = null;
-				}
-
-			} finally {
-				// this will just close the outer stream
-				IOUtils.closeQuietly(kgOutStream);
-			}
-		}
-
-		private void writeKeyValuePair(byte[] key, byte[] value, DataOutputView out) throws IOException {
-			BytePrimitiveArraySerializer.INSTANCE.serialize(key, out);
-			BytePrimitiveArraySerializer.INSTANCE.serialize(value, out);
-		}
-
-		static void setMetaDataFollowsFlagInKey(byte[] key) {
-			key[0] |= FIRST_BIT_IN_BYTE_MASK;
-		}
-
-		static void clearMetaDataFollowsFlag(byte[] key) {
-			key[0] &= (~RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
-		}
-
-		static boolean hasMetaDataFollowsFlag(byte[] key) {
-			return 0 != (key[0] & RocksDBFullSnapshotOperation.FIRST_BIT_IN_BYTE_MASK);
-		}
-
-		private static void checkInterrupted() throws InterruptedException {
-			if (Thread.currentThread().isInterrupted()) {
-				throw new InterruptedException("RocksDB snapshot interrupted.");
-			}
-		}
-	}
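
	// Illustrative only (not part of the original file): a hedged round trip through the
	// flag helpers above. The highest bit of a key's first byte marks that stream
	// metadata follows the current k/v pair; the restore path clears it again before
	// re-inserting the key into RocksDB.
	private static void metaDataFlagRoundTripSketch() {
		byte[] key = new byte[] {0x12, 0x34};
		RocksDBFullSnapshotOperation.setMetaDataFollowsFlagInKey(key);      // key[0] becomes (byte) 0x92
		assert RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key);
		RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);         // key[0] back to 0x12
		assert !RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key);
	}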
-
-	private static final class RocksDBIncrementalSnapshotOperation<K> {
-
-		/** The backend which we snapshot. */
-		private final RocksDBKeyedStateBackend<K> stateBackend;
-
-		/** Stream factory that creates the output streams to DFS. */
-		private final CheckpointStreamFactory checkpointStreamFactory;
-
-		/** Id for the current checkpoint. */
-		private final long checkpointId;
-
-		/** Timestamp for the current checkpoint. */
-		private final long checkpointTimestamp;
-
-		/** All sst files that were part of the last previously completed checkpoint. */
-		private Set<StateHandleID> baseSstFiles;
-
-		/** The state meta data. */
-		private final List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots = new ArrayList<>();
-
-		private FileSystem backupFileSystem;
-		private Path backupPath;
-
-		// Registry for all opened i/o streams
-		private final CloseableRegistry closeableRegistry = new CloseableRegistry();
-
-		// new sst files since the last completed checkpoint
-		private final Map<StateHandleID, StreamStateHandle> sstFiles = new HashMap<>();
-
-		// handles to the misc files in the current snapshot
-		private final Map<StateHandleID, StreamStateHandle> miscFiles = new HashMap<>();
-
-		// This lease protects from concurrent disposal of the native rocksdb instance.
-		private final ResourceGuard.Lease dbLease;
-
-		private StreamStateHandle metaStateHandle = null;
-
-		private RocksDBIncrementalSnapshotOperation(
-			RocksDBKeyedStateBackend<K> stateBackend,
-			CheckpointStreamFactory checkpointStreamFactory,
-			long checkpointId,
-			long checkpointTimestamp) throws IOException {
-
-			this.stateBackend = stateBackend;
-			this.checkpointStreamFactory = checkpointStreamFactory;
-			this.checkpointId = checkpointId;
-			this.checkpointTimestamp = checkpointTimestamp;
-			this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource();
-		}
-
-		private StreamStateHandle materializeStateData(Path filePath) throws Exception {
-			FSDataInputStream inputStream = null;
-			CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
-
-			try {
-				final byte[] buffer = new byte[8 * 1024];
-
-				FileSystem backupFileSystem = backupPath.getFileSystem();
-				inputStream = backupFileSystem.open(filePath);
-				closeableRegistry.registerCloseable(inputStream);
-
-				outputStream = checkpointStreamFactory
-					.createCheckpointStateOutputStream(CheckpointedStateScope.SHARED);
-				closeableRegistry.registerCloseable(outputStream);
-
-				while (true) {
-					int numBytes = inputStream.read(buffer);
-
-					if (numBytes == -1) {
-						break;
-					}
-
-					outputStream.write(buffer, 0, numBytes);
-				}
-
-				StreamStateHandle result = null;
-				if (closeableRegistry.unregisterCloseable(outputStream)) {
-					result = outputStream.closeAndGetHandle();
-					outputStream = null;
-				}
-				return result;
-
-			} finally {
-				if (inputStream != null && closeableRegistry.unregisterCloseable(inputStream)) {
-					inputStream.close();
-				}
-
-				if (outputStream != null && closeableRegistry.unregisterCloseable(outputStream)) {
-					outputStream.close();
-				}
-			}
-		}
-
-		private StreamStateHandle materializeMetaData() throws Exception {
-			CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;
-
-			try {
-				outputStream = checkpointStreamFactory
-					.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
-				closeableRegistry.registerCloseable(outputStream);
-
-				//no need for compression scheme support because sst-files are already compressed
-				KeyedBackendSerializationProxy<K> serializationProxy =
-					new KeyedBackendSerializationProxy<>(
-						stateBackend.keySerializer,
-						stateMetaInfoSnapshots,
-						false);
-
-				DataOutputView out = new DataOutputViewStreamWrapper(outputStream);
-
-				serializationProxy.write(out);
-
-				StreamStateHandle result = null;
-				if (closeableRegistry.unregisterCloseable(outputStream)) {
-					result = outputStream.closeAndGetHandle();
-					outputStream = null;
-				}
-				return result;
-			} finally {
-				if (outputStream != null) {
-					if (closeableRegistry.unregisterCloseable(outputStream)) {
-						outputStream.close();
-					}
-				}
-			}
-		}
-
-		void takeSnapshot() throws Exception {
-
-			final long lastCompletedCheckpoint;
-
-			// use the last completed checkpoint as the comparison base.
-			synchronized (stateBackend.materializedSstFiles) {
-				lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId;
-				baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
-			}
-
-			LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
-				"assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
-
-			// save meta data
-			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
-				: stateBackend.kvStateInformation.entrySet()) {
-				stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
-			}
-
-			// save state data
-			backupPath = new Path(stateBackend.instanceBasePath.getAbsolutePath(), "chk-" + checkpointId);
-
-			LOG.trace("Local RocksDB checkpoint goes to backup path {}.", backupPath);
-
-			backupFileSystem = backupPath.getFileSystem();
-			if (backupFileSystem.exists(backupPath)) {
-				throw new IllegalStateException("Unexpected existence of the backup directory.");
-			}
-
-			// create hard links of living files in the checkpoint path
-			Checkpoint checkpoint = Checkpoint.create(stateBackend.db);
-			checkpoint.createCheckpoint(backupPath.getPath());
-		}
-
-		KeyedStateHandle materializeSnapshot() throws Exception {
-
-			stateBackend.cancelStreamRegistry.registerCloseable(closeableRegistry);
-
-			// write meta data
-			metaStateHandle = materializeMetaData();
-
-			// write state data
-			Preconditions.checkState(backupFileSystem.exists(backupPath));
-
-			FileStatus[] fileStatuses = backupFileSystem.listStatus(backupPath);
-			if (fileStatuses != null) {
-				for (FileStatus fileStatus : fileStatuses) {
-					final Path filePath = fileStatus.getPath();
-					final String fileName = filePath.getName();
-					final StateHandleID stateHandleID = new StateHandleID(fileName);
-
-					if (fileName.endsWith(SST_FILE_SUFFIX)) {
-						final boolean existsAlready =
-							baseSstFiles != null && baseSstFiles.contains(stateHandleID);
-
-						if (existsAlready) {
-							// we introduce a placeholder state handle that is replaced with the
-							// original from the shared state registry (created by a previous checkpoint)
-							sstFiles.put(
-								stateHandleID,
-								new PlaceholderStreamStateHandle());
-						} else {
-							sstFiles.put(stateHandleID, materializeStateData(filePath));
-						}
-					} else {
-						StreamStateHandle fileHandle = materializeStateData(filePath);
-						miscFiles.put(stateHandleID, fileHandle);
-					}
-				}
-			}
-
-			synchronized (stateBackend.materializedSstFiles) {
-				stateBackend.materializedSstFiles.put(checkpointId, sstFiles.keySet());
-			}
-
-			return new IncrementalKeyedStateHandle(
-				stateBackend.backendUID,
-				stateBackend.keyGroupRange,
-				checkpointId,
-				sstFiles,
-				miscFiles,
-				metaStateHandle);
-		}
-
-		void stop() {
-
-			if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
-				try {
-					closeableRegistry.close();
-				} catch (IOException e) {
-					LOG.warn("Could not properly close io streams.", e);
-				}
-			}
-		}
-
-		void releaseResources(boolean canceled) {
-
-			dbLease.close();
-
-			if (stateBackend.cancelStreamRegistry.unregisterCloseable(closeableRegistry)) {
-				try {
-					closeableRegistry.close();
-				} catch (IOException e) {
-					LOG.warn("Exception on closing registry.", e);
-				}
-			}
-
-			if (backupPath != null) {
-				try {
-					if (backupFileSystem.exists(backupPath)) {
-
-						LOG.trace("Deleting local RocksDB backup path {}.", backupPath);
-						backupFileSystem.delete(backupPath, true);
-					}
-				} catch (Exception e) {
-					LOG.warn("Could not properly delete the checkpoint directory.", e);
-				}
-			}
-
-			if (canceled) {
-				Collection<StateObject> statesToDiscard =
-					new ArrayList<>(1 + miscFiles.size() + sstFiles.size());
-
-				statesToDiscard.add(metaStateHandle);
-				statesToDiscard.addAll(miscFiles.values());
-				statesToDiscard.addAll(sstFiles.values());
-
-				try {
-					StateUtil.bestEffortDiscardAllStateObjects(statesToDiscard);
-				} catch (Exception e) {
-					LOG.warn("Could not properly discard states.", e);
-				}
-			}
-		}
-	}
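
	// Illustrative only (not part of the original file): a hedged distillation of the
	// upload decision in materializeSnapshot() above. Sst files already referenced by
	// the last completed checkpoint are replaced by placeholders that the shared state
	// registry later resolves to the previously uploaded handles; 'upload' stands in
	// for materializeStateData().
	private static StreamStateHandle sstHandleForSketch(
			StateHandleID id,
			Set<StateHandleID> baseSstFiles,
			java.util.function.Function<StateHandleID, StreamStateHandle> upload) {
		return (baseSstFiles != null && baseSstFiles.contains(id))
			? new PlaceholderStreamStateHandle()
			: upload.apply(id);
	}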
-
-	@Override
-	public void restore(Collection<KeyedStateHandle> restoreState) throws Exception {
-		LOG.info("Initializing RocksDB keyed state backend from snapshot.");
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Restoring snapshot from state handles: {}.", restoreState);
-		}
-
-		// clear all meta data
-		kvStateInformation.clear();
-		restoredKvStateMetaInfos.clear();
-
-		try {
-			if (restoreState == null || restoreState.isEmpty()) {
-				createDB();
-			} else if (restoreState.iterator().next() instanceof IncrementalKeyedStateHandle) {
-				RocksDBIncrementalRestoreOperation<K> restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
-				restoreOperation.restore(restoreState);
-			} else {
-				RocksDBFullRestoreOperation<K> restoreOperation = new RocksDBFullRestoreOperation<>(this);
-				restoreOperation.doRestore(restoreState);
-			}
-		} catch (Exception ex) {
-			dispose();
-			throw ex;
-		}
-	}
-
-	@Override
-	public void notifyCheckpointComplete(long completedCheckpointId) {
-		synchronized (materializedSstFiles) {
-			if (completedCheckpointId < lastCompletedCheckpointId) {
-				return;
-			}
-
-			materializedSstFiles.keySet().removeIf(checkpointId -> checkpointId < completedCheckpointId);
-
-			lastCompletedCheckpointId = completedCheckpointId;
-		}
-	}
-
-	private void createDB() throws IOException {
-		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
-		this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
-		this.defaultColumnFamily = columnFamilyHandles.get(0);
-	}
-
-	private RocksDB openDB(
-		String path,
-		List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
-		List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {
-
-		List<ColumnFamilyDescriptor> columnFamilyDescriptors =
-			new ArrayList<>(1 + stateColumnFamilyDescriptors.size());
-
-		columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
-
-		// we add the required descriptor for the default CF in last position.
-		columnFamilyDescriptors.add(new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));
-
-		RocksDB dbRef;
-
-		try {
-			dbRef = RocksDB.open(
-				Preconditions.checkNotNull(dbOptions),
-				Preconditions.checkNotNull(path),
-				columnFamilyDescriptors,
-				stateColumnFamilyHandles);
-		} catch (RocksDBException e) {
-			throw new IOException("Error while opening RocksDB instance.", e);
-		}
-
-		// requested + default CF
-		Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
-			"Not all requested column family handles have been created");
-
-		return dbRef;
-	}
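
	// Illustrative only (not part of the original file): because openDB() appends the
	// default column family descriptor last, the handle list filled by RocksDB.open
	// mirrors that order and can be split as sketched here.
	private static ColumnFamilyHandle splitOffDefaultHandleSketch(List<ColumnFamilyHandle> handles) {
		// the Flink state column families occupy handles.subList(0, handles.size() - 1)
		return handles.get(handles.size() - 1); // the default column family
	}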
-
-	/**
-	 * Encapsulates the process of restoring a RocksDBKeyedStateBackend from a snapshot.
-	 */
-	static final class RocksDBFullRestoreOperation<K> {
-
-		private final RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend;
-
-		/** Current key-groups state handle from which we restore key-groups. */
-		private KeyGroupsStateHandle currentKeyGroupsStateHandle;
-		/** Current input stream we obtained from currentKeyGroupsStateHandle. */
-		private FSDataInputStream currentStateHandleInStream;
-		/** Current data input view that wraps currentStateHandleInStream. */
-		private DataInputView currentStateHandleInView;
-		/** Current list of ColumnFamilyHandles for all column families we restore from currentKeyGroupsStateHandle. */
-		private List<ColumnFamilyHandle> currentStateHandleKVStateColumnFamilies;
-		/** The compression decorator that was used for writing the state, as determined by the meta data. */
-		private StreamCompressionDecorator keygroupStreamCompressionDecorator;
-
-		/**
-		 * Creates a restore operation object for the given state backend instance.
-		 *
-		 * @param rocksDBKeyedStateBackend the state backend into which we restore
-		 */
-		public RocksDBFullRestoreOperation(RocksDBKeyedStateBackend<K> rocksDBKeyedStateBackend) {
-			this.rocksDBKeyedStateBackend = Preconditions.checkNotNull(rocksDBKeyedStateBackend);
-		}
-
-		/**
-		 * Restores all key-groups data that is referenced by the passed state handles.
-		 *
-		 * @param keyedStateHandles List of all key groups state handles that shall be restored.
-		 */
-		public void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
-			throws IOException, StateMigrationException, RocksDBException {
-
-			rocksDBKeyedStateBackend.createDB();
-
-			for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
-				if (keyedStateHandle != null) {
-
-					if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
-						throw new IllegalStateException("Unexpected state handle type, " +
-							"expected: " + KeyGroupsStateHandle.class +
-							", but found: " + keyedStateHandle.getClass());
-					}
-					this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
-					restoreKeyGroupsInStateHandle();
-				}
-			}
-		}
-
-		/**
-		 * Restores one key-groups state handle.
-		 */
-		private void restoreKeyGroupsInStateHandle()
-			throws IOException, StateMigrationException, RocksDBException {
-			try {
-				currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
-				rocksDBKeyedStateBackend.cancelStreamRegistry.registerCloseable(currentStateHandleInStream);
-				currentStateHandleInView = new DataInputViewStreamWrapper(currentStateHandleInStream);
-				restoreKVStateMetaData();
-				restoreKVStateData();
-			} finally {
-				if (currentStateHandleInStream != null
-					&& rocksDBKeyedStateBackend.cancelStreamRegistry.unregisterCloseable(currentStateHandleInStream)) {
-					IOUtils.closeQuietly(currentStateHandleInStream);
-				}
-			}
-		}
-
-		/**
-		 * Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle.
-		 *
-		 * @throws IOException
-		 * @throws StateMigrationException
-		 * @throws RocksDBException
-		 */
-		private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
-
-			KeyedBackendSerializationProxy<K> serializationProxy =
-				new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
-
-			serializationProxy.read(currentStateHandleInView);
-
-			// check for key serializer compatibility; this also reconfigures the
-			// key serializer to be compatible, if it is required and is possible
-			if (CompatibilityUtil.resolveCompatibilityResult(
-				serializationProxy.getKeySerializer(),
-				UnloadableDummyTypeSerializer.class,
-				serializationProxy.getKeySerializerConfigSnapshot(),
-				rocksDBKeyedStateBackend.keySerializer)
-				.isRequiresMigration()) {
-
-				// TODO replace with state migration; note that key hash codes need to remain the same after migration
-				throw new StateMigrationException("The new key serializer is not able to read the previous keys. " +
-					"Aborting now since state migration is currently not available");
-			}
-
-			this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
-				SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
-
-			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
-				serializationProxy.getStateMetaInfoSnapshots();
-			currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
-
-			for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) {
-
-				Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredColumn =
-					rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
-
-				if (registeredColumn == null) {
-					byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
-
-					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
-						nameBytes,
-						rocksDBKeyedStateBackend.columnOptions);
-
-					RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
-						new RegisteredKeyedBackendStateMetaInfo<>(
-							restoredMetaInfo.getStateType(),
-							restoredMetaInfo.getName(),
-							restoredMetaInfo.getNamespaceSerializer(),
-							restoredMetaInfo.getStateSerializer());
-
-					rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
-
-					ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
-
-					registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo);
-					rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn);
-
-				} else {
-					// TODO with eager state registration in place, check here for serializer migration strategies
-				}
-				currentStateHandleKVStateColumnFamilies.add(registeredColumn.f0);
-			}
-		}
-
-		/**
-		 * Restore the KV-state / ColumnFamily data for all key-groups referenced by the current state handle.
-		 *
-		 * @throws IOException
-		 * @throws RocksDBException
-		 */
-		private void restoreKVStateData() throws IOException, RocksDBException {
-			//for all key-groups in the current state handle...
-			for (Tuple2<Integer, Long> keyGroupOffset : currentKeyGroupsStateHandle.getGroupRangeOffsets()) {
-				int keyGroup = keyGroupOffset.f0;
-
-				// Check that restored key groups all belong to the backend
-				Preconditions.checkState(rocksDBKeyedStateBackend.getKeyGroupRange().contains(keyGroup),
-					"The key group must belong to the backend");
-
-				long offset = keyGroupOffset.f1;
-				//non-empty key-group?
-				if (0L != offset) {
-					currentStateHandleInStream.seek(offset);
-					try (InputStream compressedKgIn = keygroupStreamCompressionDecorator.decorateWithCompression(currentStateHandleInStream)) {
-						DataInputViewStreamWrapper compressedKgInputView = new DataInputViewStreamWrapper(compressedKgIn);
-						//TODO this could be aware of keyGroupPrefixBytes and read only one byte if possible
-						int kvStateId = compressedKgInputView.readShort();
-						ColumnFamilyHandle handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
-						//insert all k/v pairs into DB
-						boolean keyGroupHasMoreKeys = true;
-						while (keyGroupHasMoreKeys) {
-							byte[] key = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
-							byte[] value = BytePrimitiveArraySerializer.INSTANCE.deserialize(compressedKgInputView);
-							if (RocksDBFullSnapshotOperation.hasMetaDataFollowsFlag(key)) {
-								//clear the signal bit in the key to make it ready for insertion again
-								RocksDBFullSnapshotOperation.clearMetaDataFollowsFlag(key);
-								rocksDBKeyedStateBackend.db.put(handle, key, value);
-								//TODO this could be aware of keyGroupPrefixBytes and read only one byte if possible
-								kvStateId = RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK
-									& compressedKgInputView.readShort();
-								if (RocksDBFullSnapshotOperation.END_OF_KEY_GROUP_MARK == kvStateId) {
-									keyGroupHasMoreKeys = false;
-								} else {
-									handle = currentStateHandleKVStateColumnFamilies.get(kvStateId);
-								}
-							} else {
-								rocksDBKeyedStateBackend.db.put(handle, key, value);
-							}
-						}
-					}
-				}
-			}
-		}
-	}
-
-	private static class RocksDBIncrementalRestoreOperation<T> {
-
-		private final RocksDBKeyedStateBackend<T> stateBackend;
-
-		private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend) {
-			this.stateBackend = stateBackend;
-		}
-
-		private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData(
-			StreamStateHandle metaStateHandle) throws Exception {
-
-			FSDataInputStream inputStream = null;
-
-			try {
-				inputStream = metaStateHandle.openInputStream();
-				stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
-
-				KeyedBackendSerializationProxy<T> serializationProxy =
-					new KeyedBackendSerializationProxy<>(stateBackend.userCodeClassLoader);
-				DataInputView in = new DataInputViewStreamWrapper(inputStream);
-				serializationProxy.read(in);
-
-				// check for key serializer compatibility; this also reconfigures the
-				// key serializer to be compatible, if it is required and is possible
-				if (CompatibilityUtil.resolveCompatibilityResult(
-					serializationProxy.getKeySerializer(),
-					UnloadableDummyTypeSerializer.class,
-					serializationProxy.getKeySerializerConfigSnapshot(),
-					stateBackend.keySerializer)
-					.isRequiresMigration()) {
-
-					// TODO replace with state migration; note that key hash codes need to remain the same after migration
-					throw new StateMigrationException("The new key serializer is not able to read the previous keys. " +
-						"Aborting now since state migration is currently not available");
-				}
-
-				return serializationProxy.getStateMetaInfoSnapshots();
-			} finally {
-				if (inputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
-					inputStream.close();
-				}
-			}
-		}
-
-		private void readStateData(
-			Path restoreFilePath,
-			StreamStateHandle remoteFileHandle) throws IOException {
-
-			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
-
-			FSDataInputStream inputStream = null;
-			FSDataOutputStream outputStream = null;
-
-			try {
-				inputStream = remoteFileHandle.openInputStream();
-				stateBackend.cancelStreamRegistry.registerCloseable(inputStream);
-
-				outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
-				stateBackend.cancelStreamRegistry.registerCloseable(outputStream);
-
-				byte[] buffer = new byte[8 * 1024];
-				while (true) {
-					int numBytes = inputStream.read(buffer);
-					if (numBytes == -1) {
-						break;
-					}
-
-					outputStream.write(buffer, 0, numBytes);
-				}
-			} finally {
-				if (inputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
-					inputStream.close();
-				}
-
-				if (outputStream != null && stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) {
-					outputStream.close();
-				}
-			}
-		}
-
-		private void restoreInstance(
-			IncrementalKeyedStateHandle restoreStateHandle,
-			boolean hasExtraKeys) throws Exception {
-
-			// read state data
-			Path restoreInstancePath = new Path(
-				stateBackend.instanceBasePath.getAbsolutePath(),
-				UUID.randomUUID().toString());
-
-			try {
-				final Map<StateHandleID, StreamStateHandle> sstFiles =
-					restoreStateHandle.getSharedState();
-				final Map<StateHandleID, StreamStateHandle> miscFiles =
-					restoreStateHandle.getPrivateState();
-
-				readAllStateData(sstFiles, restoreInstancePath);
-				readAllStateData(miscFiles, restoreInstancePath);
-
-				// read meta data
-				List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots =
-					readMetaData(restoreStateHandle.getMetaStateHandle());
-
-				List<ColumnFamilyDescriptor> columnFamilyDescriptors =
-					new ArrayList<>(1 + stateMetaInfoSnapshots.size());
-
-				for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
-
-					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
-						stateMetaInfoSnapshot.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
-						stateBackend.columnOptions);
-
-					columnFamilyDescriptors.add(columnFamilyDescriptor);
-					stateBackend.restoredKvStateMetaInfos.put(stateMetaInfoSnapshot.getName(), stateMetaInfoSnapshot);
-				}
-
-				if (hasExtraKeys) {
-
-					List<ColumnFamilyHandle> columnFamilyHandles =
-						new ArrayList<>(1 + columnFamilyDescriptors.size());
-
-					try (RocksDB restoreDb = stateBackend.openDB(
-						restoreInstancePath.getPath(),
-						columnFamilyDescriptors,
-						columnFamilyHandles)) {
-
-						try {
-							// iterating only the requested descriptors automatically skips the default column family handle
-							for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
-								ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
-								ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i);
-								RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
-
-								Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
-									stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
-
-								if (null == registeredStateMetaInfoEntry) {
-
-									RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
-										new RegisteredKeyedBackendStateMetaInfo<>(
-											stateMetaInfoSnapshot.getStateType(),
-											stateMetaInfoSnapshot.getName(),
-											stateMetaInfoSnapshot.getNamespaceSerializer(),
-											stateMetaInfoSnapshot.getStateSerializer());
-
-									registeredStateMetaInfoEntry =
-										new Tuple2<>(
-											stateBackend.db.createColumnFamily(columnFamilyDescriptor),
-											stateMetaInfo);
-
-									stateBackend.kvStateInformation.put(
-										stateMetaInfoSnapshot.getName(),
-										registeredStateMetaInfoEntry);
-								}
-
-								ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
-
-								try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) {
-
-									int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
-									byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
-									for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
-										startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
-									}
-
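-									// keys are ordered lexicographically by RocksDB's default comparator, and
-									// the key-group prefix is serialized big-endian, so seeking to the prefix
-									// positions the iterator at the first entry of our start key-group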
-									iterator.seek(startKeyGroupPrefixBytes);
-
-									while (iterator.isValid()) {
-
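-										// big-endian decode of the key-group prefix of the current key; each
-										// byte is masked with 0xFF so that it is treated as unsigned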
-										int keyGroup = 0;
-										for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
-											keyGroup = (keyGroup << Byte.SIZE) + (iterator.key()[j] & 0xFF);
-										}
-
-										if (stateBackend.keyGroupRange.contains(keyGroup)) {
-											stateBackend.db.put(targetColumnFamilyHandle,
-												iterator.key(), iterator.value());
-										}
-
-										iterator.next();
-									}
-								} // releases native iterator resources
-							}
-						} finally {
-							// release native tmp db column family resources
-							for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
-								IOUtils.closeQuietly(columnFamilyHandle);
-							}
-						}
-					} // releases native tmp db resources
-				} else {
-					// pick up the old backend id again, so that we can reference existing state
-					stateBackend.backendUID = restoreStateHandle.getBackendIdentifier();
-
-					LOG.debug("Restoring keyed backend uid in operator {} from incremental snapshot to {}.",
-						stateBackend.operatorIdentifier, stateBackend.backendUID);
-
-					// create hard links in the instance directory
-					if (!stateBackend.instanceRocksDBPath.mkdirs()) {
-						throw new IOException("Could not create RocksDB data directory.");
-					}
-
-					createFileHardLinksInRestorePath(sstFiles, restoreInstancePath);
-					createFileHardLinksInRestorePath(miscFiles, restoreInstancePath);
-
-					List<ColumnFamilyHandle> columnFamilyHandles =
-						new ArrayList<>(1 + columnFamilyDescriptors.size());
-
-					stateBackend.db = stateBackend.openDB(
-						stateBackend.instanceRocksDBPath.getAbsolutePath(),
-						columnFamilyDescriptors, columnFamilyHandles);
-
-					// extract and store the default column family which is located at the last index
-					stateBackend.defaultColumnFamily = columnFamilyHandles.remove(columnFamilyHandles.size() - 1);
-
-					for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
-						RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
-
-						ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
-						RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
-							new RegisteredKeyedBackendStateMetaInfo<>(
-								stateMetaInfoSnapshot.getStateType(),
-								stateMetaInfoSnapshot.getName(),
-								stateMetaInfoSnapshot.getNamespaceSerializer(),
-								stateMetaInfoSnapshot.getStateSerializer());
-
-						stateBackend.kvStateInformation.put(
-							stateMetaInfoSnapshot.getName(),
-							new Tuple2<>(columnFamilyHandle, stateMetaInfo));
-					}
-
-					// use the restored sst files as the base for subsequent checkpoints
-					synchronized (stateBackend.materializedSstFiles) {
-						stateBackend.materializedSstFiles.put(restoreStateHandle.getCheckpointId(), sstFiles.keySet());
-					}
-
-					stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();
-				}
-			} finally {
-				FileSystem restoreFileSystem = restoreInstancePath.getFileSystem();
-				if (restoreFileSystem.exists(restoreInstancePath)) {
-					restoreFileSystem.delete(restoreInstancePath, true);
-				}
-			}
-		}
-
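-		/**
-		 * Reads all state data files referenced by the given handles from their remote
-		 * locations into the local restore directory.
-		 */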
-		private void readAllStateData(
-			Map<StateHandleID, StreamStateHandle> stateHandleMap,
-			Path restoreInstancePath) throws IOException {
-
-			for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
-				StateHandleID stateHandleID = entry.getKey();
-				StreamStateHandle remoteFileHandle = entry.getValue();
-				readStateData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
-			}
-		}
-
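-		/**
-		 * Hard-links the downloaded files from the restore directory into the RocksDB
-		 * instance directory, so the (potentially large) sst files need not be copied again.
-		 */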
-		private void createFileHardLinksInRestorePath(
-			Map<StateHandleID, StreamStateHandle> stateHandleMap,
-			Path restoreInstancePath) throws IOException {
-
-			for (StateHandleID stateHandleID : stateHandleMap.keySet()) {
-				String newSstFileName = stateHandleID.toString();
-				File restoreFile = new File(restoreInstancePath.getPath(), newSstFileName);
-				File targetFile = new File(stateBackend.instanceRocksDBPath, newSstFileName);
-				Files.createLink(targetFile.toPath(), restoreFile.toPath());
-			}
-		}
-
-		void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {
-
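-			// more than one handle, or a key-group range that differs from ours, means the
-			// handles stem from a rescaled job: restore into a fresh db and copy over only
-			// the key-groups that this backend is responsible for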
-			boolean hasExtraKeys = (restoreStateHandles.size() > 1 ||
-				!Objects.equals(restoreStateHandles.iterator().next().getKeyGroupRange(), stateBackend.keyGroupRange));
-
-			if (hasExtraKeys) {
-				stateBackend.createDB();
-			}
-
-			for (KeyedStateHandle rawStateHandle : restoreStateHandles) {
-
-				if (!(rawStateHandle instanceof IncrementalKeyedStateHandle)) {
-					throw new IllegalStateException("Unexpected state handle type, " +
-						"expected " + IncrementalKeyedStateHandle.class +
-						", but found " + rawStateHandle.getClass());
-				}
-
-				IncrementalKeyedStateHandle keyedStateHandle = (IncrementalKeyedStateHandle) rawStateHandle;
-
-				restoreInstance(keyedStateHandle, hasExtraKeys);
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  State factories
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Creates a column family handle for use with a k/v state. When restoring from a snapshot
-	 * we don't restore the individual k/v states, just the global RocksDB database and the
-	 * list of column families. When a k/v state is first requested, we check here whether a
-	 * column family already exists for it and return that, or create a new one otherwise.
-	 *
-	 * <p>This also checks whether the {@link StateDescriptor} for a state matches the one
-	 * that we checkpointed, i.e. is already in the map of column families.
-	 */
-	@SuppressWarnings({"rawtypes", "unchecked"})
-	protected <N, S> ColumnFamilyHandle getColumnFamily(
-		StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {
-
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
-			kvStateInformation.get(descriptor.getName());
-
-		RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
-			descriptor.getType(),
-			descriptor.getName(),
-			namespaceSerializer,
-			descriptor.getSerializer());
-
-		if (stateInfo != null) {
-			// TODO with eager registration in place, these checks should be moved to restore()
-
-			RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S> restoredMetaInfo =
-				(RegisteredKeyedBackendStateMetaInfo.Snapshot<N, S>) restoredKvStateMetaInfos.get(descriptor.getName());
-
-			Preconditions.checkState(
-				Objects.equals(newMetaInfo.getName(), restoredMetaInfo.getName()),
-				"Incompatible state names. " +
-					"Was [" + restoredMetaInfo.getName() + "], " +
-					"registered with [" + newMetaInfo.getName() + "].");
-
-			if (!Objects.equals(newMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)
-				&& !Objects.equals(restoredMetaInfo.getStateType(), StateDescriptor.Type.UNKNOWN)) {
-
-				Preconditions.checkState(
-					newMetaInfo.getStateType() == restoredMetaInfo.getStateType(),
-					"Incompatible state types. " +
-						"Was [" + restoredMetaInfo.getStateType() + "], " +
-						"registered with [" + newMetaInfo.getStateType() + "].");
-			}
-
-			// check compatibility results to determine if state migration is required
-			CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-				restoredMetaInfo.getNamespaceSerializer(),
-				null,
-				restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
-				newMetaInfo.getNamespaceSerializer());
-
-			CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-				restoredMetaInfo.getStateSerializer(),
-				UnloadableDummyTypeSerializer.class,
-				restoredMetaInfo.getStateSerializerConfigSnapshot(),
-				newMetaInfo.getStateSerializer());
-
-			if (namespaceCompatibility.isRequiresMigration() || stateCompatibility.isRequiresMigration()) {
-				// TODO state migration currently isn't possible.
-				throw new StateMigrationException("State migration isn't supported, yet.");
-			} else {
-				stateInfo.f1 = newMetaInfo;
-				return stateInfo.f0;
-			}
-		}
-
-		byte[] nameBytes = descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
-		Preconditions.checkState(!Arrays.equals(DEFAULT_COLUMN_FAMILY_NAME_BYTES, nameBytes),
-			"The chosen state name 'default' collides with the name of the default column family!");
-
-		ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, columnOptions);
-
-		final ColumnFamilyHandle columnFamily;
-
-		try {
-			columnFamily = db.createColumnFamily(columnDescriptor);
-		} catch (RocksDBException e) {
-			throw new IOException("Error creating ColumnFamilyHandle.", e);
-		}
-
-		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tuple =
-			new Tuple2<>(columnFamily, newMetaInfo);
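-		// raw map access side-steps the wildcard bounds of kvStateInformation, which cannot
-		// accept a Tuple2 parameterized with the concrete <N, S> meta info type directly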
-		Map rawAccess = kvStateInformation;
-		rawAccess.put(descriptor.getName(), tuple);
-		return columnFamily;
-	}
-
-	@Override
-	protected <N, T> InternalValueState<N, T> createValueState(
-		TypeSerializer<N> namespaceSerializer,
-		ValueStateDescriptor<T> stateDesc) throws Exception {
-
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
-
-		return new RocksDBValueState<>(columnFamily, namespaceSerializer, stateDesc, this);
-	}
-
-	@Override
-	protected <N, T> InternalListState<N, T> createListState(
-		TypeSerializer<N> namespaceSerializer,
-		ListStateDescriptor<T> stateDesc) throws Exception {
-
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
-
-		return new RocksDBListState<>(columnFamily, namespaceSerializer, stateDesc, this);
-	}
-
-	@Override
-	protected <N, T> InternalReducingState<N, T> createReducingState(
-		TypeSerializer<N> namespaceSerializer,
-		ReducingStateDescriptor<T> stateDesc) throws Exception {
-
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
-
-		return new RocksDBReducingState<>(columnFamily, namespaceSerializer, stateDesc, this);
-	}
-
-	@Override
-	protected <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
-		TypeSerializer<N> namespaceSerializer,
-		AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
-
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
-		return new RocksDBAggregatingState<>(columnFamily, namespaceSerializer, stateDesc, this);
-	}
-
-	@Override
-	protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
-		TypeSerializer<N> namespaceSerializer,
-		FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
-
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
-
-		return new RocksDBFoldingState<>(columnFamily, namespaceSerializer, stateDesc, this);
-	}
-
-	@Override
-	protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(
-		TypeSerializer<N> namespaceSerializer,
-		MapStateDescriptor<UK, UV> stateDesc) throws Exception {
-
-		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
-
-		return new RocksDBMapState<>(columnFamily, namespaceSerializer, stateDesc, this);
-	}
-
-	/**
-	 * Wraps a RocksDB iterator to cache its current key and assigns an id for the key/value state to the iterator.
-	 * Used by {@link RocksDBMergeIterator}.
-	 */
-	static final class MergeIterator implements AutoCloseable {
-
-		/**
-		 * @param iterator  The {@link RocksIterator} to wrap.
-		 * @param kvStateId Id of the K/V state to which this iterator belongs.
-		 */
-		MergeIterator(RocksIterator iterator, int kvStateId) {
-			this.iterator = Preconditions.checkNotNull(iterator);
-			this.currentKey = iterator.key();
-			this.kvStateId = kvStateId;
-		}
-
-		private final RocksIterator iterator;
-		private byte[] currentKey;
-		private final int kvStateId;
-
-		public byte[] getCurrentKey() {
-			return currentKey;
-		}
-
-		public void setCurrentKey(byte[] currentKey) {
-			this.currentKey = currentKey;
-		}
-
-		public RocksIterator getIterator() {
-			return iterator;
-		}
-
-		public int getKvStateId() {
-			return kvStateId;
-		}
-
-		@Override
-		public void close() {
-			IOUtils.closeQuietly(iterator);
-		}
-	}
-
-	/**
-	 * Iterator that merges multiple RocksDB iterators to partition all states into contiguous key-groups.
-	 * The resulting iteration sequence is ordered by (key-group, kv-state).
-	 */
-	static final class RocksDBMergeIterator implements AutoCloseable {
-
-		private final PriorityQueue<MergeIterator> heap;
-		private final int keyGroupPrefixByteCount;
-		private boolean newKeyGroup;
-		private boolean newKVState;
-		private boolean valid;
-
-		private MergeIterator currentSubIterator;
-
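-		/**
-		 * Pre-built comparators, one per key-group prefix length: the comparator at index
-		 * {@code i} orders iterators by the first {@code i} key bytes (the key-group
-		 * prefix), breaking ties by the id of the k/v-state.
-		 */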
-		private static final List<Comparator<MergeIterator>> COMPARATORS;
-
-		static {
-			int maxBytes = 4;
-			COMPARATORS = new ArrayList<>(maxBytes);
-			for (int i = 0; i < maxBytes; ++i) {
-				final int currentBytes = i;
-				COMPARATORS.add(new Comparator<MergeIterator>() {
-					@Override
-					public int compare(MergeIterator o1, MergeIterator o2) {
-						int arrayCmpRes = compareKeyGroupsForByteArrays(
-							o1.currentKey, o2.currentKey, currentBytes);
-						return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
-					}
-				});
-			}
-		}
-
-		RocksDBMergeIterator(List<Tuple2<RocksIterator, Integer>> kvStateIterators, final int keyGroupPrefixByteCount) {
-			Preconditions.checkNotNull(kvStateIterators);
-			this.keyGroupPrefixByteCount = keyGroupPrefixByteCount;
-
-			Comparator<MergeIterator> iteratorComparator = COMPARATORS.get(keyGroupPrefixByteCount);
-
-			if (kvStateIterators.size() > 0) {
-				PriorityQueue<MergeIterator> iteratorPriorityQueue =
-					new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
-
-				for (Tuple2<RocksIterator, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
-					final RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0;
-					rocksIterator.seekToFirst();
-					if (rocksIterator.isValid()) {
-						iteratorPriorityQueue.offer(new MergeIterator(rocksIterator, rocksIteratorWithKVStateId.f1));
-					} else {
-						IOUtils.closeQuietly(rocksIterator);
-					}
-				}
-
-				kvStateIterators.clear();
-
-				this.heap = iteratorPriorityQueue;
-				this.valid = !heap.isEmpty();
-				this.currentSubIterator = heap.poll();
-			} else {
-				// creating a PriorityQueue of size 0 results in an exception.
-				this.heap = null;
-				this.valid = false;
-			}
-
-			this.newKeyGroup = true;
-			this.newKVState = true;
-		}
-
-		/**
-		 * Advances the iterator. Should only be called if {@link #isValid()} returned true. Validity can only change
-		 * after calls to {@link #next()}.
-		 */
-		public void next() {
-			newKeyGroup = false;
-			newKVState = false;
-
-			final RocksIterator rocksIterator = currentSubIterator.getIterator();
-			rocksIterator.next();
-
-			byte[] oldKey = currentSubIterator.getCurrentKey();
-			if (rocksIterator.isValid()) {
-				currentSubIterator.currentKey = rocksIterator.key();
-
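-				// only when the sub-iterator crossed into a different key-group can another
-				// iterator become the global minimum, so re-insert it and poll the heap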
-				if (isDifferentKeyGroup(oldKey, currentSubIterator.getCurrentKey())) {
-					heap.offer(currentSubIterator);
-					currentSubIterator = heap.poll();
-					newKVState = currentSubIterator.getIterator() != rocksIterator;
-					detectNewKeyGroup(oldKey);
-				}
-			} else {
-				IOUtils.closeQuietly(rocksIterator);
-
-				if (heap.isEmpty()) {
-					currentSubIterator = null;
-					valid = false;
-				} else {
-					currentSubIterator = heap.poll();
-					newKVState = true;
-					detectNewKeyGroup(oldKey);
-				}
-			}
-		}
-
-		private boolean isDifferentKeyGroup(byte[] a, byte[] b) {
-			return 0 != compareKeyGroupsForByteArrays(a, b, keyGroupPrefixByteCount);
-		}
-
-		private void detectNewKeyGroup(byte[] oldKey) {
-			if (isDifferentKeyGroup(oldKey, currentSubIterator.currentKey)) {
-				newKeyGroup = true;
-			}
-		}
-
-		/**
-		 * @return key-group for the current key
-		 */
-		public int keyGroup() {
-			int result = 0;
-			// big-endian decode
-			for (int i = 0; i < keyGroupPrefixByteCount; ++i) {
-				result <<= 8;
-				result |= (currentSubIterator.currentKey[i] & 0xFF);
-			}
-			return result;
-		}
-
-		public byte[] key() {
-			return currentSubIterator.getCurrentKey();
-		}
-
-		public byte[] value() {
-			return currentSubIterator.getIterator().value();
-		}
-
-		/**
-		 * @return Id of K/V state to which the current key belongs.
-		 */
-		public int kvStateId() {
-			return currentSubIterator.getKvStateId();
-		}
-
-		/**
-		 * Indicates whether the current key starts a new k/v-state, i.e. belongs to a different k/v-state than its predecessor.
-		 * @return true iff the current key belongs to a different k/v-state than its predecessor.
-		 */
-		public boolean isNewKeyValueState() {
-			return newKVState;
-		}
-
-		/**
-		 * Indicates whether the current key starts a new key-group, i.e. belongs to a different key-group than its predecessor.
-		 * @return true iff the current key belongs to a different key-group than its predecessor.
-		 */
-		public boolean isNewKeyGroup() {
-			return newKeyGroup;
-		}
-
-		/**
-		 * Check if the iterator is still valid. Getters like {@link #key()}, {@link #value()}, etc. as well as
-		 * {@link #next()} should only be called if valid returned true. Should be checked after each call to
-		 * {@link #next()} before accessing iterator state.
-		 * @return True iff this iterator is valid.
-		 */
-		public boolean isValid() {
-			return valid;
-		}
-
-		private static int compareKeyGroupsForByteArrays(byte[] a, byte[] b, int len) {
-			for (int i = 0; i < len; ++i) {
-				int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
-				if (diff != 0) {
-					return diff;
-				}
-			}
-			return 0;
-		}
-
-		@Override
-		public void close() {
-			IOUtils.closeQuietly(currentSubIterator);
-			currentSubIterator = null;
-
-			IOUtils.closeAllQuietly(heap);
-			heap.clear();
-		}
-	}
-
-	/**
-	 * Only visible for testing, DO NOT USE.
-	 */
-	@VisibleForTesting
-	public File getInstanceBasePath() {
-		return instanceBasePath;
-	}
-
-	@Override
-	public boolean supportsAsynchronousSnapshots() {
-		return true;
-	}
-
-	@VisibleForTesting
-	@SuppressWarnings("unchecked")
-	@Override
-	public int numStateEntries() {
-		int count = 0;
-
-		for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column : kvStateInformation.values()) {
-			try (RocksIterator rocksIterator = db.newIterator(column.f0)) {
-				rocksIterator.seekToFirst();
-
-				while (rocksIterator.isValid()) {
-					count++;
-					rocksIterator.next();
-				}
-			}
-		}
-
-		return count;
-	}
-
-	private static class RocksIteratorWrapper<K> implements Iterator<K> {
-		private final RocksIterator iterator;
-		private final String state;
-		private final TypeSerializer<K> keySerializer;
-		private final int keyGroupPrefixBytes;
-
-		public RocksIteratorWrapper(
-				RocksIterator iterator,
-				String state,
-				TypeSerializer<K> keySerializer,
-				int keyGroupPrefixBytes) {
-			this.iterator = Preconditions.checkNotNull(iterator);
-			this.state = Preconditions.checkNotNull(state);
-			this.keySerializer = Preconditions.checkNotNull(keySerializer);
-			this.keyGroupPrefixBytes = keyGroupPrefixBytes; // primitive int, cannot be null
-		}
-
-		@Override
-		public boolean hasNext() {
-			return iterator.isValid();
-		}
-
-		@Override
-		public K next() {
-			if (!hasNext()) {
-				throw new NoSuchElementException("Failed to access state [" + state + "]");
-			}
-			try {
-				byte[] key = iterator.key();
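-				// skip the serialized key-group prefix; the remaining bytes are the user key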
-				DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(
-					new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes));
-				K value = keySerializer.deserialize(dataInput);
-				iterator.next();
-				return value;
-			} catch (IOException e) {
-				throw new FlinkRuntimeException("Failed to access state [" + state + "]", e);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2f7392d7/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
deleted file mode 100644
index f0481ec..0000000
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.runtime.state.internal.InternalListState;
-import org.apache.flink.util.Preconditions;
-
-import org.rocksdb.ColumnFamilyHandle;
-import org.rocksdb.RocksDBException;
-import org.rocksdb.WriteOptions;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-/**
- * {@link ListState} implementation that stores state in RocksDB.
- *
- * <p>{@link RocksDBStateBackend} must ensure that we set the
- * {@link org.rocksdb.StringAppendOperator} on the column family that we use for our state since
- * we use the {@code merge()} call.
- *
- * @param <K> The type of the key.
- * @param <N> The type of the namespace.
- * @param <V> The type of the values in the list state.
- */
-public class RocksDBListState<K, N, V>
-	extends AbstractRocksDBState<K, N, ListState<V>, ListStateDescriptor<V>, List<V>>
-	implements InternalListState<N, V> {
-
-	/** Serializer for the values. */
-	private final TypeSerializer<V> valueSerializer;
-
-	/**
-	 * We disable writes to the write-ahead-log here. We can't have these in the base class
-	 * because JNI segfaults for some reason if they are.
-	 */
-	private final WriteOptions writeOptions;
-
-	/**
-	 * Separator used by the {@link org.rocksdb.StringAppendOperator} in RocksDB.
-	 */
-	private static final byte DELIMITER = ',';
-
-	/**
-	 * Creates a new {@code RocksDBListState}.
-	 *
-	 * @param namespaceSerializer The serializer for the namespace.
-	 * @param stateDesc The state identifier for the state. This contains name
-	 *                     and can create a default state value.
-	 */
-	public RocksDBListState(ColumnFamilyHandle columnFamily,
-			TypeSerializer<N> namespaceSerializer,
-			ListStateDescriptor<V> stateDesc,
-			RocksDBKeyedStateBackend<K> backend) {
-
-		super(columnFamily, namespaceSerializer, stateDesc, backend);
-		this.valueSerializer = stateDesc.getElementSerializer();
-
-		writeOptions = new WriteOptions();
-		writeOptions.setDisableWAL(true);
-	}
-
-	@Override
-	public Iterable<V> get() {
-		try {
-			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = keySerializationStream.toByteArray();
-			byte[] valueBytes = backend.db.get(columnFamily, key);
-
-			if (valueBytes == null) {
-				return null;
-			}
-
-			ByteArrayInputStream bais = new ByteArrayInputStream(valueBytes);
-			DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais);
-
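-			// the merge operator concatenates serialized values with a single delimiter
-			// byte between them; deserialize each value and skip the delimiter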
-			List<V> result = new ArrayList<>();
-			while (in.available() > 0) {
-				result.add(valueSerializer.deserialize(in));
-				if (in.available() > 0) {
-					in.readByte();
-				}
-			}
-			return result;
-		} catch (IOException | RocksDBException e) {
-			throw new RuntimeException("Error while retrieving data from RocksDB", e);
-		}
-	}
-
-	@Override
-	public void add(V value) throws IOException {
-		Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
-
-		try {
-			writeCurrentKeyWithGroupAndNamespace();
-			byte[] key = keySerializationStream.toByteArray();
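-			// the key serialization stream is reused for the value: the key bytes are
-			// captured first, then the stream is reset and the value serialized into it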
-			keySerializationStream.reset();
-			DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
-			valueSerializer.serialize(value, out);
-			backend.db.merge(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
-		} catch (Exception e) {
-			throw new RuntimeException("Error while adding data to RocksDB", e);
-		}
-	}
-
-	@Override
-	public void mergeNamespaces(N target, Collection<N> sources) throws Exception {
-		if (sources == null || sources.isEmpty()) {
-			return;
-		}
-
-		// cache key and namespace
-		final K key = backend.getCurrentKey();
-		final int keyGroup = backend.getCurrentKeyGroupIndex();
-
-		try {
-			// create the target full-binary-key
-			writeKeyWithGroupAndNamespace(
-					keyGroup, key, target,
-					keySerializationStream, keySerializationDataOutputView);
-			final byte[] targetKey = keySerializationStream.toByteArray();
-
-			// merge the sources to the target
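-			// each source list is read, deleted, and merged into the target; keyed state is
-			// only accessed by the operator's task thread, so this read-delete-merge is safe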
-			for (N source : sources) {
-				if (source != null) {
-					writeKeyWithGroupAndNamespace(
-							keyGroup, key, source,
-							keySerializationStream, keySerializationDataOutputView);
-
-					byte[] sourceKey = keySerializationStream.toByteArray();
-					byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
-					backend.db.delete(columnFamily, sourceKey);
-
-					if (valueBytes != null) {
-						backend.db.merge(columnFamily, writeOptions, targetKey, valueBytes);
-					}
-				}
-			}
-		}
-		catch (Exception e) {
-			throw new Exception("Error while merging state in RocksDB", e);
-		}
-	}
-
-	@Override
-	public void update(List<V> values) throws Exception {
-		Preconditions.checkNotNull(values, "List of values to update cannot be null.");
-
-		clear();
-
-		if (!values.isEmpty()) {
-			try {
-				writeCurrentKeyWithGroupAndNamespace();
-				byte[] key = keySerializationStream.toByteArray();
-
-				byte[] premerge = getPreMergedValue(values);
-				if (premerge != null) {
-					backend.db.put(columnFamily, writeOptions, key, premerge);
-				} else {
-					throw new IOException("Failed to pre-merge values in update()");
-				}
-			} catch (IOException | RocksDBException e) {
-				throw new RuntimeException("Error while updating data in RocksDB", e);
-			}
-		}
-	}
-
-	@Override
-	public void addAll(List<V> values) throws Exception {
-		Preconditions.checkNotNull(values, "List of values to add cannot be null.");
-
-		if (!values.isEmpty()) {
-			try {
-				writeCurrentKeyWithGroupAndNamespace();
-				byte[] key = keySerializationStream.toByteArray();
-
-				byte[] premerge = getPreMergedValue(values);
-				if (premerge != null) {
-					backend.db.merge(columnFamily, writeOptions, key, premerge);
-				} else {
-					throw new IOException("Failed to pre-merge values in addAll()");
-				}
-			} catch (IOException | RocksDBException e) {
-				throw new RuntimeException("Error while adding data to RocksDB", e);
-			}
-		}
-	}
-
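-	/**
-	 * Serializes all values into a single, delimiter-separated byte array, matching the
-	 * layout that the merge operator produces for successive add() calls.
-	 */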
-	private byte[] getPreMergedValue(List<V> values) throws IOException {
-		DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
-
-		keySerializationStream.reset();
-		boolean first = true;
-		for (V value : values) {
-			Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
-			if (first) {
-				first = false;
-			} else {
-				keySerializationStream.write(DELIMITER);
-			}
-			valueSerializer.serialize(value, out);
-		}
-
-		return keySerializationStream.toByteArray();
-	}
-}

