flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [01/11] flink git commit: [FLINK-7460] [state backends] Close all ColumnFamilyHandles when restoring from rescaled incremental checkpoints
Date Thu, 24 Aug 2017 18:22:32 GMT
Repository: flink
Updated Branches:
  refs/heads/master 3f4de57b1 -> 6642768ad


[FLINK-7460] [state backends] Close all ColumnFamilyHandles when restoring from rescaled incremental checkpoints


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca87bec4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca87bec4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca87bec4

Branch: refs/heads/master
Commit: ca87bec4f79c32c9f6cf7a4aa96866f6fac957e0
Parents: 3f4de57
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Mon Aug 14 14:01:03 2017 +0200
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Thu Aug 24 17:17:39 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         | 453 ++++++++++---------
 .../state/RocksDBStateBackendTest.java          | 313 +++++++------
 .../runtime/state/StateBackendTestBase.java     | 244 +++++-----
 3 files changed, 546 insertions(+), 464 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ca87bec4/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 756cfdd..b7f386d 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -105,7 +105,9 @@ import java.io.ObjectInputStream;
 import java.io.OutputStream;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -138,6 +140,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	/** The name of the merge operator in RocksDB. Do not change except you know exactly what you do. */
 	public static final String MERGE_OPERATOR_NAME = "stringappendtest";
 
+	/** Bytes for the name of the column descriptor for the default column family. */
+	public static final byte[] DEFAULT_COLUMN_FAMILY_NAME_BYTES = "default".getBytes(ConfigConstants.DEFAULT_CHARSET);
+
 	private final String operatorIdentifier;
 
 	/** The column family options from the options factory. */
@@ -196,7 +201,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private final SortedMap<Long, Set<StateHandleID>> materializedSstFiles;
 
 	/** The identifier of the last completed checkpoint. */
-	private long lastCompletedCheckpointId = -1;
+	private long lastCompletedCheckpointId = -1L;
 
 	/** Unique ID of this backend. */
 	private UUID backendUID;
@@ -204,17 +209,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	private static final String SST_FILE_SUFFIX = ".sst";
 
 	public RocksDBKeyedStateBackend(
-			String operatorIdentifier,
-			ClassLoader userCodeClassLoader,
-			File instanceBasePath,
-			DBOptions dbOptions,
-			ColumnFamilyOptions columnFamilyOptions,
-			TaskKvStateRegistry kvStateRegistry,
-			TypeSerializer<K> keySerializer,
-			int numberOfKeyGroups,
-			KeyGroupRange keyGroupRange,
-			ExecutionConfig executionConfig,
-			boolean enableIncrementalCheckpointing
+		String operatorIdentifier,
+		ClassLoader userCodeClassLoader,
+		File instanceBasePath,
+		DBOptions dbOptions,
+		ColumnFamilyOptions columnFamilyOptions,
+		TaskKvStateRegistry kvStateRegistry,
+		TypeSerializer<K> keySerializer,
+		int numberOfKeyGroups,
+		KeyGroupRange keyGroupRange,
+		ExecutionConfig executionConfig,
+		boolean enableIncrementalCheckpointing
 	) throws IOException {
 
 		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);
@@ -253,10 +258,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		this.restoredKvStateMetaInfos = new HashMap<>();
 		this.materializedSstFiles = new TreeMap<>();
 		this.backendUID = UUID.randomUUID();
-
-		LOG.debug("Setting initial backend ID in RocksDBKeyedStateBackend for operator {} to {}.",
-			this.operatorIdentifier,
-			this.backendUID);
+		LOG.debug("Setting initial keyed backend uid for operator {} to {}.", this.operatorIdentifier, this.backendUID);
 	}
 
 	/**
@@ -277,7 +279,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				// DB is closed. So we start with the ones created by Flink...
 				for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData :
 					kvStateInformation.values()) {
-
 					IOUtils.closeQuietly(columnMetaData.f0);
 				}
 
@@ -328,10 +329,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 */
 	@Override
 	public RunnableFuture<KeyedStateHandle> snapshot(
-			final long checkpointId,
-			final long timestamp,
-			final CheckpointStreamFactory streamFactory,
-			CheckpointOptions checkpointOptions) throws Exception {
+		final long checkpointId,
+		final long timestamp,
+		final CheckpointStreamFactory streamFactory,
+		CheckpointOptions checkpointOptions) throws Exception {
 
 		if (checkpointOptions.getCheckpointType() != CheckpointOptions.CheckpointType.SAVEPOINT &&
 			enableIncrementalCheckpointing) {
@@ -342,9 +343,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	private RunnableFuture<KeyedStateHandle> snapshotIncrementally(
-			final long checkpointId,
-			final long checkpointTimestamp,
-			final CheckpointStreamFactory checkpointStreamFactory) throws Exception {
+		final long checkpointId,
+		final long checkpointTimestamp,
+		final CheckpointStreamFactory checkpointStreamFactory) throws Exception {
 
 		final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
 			new RocksDBIncrementalSnapshotOperation<>(
@@ -361,7 +362,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			if (!hasRegisteredState()) {
 				if (LOG.isDebugEnabled()) {
 					LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " +
-							checkpointTimestamp + " . Returning null.");
+						checkpointTimestamp + " . Returning null.");
 				}
 				return DoneFuture.nullValue();
 			}
@@ -391,9 +392,9 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	private RunnableFuture<KeyedStateHandle> snapshotFully(
-			final long checkpointId,
-			final long timestamp,
-			final CheckpointStreamFactory streamFactory) throws Exception {
+		final long checkpointId,
+		final long timestamp,
+		final CheckpointStreamFactory streamFactory) throws Exception {
 
 		long startTime = System.currentTimeMillis();
 
@@ -406,7 +407,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				if (!hasRegisteredState()) {
 					if (LOG.isDebugEnabled()) {
 						LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
-								" . Returning null.");
+							" . Returning null.");
 					}
 					return DoneFuture.nullValue();
 				}
@@ -419,52 +420,52 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 		// implementation of the async IO operation, based on FutureTask
 		AbstractAsyncIOCallable<KeyedStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream> ioCallable =
-				new AbstractAsyncIOCallable<KeyedStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() {
+			new AbstractAsyncIOCallable<KeyedStateHandle, CheckpointStreamFactory.CheckpointStateOutputStream>() {
 
-					@Override
-					public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
-						snapshotOperation.openCheckpointStream();
-						return snapshotOperation.getOutStream();
-					}
+				@Override
+				public CheckpointStreamFactory.CheckpointStateOutputStream openIOHandle() throws Exception {
+					snapshotOperation.openCheckpointStream();
+					return snapshotOperation.getOutStream();
+				}
 
-					@Override
-					public KeyGroupsStateHandle performOperation() throws Exception {
-						long startTime = System.currentTimeMillis();
-						synchronized (asyncSnapshotLock) {
-							try {
-								// hold the db lock while operation on the db to guard us against async db disposal
-								if (db == null) {
-									throw new IOException("RocksDB closed.");
-								}
+				@Override
+				public KeyGroupsStateHandle performOperation() throws Exception {
+					long startTime = System.currentTimeMillis();
+					synchronized (asyncSnapshotLock) {
+						try {
+							// hold the db lock while operation on the db to guard us against async db disposal
+							if (db == null) {
+								throw new IOException("RocksDB closed.");
+							}
 
-								snapshotOperation.writeDBSnapshot();
+							snapshotOperation.writeDBSnapshot();
 
-							} finally {
-								snapshotOperation.closeCheckpointStream();
-							}
+						} finally {
+							snapshotOperation.closeCheckpointStream();
 						}
+					}
 
-						LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
-							streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
+					LOG.info("Asynchronous RocksDB snapshot ({}, asynchronous part) in thread {} took {} ms.",
+						streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));
 
-						return snapshotOperation.getSnapshotResultStateHandle();
-					}
+					return snapshotOperation.getSnapshotResultStateHandle();
+				}
 
-					private void releaseSnapshotOperationResources(boolean canceled) {
-						// hold the db lock while operation on the db to guard us against async db disposal
-						synchronized (asyncSnapshotLock) {
-							snapshotOperation.releaseSnapshotResources(canceled);
-						}
+				private void releaseSnapshotOperationResources(boolean canceled) {
+					// hold the db lock while operation on the db to guard us against async db disposal
+					synchronized (asyncSnapshotLock) {
+						snapshotOperation.releaseSnapshotResources(canceled);
 					}
+				}
 
-					@Override
-					public void done(boolean canceled) {
-						releaseSnapshotOperationResources(canceled);
-					}
-				};
+				@Override
+				public void done(boolean canceled) {
+					releaseSnapshotOperationResources(canceled);
+				}
+			};
 
 		LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", synchronous part) in thread " +
-				Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms.");
+			Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms.");
 
 		return AsyncStoppableTaskWithCallback.from(ioCallable);
 	}
@@ -493,8 +494,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private KeyGroupsStateHandle snapshotResultStateHandle;
 
 		RocksDBFullSnapshotOperation(
-				RocksDBKeyedStateBackend<K> stateBackend,
-				CheckpointStreamFactory checkpointStreamFactory) {
+			RocksDBKeyedStateBackend<K> stateBackend,
+			CheckpointStreamFactory checkpointStreamFactory) {
 
 			this.stateBackend = stateBackend;
 			this.checkpointStreamFactory = checkpointStreamFactory;
@@ -523,7 +524,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		public void openCheckpointStream() throws Exception {
 			Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set.");
 			outStream = checkpointStreamFactory.
-					createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp);
+				createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp);
 			stateBackend.cancelStreamRegistry.registerClosable(outStream);
 			outputView = new DataOutputViewStreamWrapper(outStream);
 		}
@@ -615,11 +616,11 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private void writeKVStateMetaData() throws IOException {
 
 			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots =
-					new ArrayList<>(stateBackend.kvStateInformation.size());
+				new ArrayList<>(stateBackend.kvStateInformation.size());
 
 			int kvStateId = 0;
 			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
-					stateBackend.kvStateInformation.entrySet()) {
+				stateBackend.kvStateInformation.entrySet()) {
 
 				metaInfoSnapshots.add(column.getValue().f1.snapshot());
 
@@ -628,7 +629,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				readOptions.setSnapshot(snapshot);
 
 				kvStateIterators.add(
-						new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId));
+					new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId));
 
 				++kvStateId;
 			}
@@ -797,10 +798,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private StreamStateHandle metaStateHandle = null;
 
 		private RocksDBIncrementalSnapshotOperation(
-				RocksDBKeyedStateBackend<K> stateBackend,
-				CheckpointStreamFactory checkpointStreamFactory,
-				long checkpointId,
-				long checkpointTimestamp) {
+			RocksDBKeyedStateBackend<K> stateBackend,
+			CheckpointStreamFactory checkpointStreamFactory,
+			long checkpointId,
+			long checkpointTimestamp) {
 
 			this.stateBackend = stateBackend;
 			this.checkpointStreamFactory = checkpointStreamFactory;
@@ -886,20 +887,14 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		void takeSnapshot() throws Exception {
 			assert (Thread.holdsLock(stateBackend.asyncSnapshotLock));
 
-			final long lastCompletedCheckpoint;
-
 			// use the last completed checkpoint as the comparison base.
 			synchronized (stateBackend.materializedSstFiles) {
-				lastCompletedCheckpoint = stateBackend.lastCompletedCheckpointId;
-				baseSstFiles = stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
+				baseSstFiles = stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
 			}
 
-			LOG.trace("Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} " +
-				"assuming the following (shared) files as base: {}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
-
 			// save meta data
 			for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
-					: stateBackend.kvStateInformation.entrySet()) {
+				: stateBackend.kvStateInformation.entrySet()) {
 				stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().f1.snapshot());
 			}
 
@@ -1054,47 +1049,39 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	private void createDB() throws IOException {
-		db = openDB(instanceRocksDBPath.getAbsolutePath(),
-			new ArrayList<ColumnFamilyDescriptor>(),
-			null);
+		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(1);
+		this.db = openDB(instanceRocksDBPath.getAbsolutePath(), Collections.emptyList(), columnFamilyHandles);
+		this.defaultColumnFamily = columnFamilyHandles.get(0);
 	}
 
 	private RocksDB openDB(
-			String path,
-			List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
-			List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {
+		String path,
+		List<ColumnFamilyDescriptor> stateColumnFamilyDescriptors,
+		List<ColumnFamilyHandle> stateColumnFamilyHandles) throws IOException {
 
-		List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(stateColumnFamilyDescriptors);
+		List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+			new ArrayList<>(1 + stateColumnFamilyDescriptors.size());
 
-		// we add the required descriptor for the default CF in last position.
-		columnFamilyDescriptors.add(
-			new ColumnFamilyDescriptor(
-				"default".getBytes(ConfigConstants.DEFAULT_CHARSET), columnOptions));
+		columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);
 
-		List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>(columnFamilyDescriptors.size());
+		// we add the required descriptor for the default CF in last position.
+		columnFamilyDescriptors.add(new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));
 
 		RocksDB db;
 
 		try {
 			db = RocksDB.open(
-					Preconditions.checkNotNull(dbOptions),
-					Preconditions.checkNotNull(path),
-					columnFamilyDescriptors,
-					columnFamilyHandles);
+				Preconditions.checkNotNull(dbOptions),
+				Preconditions.checkNotNull(path),
+				columnFamilyDescriptors,
+				stateColumnFamilyHandles);
 		} catch (RocksDBException e) {
 			throw new IOException("Error while opening RocksDB instance.", e);
 		}
 
-		final int defaultColumnFamilyIndex = columnFamilyHandles.size() - 1;
-
-		// extract the default column family.
-		defaultColumnFamily = columnFamilyHandles.get(defaultColumnFamilyIndex);
-
-		if (stateColumnFamilyHandles != null) {
-			// return all CFs except the default CF which is kept separately because it is not used in Flink operations.
-			stateColumnFamilyHandles.addAll(
-				columnFamilyHandles.subList(0, defaultColumnFamilyIndex));
-		}
+		// requested + default CF
+		Preconditions.checkState(1 + stateColumnFamilyDescriptors.size() == stateColumnFamilyHandles.size(),
+			"Not all requested column family handles have been created");
 
 		return db;
 	}
@@ -1135,7 +1122,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @throws RocksDBException
 		 */
 		public void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
-				throws IOException, StateMigrationException, ClassNotFoundException, RocksDBException {
+			throws IOException, StateMigrationException, ClassNotFoundException, RocksDBException {
 
 			rocksDBKeyedStateBackend.createDB();
 
@@ -1144,8 +1131,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 					if (!(keyedStateHandle instanceof KeyGroupsStateHandle)) {
 						throw new IllegalStateException("Unexpected state handle type, " +
-								"expected: " + KeyGroupsStateHandle.class +
-								", but found: " + keyedStateHandle.getClass());
+							"expected: " + KeyGroupsStateHandle.class +
+							", but found: " + keyedStateHandle.getClass());
 					}
 					this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
 					restoreKeyGroupsInStateHandle();
@@ -1161,7 +1148,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		 * @throws ClassNotFoundException
 		 */
 		private void restoreKeyGroupsInStateHandle()
-				throws IOException, StateMigrationException, RocksDBException, ClassNotFoundException {
+			throws IOException, StateMigrationException, RocksDBException, ClassNotFoundException {
 			try {
 				currentStateHandleInStream = currentKeyGroupsStateHandle.openInputStream();
 				rocksDBKeyedStateBackend.cancelStreamRegistry.registerClosable(currentStateHandleInStream);
@@ -1186,17 +1173,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
 
 			KeyedBackendSerializationProxy<K> serializationProxy =
-					new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
+				new KeyedBackendSerializationProxy<>(rocksDBKeyedStateBackend.userCodeClassLoader);
 
 			serializationProxy.read(currentStateHandleInView);
 
 			// check for key serializer compatibility; this also reconfigures the
 			// key serializer to be compatible, if it is required and is possible
 			if (CompatibilityUtil.resolveCompatibilityResult(
-					serializationProxy.getKeySerializer(),
-					UnloadableDummyTypeSerializer.class,
-					serializationProxy.getKeySerializerConfigSnapshot(),
-					rocksDBKeyedStateBackend.keySerializer)
+				serializationProxy.getKeySerializer(),
+				UnloadableDummyTypeSerializer.class,
+				serializationProxy.getKeySerializerConfigSnapshot(),
+				rocksDBKeyedStateBackend.keySerializer)
 				.isRequiresMigration()) {
 
 				// TODO replace with state migration; note that key hash codes need to remain the same after migration
@@ -1208,7 +1195,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				SnappyStreamCompressionDecorator.INSTANCE : UncompressedStreamCompressionDecorator.INSTANCE;
 
 			List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
-					serializationProxy.getStateMetaInfoSnapshots();
+				serializationProxy.getStateMetaInfoSnapshots();
 			currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
 			//rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size());
 
@@ -1218,22 +1205,24 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					rocksDBKeyedStateBackend.kvStateInformation.get(restoredMetaInfo.getName());
 
 				if (registeredColumn == null) {
+					byte[] nameBytes = restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
+
 					ColumnFamilyDescriptor columnFamilyDescriptor = new ColumnFamilyDescriptor(
-						restoredMetaInfo.getName().getBytes(ConfigConstants.DEFAULT_CHARSET),
+						nameBytes,
 						rocksDBKeyedStateBackend.columnOptions);
 
 					RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
-							new RegisteredKeyedBackendStateMetaInfo<>(
-								restoredMetaInfo.getStateType(),
-								restoredMetaInfo.getName(),
-								restoredMetaInfo.getNamespaceSerializer(),
-								restoredMetaInfo.getStateSerializer());
+						new RegisteredKeyedBackendStateMetaInfo<>(
+							restoredMetaInfo.getStateType(),
+							restoredMetaInfo.getName(),
+							restoredMetaInfo.getNamespaceSerializer(),
+							restoredMetaInfo.getStateSerializer());
 
 					rocksDBKeyedStateBackend.restoredKvStateMetaInfos.put(restoredMetaInfo.getName(), restoredMetaInfo);
 
 					ColumnFamilyHandle columnFamily = rocksDBKeyedStateBackend.db.createColumnFamily(columnFamilyDescriptor);
 
-					registeredColumn = new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(columnFamily, stateMetaInfo);
+					registeredColumn = new Tuple2<>(columnFamily, stateMetaInfo);
 					rocksDBKeyedStateBackend.kvStateInformation.put(stateMetaInfo.getName(), registeredColumn);
 
 				} else {
@@ -1303,7 +1292,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData(
-				StreamStateHandle metaStateHandle) throws Exception {
+			StreamStateHandle metaStateHandle) throws Exception {
 
 			FSDataInputStream inputStream = null;
 
@@ -1319,10 +1308,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				// check for key serializer compatibility; this also reconfigures the
 				// key serializer to be compatible, if it is required and is possible
 				if (CompatibilityUtil.resolveCompatibilityResult(
-						serializationProxy.getKeySerializer(),
-						UnloadableDummyTypeSerializer.class,
-						serializationProxy.getKeySerializerConfigSnapshot(),
-						stateBackend.keySerializer)
+					serializationProxy.getKeySerializer(),
+					UnloadableDummyTypeSerializer.class,
+					serializationProxy.getKeySerializerConfigSnapshot(),
+					stateBackend.keySerializer)
 					.isRequiresMigration()) {
 
 					// TODO replace with state migration; note that key hash codes need to remain the same after migration
@@ -1340,8 +1329,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		private void readStateData(
-				Path restoreFilePath,
-				StreamStateHandle remoteFileHandle) throws IOException {
+			Path restoreFilePath,
+			StreamStateHandle remoteFileHandle) throws IOException {
 
 			FileSystem restoreFileSystem = restoreFilePath.getFileSystem();
 
@@ -1378,8 +1367,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		}
 
 		private void restoreInstance(
-				IncrementalKeyedStateHandle restoreStateHandle,
-				boolean hasExtraKeys) throws Exception {
+			IncrementalKeyedStateHandle restoreStateHandle,
+			boolean hasExtraKeys) throws Exception {
 
 			// read state data
 			Path restoreInstancePath = new Path(
@@ -1399,7 +1388,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots =
 					readMetaData(restoreStateHandle.getMetaStateHandle());
 
-				List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
+				List<ColumnFamilyDescriptor> columnFamilyDescriptors =
+					new ArrayList<>(1 + stateMetaInfoSnapshots.size());
 
 				for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot : stateMetaInfoSnapshots) {
 
@@ -1413,69 +1403,78 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 				if (hasExtraKeys) {
 
-					List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+					List<ColumnFamilyHandle> columnFamilyHandles =
+						new ArrayList<>(1 + columnFamilyDescriptors.size());
 
 					try (RocksDB restoreDb = stateBackend.openDB(
-							restoreInstancePath.getPath(),
-							columnFamilyDescriptors,
-							columnFamilyHandles)) {
-
-						for (int i = 0; i < columnFamilyHandles.size(); ++i) {
-							ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
-							ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i);
-							RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
-
-							Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
-								stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
-
-							if (null == registeredStateMetaInfoEntry) {
-
-								RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
-									new RegisteredKeyedBackendStateMetaInfo<>(
-										stateMetaInfoSnapshot.getStateType(),
+						restoreInstancePath.getPath(),
+						columnFamilyDescriptors,
+						columnFamilyHandles)) {
+
+						try {
+							// iterating only the requested descriptors automatically skips the default column family handle
+							for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
+								ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
+								ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i);
+								RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
+
+								Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> registeredStateMetaInfoEntry =
+									stateBackend.kvStateInformation.get(stateMetaInfoSnapshot.getName());
+
+								if (null == registeredStateMetaInfoEntry) {
+
+									RegisteredKeyedBackendStateMetaInfo<?, ?> stateMetaInfo =
+										new RegisteredKeyedBackendStateMetaInfo<>(
+											stateMetaInfoSnapshot.getStateType(),
+											stateMetaInfoSnapshot.getName(),
+											stateMetaInfoSnapshot.getNamespaceSerializer(),
+											stateMetaInfoSnapshot.getStateSerializer());
+
+									registeredStateMetaInfoEntry =
+										new Tuple2<>(
+											stateBackend.db.createColumnFamily(columnFamilyDescriptor),
+											stateMetaInfo);
+
+									stateBackend.kvStateInformation.put(
 										stateMetaInfoSnapshot.getName(),
-										stateMetaInfoSnapshot.getNamespaceSerializer(),
-										stateMetaInfoSnapshot.getStateSerializer());
-
-								registeredStateMetaInfoEntry =
-									new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(
-										stateBackend.db.createColumnFamily(columnFamilyDescriptor),
-										stateMetaInfo);
+										registeredStateMetaInfoEntry);
+								}
 
-								stateBackend.kvStateInformation.put(
-									stateMetaInfoSnapshot.getName(),
-									registeredStateMetaInfoEntry);
-							}
+								ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
 
-							ColumnFamilyHandle targetColumnFamilyHandle = registeredStateMetaInfoEntry.f0;
+								try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) {
 
-							try (RocksIterator iterator = restoreDb.newIterator(columnFamilyHandle)) {
+									int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
+									byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
+									for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
+										startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
+									}
 
-								int startKeyGroup = stateBackend.getKeyGroupRange().getStartKeyGroup();
-								byte[] startKeyGroupPrefixBytes = new byte[stateBackend.keyGroupPrefixBytes];
-								for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
-									startKeyGroupPrefixBytes[j] = (byte) (startKeyGroup >>> ((stateBackend.keyGroupPrefixBytes - j - 1) * Byte.SIZE));
-								}
+									iterator.seek(startKeyGroupPrefixBytes);
 
-								iterator.seek(startKeyGroupPrefixBytes);
+									while (iterator.isValid()) {
 
-								while (iterator.isValid()) {
+										int keyGroup = 0;
+										for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
+											keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
+										}
 
-									int keyGroup = 0;
-									for (int j = 0; j < stateBackend.keyGroupPrefixBytes; ++j) {
-										keyGroup = (keyGroup << Byte.SIZE) + iterator.key()[j];
-									}
+										if (stateBackend.keyGroupRange.contains(keyGroup)) {
+											stateBackend.db.put(targetColumnFamilyHandle,
+												iterator.key(), iterator.value());
+										}
 
-									if (stateBackend.keyGroupRange.contains(keyGroup)) {
-										stateBackend.db.put(targetColumnFamilyHandle,
-											iterator.key(), iterator.value());
+										iterator.next();
 									}
-
-									iterator.next();
-								}
+								} // releases native iterator resources
+							}
+						} finally {
+							// release the native resources of the temporary db's column family handles
+							for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
+								IOUtils.closeQuietly(columnFamilyHandle);
 							}
 						}
-					}
+					} // releases native tmp db resources
 				} else {
 					// pick up again the old backend id, so the we can reference existing state
 					stateBackend.backendUID = restoreStateHandle.getBackendIdentifier();
@@ -1491,11 +1490,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					createFileHardLinksInRestorePath(sstFiles, restoreInstancePath);
 					createFileHardLinksInRestorePath(miscFiles, restoreInstancePath);
 
-					List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+					List<ColumnFamilyHandle> columnFamilyHandles =
+						new ArrayList<>(1 + columnFamilyDescriptors.size());
+
 					stateBackend.db = stateBackend.openDB(
 						stateBackend.instanceRocksDBPath.getAbsolutePath(),
 						columnFamilyDescriptors, columnFamilyHandles);
 
+					// extract and store the default column family which is located at the last index
+					stateBackend.defaultColumnFamily = columnFamilyHandles.remove(columnFamilyHandles.size() - 1);
+
 					for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
 						RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
 
@@ -1509,8 +1513,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 						stateBackend.kvStateInformation.put(
 							stateMetaInfoSnapshot.getName(),
-							new Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>(
-								columnFamilyHandle, stateMetaInfo));
+							new Tuple2<>(columnFamilyHandle, stateMetaInfo));
 					}
 
 					// use the restore sst files as the base for succeeding checkpoints
@@ -1590,10 +1593,10 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 */
 	@SuppressWarnings("rawtypes, unchecked")
 	protected <N, S> ColumnFamilyHandle getColumnFamily(
-			StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {
+		StateDescriptor<?, S> descriptor, TypeSerializer<N> namespaceSerializer) throws IOException, StateMigrationException {
 
 		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> stateInfo =
-				kvStateInformation.get(descriptor.getName());
+			kvStateInformation.get(descriptor.getName());
 
 		RegisteredKeyedBackendStateMetaInfo<N, S> newMetaInfo = new RegisteredKeyedBackendStateMetaInfo<>(
 			descriptor.getType(),
@@ -1625,16 +1628,16 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			// check compatibility results to determine if state migration is required
 			CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-					restoredMetaInfo.getNamespaceSerializer(),
-					MigrationNamespaceSerializerProxy.class,
-					restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
-					newMetaInfo.getNamespaceSerializer());
+				restoredMetaInfo.getNamespaceSerializer(),
+				MigrationNamespaceSerializerProxy.class,
+				restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
+				newMetaInfo.getNamespaceSerializer());
 
 			CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
-					restoredMetaInfo.getStateSerializer(),
-					UnloadableDummyTypeSerializer.class,
-					restoredMetaInfo.getStateSerializerConfigSnapshot(),
-					newMetaInfo.getStateSerializer());
+				restoredMetaInfo.getStateSerializer(),
+				UnloadableDummyTypeSerializer.class,
+				restoredMetaInfo.getStateSerializerConfigSnapshot(),
+				newMetaInfo.getStateSerializer());
 
 			if (!namespaceCompatibility.isRequiresMigration() && !stateCompatibility.isRequiresMigration()) {
 				stateInfo.f1 = newMetaInfo;
@@ -1645,25 +1648,31 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			}
 		}
 
-		ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(
-				descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET), columnOptions);
+		byte[] nameBytes = descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
+		Preconditions.checkState(!Arrays.equals(DEFAULT_COLUMN_FAMILY_NAME_BYTES, nameBytes),
+			"The chosen state name 'default' collides with the name of the default column family!");
+
+		ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, columnOptions);
+
+		final ColumnFamilyHandle columnFamily;
 
 		try {
-			ColumnFamilyHandle columnFamily = db.createColumnFamily(columnDescriptor);
-			Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tuple =
-					new Tuple2<>(columnFamily, newMetaInfo);
-			Map rawAccess = kvStateInformation;
-			rawAccess.put(descriptor.getName(), tuple);
-			return columnFamily;
+			columnFamily = db.createColumnFamily(columnDescriptor);
 		} catch (RocksDBException e) {
 			throw new IOException("Error creating ColumnFamilyHandle.", e);
 		}
+
+		Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<N, S>> tuple =
+			new Tuple2<>(columnFamily, newMetaInfo);
+		Map rawAccess = kvStateInformation;
+		rawAccess.put(descriptor.getName(), tuple);
+		return columnFamily;
 	}
 
 	@Override
 	protected <N, T> InternalValueState<N, T> createValueState(
-			TypeSerializer<N> namespaceSerializer,
-			ValueStateDescriptor<T> stateDesc) throws Exception {
+		TypeSerializer<N> namespaceSerializer,
+		ValueStateDescriptor<T> stateDesc) throws Exception {
 
 		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
 
@@ -1672,8 +1681,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	@Override
 	protected <N, T> InternalListState<N, T> createListState(
-			TypeSerializer<N> namespaceSerializer,
-			ListStateDescriptor<T> stateDesc) throws Exception {
+		TypeSerializer<N> namespaceSerializer,
+		ListStateDescriptor<T> stateDesc) throws Exception {
 
 		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
 
@@ -1682,8 +1691,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	@Override
 	protected <N, T> InternalReducingState<N, T> createReducingState(
-			TypeSerializer<N> namespaceSerializer,
-			ReducingStateDescriptor<T> stateDesc) throws Exception {
+		TypeSerializer<N> namespaceSerializer,
+		ReducingStateDescriptor<T> stateDesc) throws Exception {
 
 		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
 
@@ -1692,8 +1701,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	@Override
 	protected <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
-			TypeSerializer<N> namespaceSerializer,
-			AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
+		TypeSerializer<N> namespaceSerializer,
+		AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {
 
 		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
 		return new RocksDBAggregatingState<>(columnFamily, namespaceSerializer, stateDesc, this);
@@ -1701,8 +1710,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	@Override
 	protected <N, T, ACC> InternalFoldingState<N, T, ACC> createFoldingState(
-			TypeSerializer<N> namespaceSerializer,
-			FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
+		TypeSerializer<N> namespaceSerializer,
+		FoldingStateDescriptor<T, ACC> stateDesc) throws Exception {
 
 		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
 
@@ -1711,7 +1720,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 	@Override
 	protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(TypeSerializer<N> namespaceSerializer,
-			MapStateDescriptor<UK, UV> stateDesc) throws Exception {
+	                                                                 MapStateDescriptor<UK, UV> stateDesc) throws Exception {
 		ColumnFamilyHandle columnFamily = getColumnFamily(stateDesc, namespaceSerializer);
 
 		return new RocksDBMapState<>(columnFamily, namespaceSerializer, stateDesc, this);
@@ -1784,7 +1793,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 					@Override
 					public int compare(MergeIterator o1, MergeIterator o2) {
 						int arrayCmpRes = compareKeyGroupsForByteArrays(
-								o1.currentKey, o2.currentKey, currentBytes);
+							o1.currentKey, o2.currentKey, currentBytes);
 						return arrayCmpRes == 0 ? o1.getKvStateId() - o2.getKvStateId() : arrayCmpRes;
 					}
 				});
@@ -1799,7 +1808,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 
 			if (kvStateIterators.size() > 0) {
 				PriorityQueue<MergeIterator> iteratorPriorityQueue =
-						new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
+					new PriorityQueue<>(kvStateIterators.size(), iteratorComparator);
 
 				for (Tuple2<RocksIterator, Integer> rocksIteratorWithKVStateId : kvStateIterators) {
 					final RocksIterator rocksIterator = rocksIteratorWithKVStateId.f0;
@@ -1968,8 +1977,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 		KeyedStateHandle keyedStateHandle = restoreState.iterator().next();
 		if (!(keyedStateHandle instanceof MigrationKeyGroupStateHandle)) {
 			throw new IllegalStateException("Unexpected state handle type, " +
-					"expected: " + MigrationKeyGroupStateHandle.class +
-					", but found: " + keyedStateHandle.getClass());
+				"expected: " + MigrationKeyGroupStateHandle.class +
+				", but found: " + keyedStateHandle.getClass());
 		}
 
 		MigrationKeyGroupStateHandle keyGroupStateHandle = (MigrationKeyGroupStateHandle) keyedStateHandle;
@@ -1989,8 +1998,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			byte mappingByte = inputView.readByte();
 
 			ObjectInputStream ooIn =
-					new InstantiationUtil.ClassLoaderObjectInputStream(
-							new DataInputViewStream(inputView), userCodeClassLoader);
+				new InstantiationUtil.ClassLoaderObjectInputStream(
+					new DataInputViewStream(inputView), userCodeClassLoader);
 
 			StateDescriptor<?, ?> stateDescriptor = (StateDescriptor<?, ?>) ooIn.readObject();
 
@@ -2015,7 +2024,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			while (true) {
 				byte mappingByte = inputView.readByte();
 				ColumnFamilyHandle handle = getColumnFamily(
-						columnFamilyMapping.get(mappingByte), MigrationNamespaceSerializerProxy.INSTANCE);
+					columnFamilyMapping.get(mappingByte), MigrationNamespaceSerializerProxy.INSTANCE);
 
 				byte[] keyAndNamespace = BytePrimitiveArraySerializer.INSTANCE.deserialize(inputView);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ca87bec4/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 991e0d4..08d661c 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -42,7 +42,9 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.filefilter.IOFileFilter;
+import org.junit.After;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -122,6 +124,22 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 		return backend;
 	}
 
+	// small safety net for instance cleanups, so that no native objects are left
+	@After
+	public void cleanupRocksDB() {
+		if (keyedStateBackend != null) {
+			IOUtils.closeQuietly(keyedStateBackend);
+			keyedStateBackend.dispose();
+		}
+
+		if (allCreatedCloseables != null) {
+			for (RocksObject rocksCloseable : allCreatedCloseables) {
+				verify(rocksCloseable, times(1)).close();
+			}
+			allCreatedCloseables = null;
+		}
+	}
+
 	public void setupRocksKeyedStateBackend() throws Exception {
 
 		blocker = new OneShotLatch();
@@ -238,149 +256,186 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 
 	@Test
 	public void testCorrectMergeOperatorSet() throws IOException {
-		ColumnFamilyOptions columnFamilyOptions = mock(ColumnFamilyOptions.class);
-
-		try (RocksDBKeyedStateBackend<Integer> test = new RocksDBKeyedStateBackend<>(
-			"test",
-			Thread.currentThread().getContextClassLoader(),
-			tempFolder.newFolder(),
-			mock(DBOptions.class),
-			columnFamilyOptions,
-			mock(TaskKvStateRegistry.class),
-			IntSerializer.INSTANCE,
-			1,
-			new KeyGroupRange(0, 0),
-			new ExecutionConfig(),
-			enableIncrementalCheckpointing)) {
+
+		final ColumnFamilyOptions columnFamilyOptions = spy(new ColumnFamilyOptions());
+		RocksDBKeyedStateBackend<Integer> test = null;
+		try {
+			test = new RocksDBKeyedStateBackend<>(
+				"test",
+				Thread.currentThread().getContextClassLoader(),
+				tempFolder.newFolder(),
+				mock(DBOptions.class),
+				columnFamilyOptions,
+				mock(TaskKvStateRegistry.class),
+				IntSerializer.INSTANCE,
+				1,
+				new KeyGroupRange(0, 0),
+				new ExecutionConfig(),
+				enableIncrementalCheckpointing);
 
 			verify(columnFamilyOptions, Mockito.times(1))
 				.setMergeOperatorName(RocksDBKeyedStateBackend.MERGE_OPERATOR_NAME);
+		} finally {
+			if (test != null) {
+				IOUtils.closeQuietly(test);
+				test.dispose();
+			}
+			columnFamilyOptions.close();
 		}
 	}
 
 	@Test
 	public void testReleasingSnapshotAfterBackendClosed() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory,
-			CheckpointOptions.forFullCheckpoint());
 
-		RocksDB spyDB = keyedStateBackend.db;
+		try {
+			RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory,
+				CheckpointOptions.forFullCheckpoint());
 
-		if (!enableIncrementalCheckpointing) {
-			verify(spyDB, times(1)).getSnapshot();
-			verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class));
-		}
+			RocksDB spyDB = keyedStateBackend.db;
 
-		this.keyedStateBackend.dispose();
-		verify(spyDB, times(1)).close();
-		assertEquals(null, keyedStateBackend.db);
+			if (!enableIncrementalCheckpointing) {
+				verify(spyDB, times(1)).getSnapshot();
+				verify(spyDB, times(0)).releaseSnapshot(any(Snapshot.class));
+			}
 
-		//Ensure every RocksObjects not closed yet
-		for (RocksObject rocksCloseable : allCreatedCloseables) {
-			verify(rocksCloseable, times(0)).close();
-		}
+			this.keyedStateBackend.dispose();
+			verify(spyDB, times(1)).close();
+			assertEquals(null, keyedStateBackend.db);
 
-		snapshot.cancel(true);
+			// Ensure that no RocksObject has been closed yet
+			for (RocksObject rocksCloseable : allCreatedCloseables) {
+				verify(rocksCloseable, times(0)).close();
+			}
 
-		//Ensure every RocksObjects was closed exactly once
-		for (RocksObject rocksCloseable : allCreatedCloseables) {
-			verify(rocksCloseable, times(1)).close();
-		}
+			snapshot.cancel(true);
 
+			// Ensure every RocksObject was closed exactly once
+			for (RocksObject rocksCloseable : allCreatedCloseables) {
+				verify(rocksCloseable, times(1)).close();
+			}
+		} finally {
+			keyedStateBackend.dispose();
+			keyedStateBackend = null;
+		}
 	}
 
 	@Test
 	public void testDismissingSnapshot() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
-		snapshot.cancel(true);
-		verifyRocksObjectsReleased();
+		try {
+			RunnableFuture<KeyedStateHandle> snapshot =
+				keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
+			snapshot.cancel(true);
+			verifyRocksObjectsReleased();
+		} finally {
+			this.keyedStateBackend.dispose();
+			this.keyedStateBackend = null;
+		}
 	}
 
 	@Test
 	public void testDismissingSnapshotNotRunnable() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
-		snapshot.cancel(true);
-		Thread asyncSnapshotThread = new Thread(snapshot);
-		asyncSnapshotThread.start();
 		try {
-			snapshot.get();
-			fail();
-		} catch (Exception ignored) {
+			RunnableFuture<KeyedStateHandle> snapshot =
+				keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
+			snapshot.cancel(true);
+			Thread asyncSnapshotThread = new Thread(snapshot);
+			asyncSnapshotThread.start();
+			try {
+				snapshot.get();
+				fail();
+			} catch (Exception ignored) {
 
+			}
+			asyncSnapshotThread.join();
+			verifyRocksObjectsReleased();
+		} finally {
+			this.keyedStateBackend.dispose();
+			this.keyedStateBackend = null;
 		}
-		asyncSnapshotThread.join();
-		verifyRocksObjectsReleased();
 	}
 
 	@Test
 	public void testCompletingSnapshot() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
-		Thread asyncSnapshotThread = new Thread(snapshot);
-		asyncSnapshotThread.start();
-		waiter.await(); // wait for snapshot to run
-		waiter.reset();
-		runStateUpdates();
-		blocker.trigger(); // allow checkpointing to start writing
-		waiter.await(); // wait for snapshot stream writing to run
-		KeyedStateHandle keyedStateHandle = snapshot.get();
-		assertNotNull(keyedStateHandle);
-		assertTrue(keyedStateHandle.getStateSize() > 0);
-		assertEquals(2, keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
-		assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
-		asyncSnapshotThread.join();
-		verifyRocksObjectsReleased();
+		try {
+			RunnableFuture<KeyedStateHandle> snapshot =
+				keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
+			Thread asyncSnapshotThread = new Thread(snapshot);
+			asyncSnapshotThread.start();
+			waiter.await(); // wait for snapshot to run
+			waiter.reset();
+			runStateUpdates();
+			blocker.trigger(); // allow checkpointing to start writing
+			waiter.await(); // wait for snapshot stream writing to run
+			KeyedStateHandle keyedStateHandle = snapshot.get();
+			assertNotNull(keyedStateHandle);
+			assertTrue(keyedStateHandle.getStateSize() > 0);
+			assertEquals(2, keyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
+			assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+			asyncSnapshotThread.join();
+			verifyRocksObjectsReleased();
+		} finally {
+			this.keyedStateBackend.dispose();
+			this.keyedStateBackend = null;
+		}
 	}
 
 	@Test
 	public void testCancelRunningSnapshot() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
-		Thread asyncSnapshotThread = new Thread(snapshot);
-		asyncSnapshotThread.start();
-		waiter.await(); // wait for snapshot to run
-		waiter.reset();
-		runStateUpdates();
-		snapshot.cancel(true);
-		blocker.trigger(); // allow checkpointing to start writing
-		assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
-		waiter.await(); // wait for snapshot stream writing to run
 		try {
-			snapshot.get();
-			fail();
-		} catch (Exception ignored) {
-		}
+			RunnableFuture<KeyedStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
+			Thread asyncSnapshotThread = new Thread(snapshot);
+			asyncSnapshotThread.start();
+			waiter.await(); // wait for snapshot to run
+			waiter.reset();
+			runStateUpdates();
+			snapshot.cancel(true);
+			blocker.trigger(); // allow checkpointing to start writing
+			assertTrue(testStreamFactory.getLastCreatedStream().isClosed());
+			waiter.await(); // wait for snapshot stream writing to run
+			try {
+				snapshot.get();
+				fail();
+			} catch (Exception ignored) {
+			}
 
-		asyncSnapshotThread.join();
-		verifyRocksObjectsReleased();
+			asyncSnapshotThread.join();
+			verifyRocksObjectsReleased();
+		} finally {
+			this.keyedStateBackend.dispose();
+			this.keyedStateBackend = null;
+		}
 	}
 
 	@Test
 	public void testDisposeDeletesAllDirectories() throws Exception {
 		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
-		ValueStateDescriptor<String> kvId =
+		Collection<File> allFilesInDbDir =
+			FileUtils.listFilesAndDirs(new File(dbPath), new AcceptAllFilter(), new AcceptAllFilter());
+		try {
+			ValueStateDescriptor<String> kvId =
 				new ValueStateDescriptor<>("id", String.class, null);
 
-		kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
-		ValueState<String> state =
+			ValueState<String> state =
 				backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
-		backend.setCurrentKey(1);
-		state.update("Hello");
-
-		Collection<File> allFilesInDbDir =
-				FileUtils.listFilesAndDirs(new File(dbPath), new AcceptAllFilter(), new AcceptAllFilter());
-
-		// more than just the root directory
-		assertTrue(allFilesInDbDir.size() > 1);
-
-		backend.dispose();
+			backend.setCurrentKey(1);
+			state.update("Hello");
 
+			// more than just the root directory
+			assertTrue(allFilesInDbDir.size() > 1);
+		} finally {
+			IOUtils.closeQuietly(backend);
+			backend.dispose();
+		}
 		allFilesInDbDir =
-				FileUtils.listFilesAndDirs(new File(dbPath), new AcceptAllFilter(), new AcceptAllFilter());
+			FileUtils.listFilesAndDirs(new File(dbPath), new AcceptAllFilter(), new AcceptAllFilter());
 
 		// just the root directory left
 		assertEquals(1, allFilesInDbDir.size());
@@ -390,62 +445,64 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	public void testSharedIncrementalStateDeRegistration() throws Exception {
 		if (enableIncrementalCheckpointing) {
 			AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
-			ValueStateDescriptor<String> kvId =
-				new ValueStateDescriptor<>("id", String.class, null);
+			try {
+				ValueStateDescriptor<String> kvId =
+					new ValueStateDescriptor<>("id", String.class, null);
 
-			kvId.initializeSerializerUnlessSet(new ExecutionConfig());
+				kvId.initializeSerializerUnlessSet(new ExecutionConfig());
 
-			ValueState<String> state =
-				backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+				ValueState<String> state =
+					backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
-			Queue<IncrementalKeyedStateHandle> previousStateHandles = new LinkedList<>();
-			SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
-			for (int checkpointId = 0; checkpointId < 3; ++checkpointId) {
+				Queue<IncrementalKeyedStateHandle> previousStateHandles = new LinkedList<>();
+				SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry());
+				for (int checkpointId = 0; checkpointId < 3; ++checkpointId) {
 
-				reset(sharedStateRegistry);
+					reset(sharedStateRegistry);
 
-				backend.setCurrentKey(checkpointId);
-				state.update("Hello-" + checkpointId);
+					backend.setCurrentKey(checkpointId);
+					state.update("Hello-" + checkpointId);
 
-				RunnableFuture<KeyedStateHandle> snapshot = backend.snapshot(
-					checkpointId,
-					checkpointId,
-					createStreamFactory(),
-					CheckpointOptions.forFullCheckpoint());
+					RunnableFuture<KeyedStateHandle> snapshot = backend.snapshot(
+						checkpointId,
+						checkpointId,
+						createStreamFactory(),
+						CheckpointOptions.forFullCheckpoint());
 
-				snapshot.run();
+					snapshot.run();
 
-				IncrementalKeyedStateHandle stateHandle = (IncrementalKeyedStateHandle) snapshot.get();
-				Map<StateHandleID, StreamStateHandle> sharedState =
-					new HashMap<>(stateHandle.getSharedState());
+					IncrementalKeyedStateHandle stateHandle = (IncrementalKeyedStateHandle) snapshot.get();
+					Map<StateHandleID, StreamStateHandle> sharedState =
+						new HashMap<>(stateHandle.getSharedState());
 
-				stateHandle.registerSharedStates(sharedStateRegistry);
+					stateHandle.registerSharedStates(sharedStateRegistry);
 
-				for (Map.Entry<StateHandleID, StreamStateHandle> e : sharedState.entrySet()) {
-					verify(sharedStateRegistry).registerReference(
-						stateHandle.createSharedStateRegistryKeyFromFileName(e.getKey()),
-						e.getValue());
-				}
+					for (Map.Entry<StateHandleID, StreamStateHandle> e : sharedState.entrySet()) {
+						verify(sharedStateRegistry).registerReference(
+							stateHandle.createSharedStateRegistryKeyFromFileName(e.getKey()),
+							e.getValue());
+					}
 
-				previousStateHandles.add(stateHandle);
-				backend.notifyCheckpointComplete(checkpointId);
+					previousStateHandles.add(stateHandle);
+					backend.notifyCheckpointComplete(checkpointId);
 
-				//-----------------------------------------------------------------
+					//-----------------------------------------------------------------
 
-				if (previousStateHandles.size() > 1) {
-					checkRemove(previousStateHandles.remove(), sharedStateRegistry);
+					if (previousStateHandles.size() > 1) {
+						checkRemove(previousStateHandles.remove(), sharedStateRegistry);
+					}
 				}
-			}
 
-			while (!previousStateHandles.isEmpty()) {
+				while (!previousStateHandles.isEmpty()) {
 
-				reset(sharedStateRegistry);
+					reset(sharedStateRegistry);
 
-				checkRemove(previousStateHandles.remove(), sharedStateRegistry);
+					checkRemove(previousStateHandles.remove(), sharedStateRegistry);
+				}
+			} finally {
+				IOUtils.closeQuietly(backend);
+				backend.dispose();
 			}
-
-			backend.close();
-			backend.dispose();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ca87bec4/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 6debff7..f6f73f2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -18,10 +18,6 @@
 
 package org.apache.flink.runtime.state;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -64,14 +60,17 @@ import org.apache.flink.runtime.state.heap.StateTable;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
+import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
-
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -177,17 +176,15 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			Environment env) throws Exception {
 
 		AbstractKeyedStateBackend<K> backend = getStateBackend().createKeyedStateBackend(
-				env,
-				new JobID(),
-				"test_op",
-				keySerializer,
-				numberOfKeyGroups,
-				keyGroupRange,
-				env.getTaskKvStateRegistry());
+			env,
+			new JobID(),
+			"test_op",
+			keySerializer,
+			numberOfKeyGroups,
+			keyGroupRange,
+			env.getTaskKvStateRegistry());
 
-		if (null != state) {
-			backend.restore(state);
-		}
+		backend.restore(state);
 
 		return backend;
 	}
@@ -244,6 +241,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		}
 
 		assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+		backend.dispose();
 	}
 
 	@Test
@@ -303,6 +301,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		}
 
 		assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+		backend.dispose();
 	}
 
 	@Test
@@ -356,6 +355,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		}
 
 		assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+		backend.dispose();
 	}
 
 	@Test
@@ -411,6 +411,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		}
 
 		assertEquals("Didn't see the expected Kryo exception.", 1, numExceptions);
+		backend.dispose();
 	}
 
 
@@ -488,81 +489,91 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		CheckpointStreamFactory streamFactory = createStreamFactory();
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 		Environment env = new DummyEnvironment("test", 1, 0);
-		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+		AbstractKeyedStateBackend<Integer> backend = null;
 
-		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+		try {
+			backend = createKeyedBackend(IntSerializer.INSTANCE, env);
 
-		// make sure that we are in fact using the KryoSerializer
-		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+			TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
 
-		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+			// make sure that we are in fact using the KryoSerializer
+			assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
 
-		ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
 
-		// ============== create snapshot - no Kryo registration or specific / default serializers ==============
+			ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
-		// make some more modifications
-		backend.setCurrentKey(1);
-		state.update(new TestPojo("u1", 1));
+			// ============== create snapshot - no Kryo registration or specific / default serializers ==============
 
-		backend.setCurrentKey(2);
-		state.update(new TestPojo("u2", 2));
+			// make some more modifications
+			backend.setCurrentKey(1);
+			state.update(new TestPojo("u1", 1));
 
-		KeyedStateHandle snapshot = runSnapshot(backend.snapshot(
+			backend.setCurrentKey(2);
+			state.update(new TestPojo("u2", 2));
+
+			KeyedStateHandle snapshot = runSnapshot(backend.snapshot(
 				682375462378L,
 				2,
 				streamFactory,
 				CheckpointOptions.forFullCheckpoint()));
 
-		snapshot.registerSharedStates(sharedStateRegistry);
-		backend.dispose();
+			snapshot.registerSharedStates(sharedStateRegistry);
+			backend.dispose();
 
-		// ========== restore snapshot - should use default serializer (ONLY SERIALIZATION) ==========
+			// ========== restore snapshot - should use default serializer (ONLY SERIALIZATION) ==========
 
-		// cast because our test serializer is not typed to TestPojo
-		env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) CustomKryoTestSerializer.class);
+			// cast because our test serializer is not typed to TestPojo
+			env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) CustomKryoTestSerializer.class);
 
-		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
 
-		// re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
-		// initializeSerializerUnlessSet would not pick up our new config
-		kvId = new ValueStateDescriptor<>("id", pojoType);
-		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			// re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
+			// initializeSerializerUnlessSet would not pick up our new config
+			kvId = new ValueStateDescriptor<>("id", pojoType);
+			state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
-		backend.setCurrentKey(1);
+			backend.setCurrentKey(1);
 
-		// update to test state backends that eagerly serialize, such as RocksDB
-		state.update(new TestPojo("u1", 11));
+			// update to test state backends that eagerly serialize, such as RocksDB
+			state.update(new TestPojo("u1", 11));
 
-		KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(
+			KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(
 				682375462378L,
 				2,
 				streamFactory,
 				CheckpointOptions.forFullCheckpoint()));
 
-		snapshot2.registerSharedStates(sharedStateRegistry);
+			snapshot2.registerSharedStates(sharedStateRegistry);
+			snapshot.discardState();
 
-		snapshot.discardState();
+			backend.dispose();
 
-		backend.dispose();
+			// ========= restore snapshot - should use default serializer (FAIL ON DESERIALIZATION) =========
 
-		// ========= restore snapshot - should use default serializer (FAIL ON DESERIALIZATION) =========
+			// cast because our test serializer is not typed to TestPojo
+			env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) CustomKryoTestSerializer.class);
 
-		// cast because our test serializer is not typed to TestPojo
-		env.getExecutionConfig().addDefaultKryoSerializer(TestPojo.class, (Class) CustomKryoTestSerializer.class);
+			// on the second restore, since the custom serializer will be used for
+			// deserialization, we expect the deliberate failure to be thrown
+			expectedException.expect(ExpectedKryoTestException.class);
 
-		// on the second restore, since the custom serializer will be used for
-		// deserialization, we expect the deliberate failure to be thrown
-		expectedException.expect(ExpectedKryoTestException.class);
+			// state backends that eagerly deserialize (such as the memory state backend) will fail here
+			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env);
 
-		// state backends that eagerly deserializes (such as the memory state backend) will fail here
-		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env);
+			state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
-		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			backend.setCurrentKey(1);
+			// state backends that lazily deserialize (such as RocksDB) will fail here
+			state.value();
 
-		backend.setCurrentKey(1);
-		// state backends that lazily deserializes (such as RocksDB) will fail here
-		state.value();
+			snapshot2.discardState();
+			backend.dispose();
+		} finally {
+			// ensure that native resources are released even when we exit through an exception
+			IOUtils.closeQuietly(backend);
+			backend.dispose();
+		}
 	}
 
 	/**
@@ -581,78 +592,89 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 		Environment env = new DummyEnvironment("test", 1, 0);
 
-		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env);
+		AbstractKeyedStateBackend<Integer> backend = null;
 
-		TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
+		try {
+			backend = createKeyedBackend(IntSerializer.INSTANCE, env);
 
-		// make sure that we are in fact using the KryoSerializer
-		assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
+			TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class);
 
-		ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
-		ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			// make sure that we are in fact using the KryoSerializer
+			assertTrue(pojoType.createSerializer(env.getExecutionConfig()) instanceof KryoSerializer);
 
-		// ============== create snapshot - no Kryo registration or specific / default serializers ==============
+			ValueStateDescriptor<TestPojo> kvId = new ValueStateDescriptor<>("id", pojoType);
+			ValueState<TestPojo> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
-		// make some more modifications
-		backend.setCurrentKey(1);
-		state.update(new TestPojo("u1", 1));
+			// ============== create snapshot - no Kryo registration or specific / default serializers ==============
 
-		backend.setCurrentKey(2);
-		state.update(new TestPojo("u2", 2));
+			// make some more modifications
+			backend.setCurrentKey(1);
+			state.update(new TestPojo("u1", 1));
 
-		KeyedStateHandle snapshot = runSnapshot(backend.snapshot(
+			backend.setCurrentKey(2);
+			state.update(new TestPojo("u2", 2));
+
+			KeyedStateHandle snapshot = runSnapshot(backend.snapshot(
 				682375462378L,
 				2,
 				streamFactory,
 				CheckpointOptions.forFullCheckpoint()));
 
-		snapshot.registerSharedStates(sharedStateRegistry);
-		backend.dispose();
+			snapshot.registerSharedStates(sharedStateRegistry);
+			backend.dispose();
 
-		// ========== restore snapshot - should use specific serializer (ONLY SERIALIZATION) ==========
+			// ========== restore snapshot - should use specific serializer (ONLY SERIALIZATION) ==========
 
-		env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
+			env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
 
-		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
+			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env);
 
-		// re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
-		// initializeSerializerUnlessSet would not pick up our new config
-		kvId = new ValueStateDescriptor<>("id", pojoType);
-		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			// re-initialize to ensure that we create the KryoSerializer from scratch, otherwise
+			// initializeSerializerUnlessSet would not pick up our new config
+			kvId = new ValueStateDescriptor<>("id", pojoType);
+			state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
-		backend.setCurrentKey(1);
+			backend.setCurrentKey(1);
 
-		// update to test state backends that eagerly serialize, such as RocksDB
-		state.update(new TestPojo("u1", 11));
+			// update to test state backends that eagerly serialize, such as RocksDB
+			state.update(new TestPojo("u1", 11));
 
-		KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(
+			KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(
 				682375462378L,
 				2,
 				streamFactory,
 				CheckpointOptions.forFullCheckpoint()));
 
-		snapshot2.registerSharedStates(sharedStateRegistry);
+			snapshot2.registerSharedStates(sharedStateRegistry);
 
-		snapshot.discardState();
+			snapshot.discardState();
 
-		backend.dispose();
+			backend.dispose();
 
-		// ========= restore snapshot - should use specific serializer (FAIL ON DESERIALIZATION) =========
+			// ========= restore snapshot - should use specific serializer (FAIL ON DESERIALIZATION) =========
 
-		env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
+			env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, CustomKryoTestSerializer.class);
 
-		// on the second restore, since the custom serializer will be used for
-		// deserialization, we expect the deliberate failure to be thrown
-		expectedException.expect(ExpectedKryoTestException.class);
+			// on the second restore, since the custom serializer will be used for
+			// deserialization, we expect the deliberate failure to be thrown
+			expectedException.expect(ExpectedKryoTestException.class);
 
-		// state backends that eagerly deserializes (such as the memory state backend) will fail here
-		backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env);
+			// state backends that eagerly deserialize (such as the memory state backend) will fail here
+			backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2, env);
 
-		state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
+			state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
 
-		backend.setCurrentKey(1);
-		// state backends that lazily deserializes (such as RocksDB) will fail here
-		state.value();
+			backend.setCurrentKey(1);
+			// state backends that lazily deserialize (such as RocksDB) will fail here
+			state.value();
+
+			backend.dispose();
+		} finally {
+			// ensure that native resources are also released in case of exception
+			if (backend != null) {
+				backend.dispose();
+			}
+		}
 	}
 
 	@Test
@@ -1726,7 +1748,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		final int MAX_PARALLELISM = 10;
 
 		CheckpointStreamFactory streamFactory = createStreamFactory();
-		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(
+		final AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(
 				IntSerializer.INSTANCE,
 				MAX_PARALLELISM,
 				new KeyGroupRange(0, MAX_PARALLELISM - 1),
@@ -1770,7 +1792,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		backend.dispose();
 
 		// backend for the first half of the key group range
-		AbstractKeyedStateBackend<Integer> firstHalfBackend = restoreKeyedBackend(
+		final AbstractKeyedStateBackend<Integer> firstHalfBackend = restoreKeyedBackend(
 				IntSerializer.INSTANCE,
 				MAX_PARALLELISM,
 				new KeyGroupRange(0, 4),
@@ -1778,7 +1800,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 				new DummyEnvironment("test", 1, 0));
 
 		// backend for the second half of the key group range
-		AbstractKeyedStateBackend<Integer> secondHalfBackend = restoreKeyedBackend(
+		final AbstractKeyedStateBackend<Integer> secondHalfBackend = restoreKeyedBackend(
 				IntSerializer.INSTANCE,
 				MAX_PARALLELISM,
 				new KeyGroupRange(5, 9),
@@ -2017,7 +2039,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 
 	@Test
 	public void testCopyDefaultValue() throws Exception {
-		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+		final AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
 
@@ -2044,7 +2066,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	 */
 	@Test
 	public void testRequireNonNullNamespace() throws Exception {
-		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+		final AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 		ValueStateDescriptor<IntValue> kvId = new ValueStateDescriptor<>("id", IntValue.class, new IntValue(-1));
 
@@ -2076,7 +2098,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 	@SuppressWarnings("unchecked")
 	protected void testConcurrentMapIfQueryable() throws Exception {
 		final int numberOfKeyGroups = 1;
-		AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(
+		final AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(
 				IntSerializer.INSTANCE,
 				numberOfKeyGroups,
 				new KeyGroupRange(0, 0),
@@ -2384,9 +2406,9 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 		streamFactory.setBlockerLatch(blocker);
 		streamFactory.setAfterNumberInvocations(10);
 
-		AbstractKeyedStateBackend<Integer> backend = null;
+		final AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
+
 		try {
-			backend = createKeyedBackend(IntSerializer.INSTANCE);
 
 			if (!backend.supportsAsynchronousSnapshots()) {
 				return;
@@ -2413,14 +2435,11 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			waiter.await();
 
 			// close the backend to see if the close is propagated to the stream
-			backend.close();
+			IOUtils.closeQuietly(backend);
 
 			//unblock the stream so that it can run into the IOException
 			blocker.trigger();
 
-			//dispose the backend
-			backend.dispose();
-
 			runner.join();
 
 			try {
@@ -2431,10 +2450,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten
 			}
 
 		} finally {
-			if (null != backend) {
-				IOUtils.closeQuietly(backend);
-				backend.dispose();
-			}
+			backend.dispose();
 		}
 	}
 


Mime
View raw message