flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [04/27] flink git commit: [FLINK-3761] Refactor RocksDB Backend/Make Key-Group Aware
Date Wed, 31 Aug 2016 17:28:22 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
new file mode 100644
index 0000000..1cb3b2b
--- /dev/null
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMergeIteratorTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.junit.Assert;
+import org.junit.Test;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+
+public class RocksDBMergeIteratorTest {
+
+	private static final int NUM_KEY_VAL_STATES = 50;
+	private static final int MAX_NUM_KEYS = 20;
+
+	@Test
+	public void testEmptyMergeIterator() throws IOException {
+		RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator =
+				new RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.EMPTY_LIST, 2);
+		Assert.assertFalse(emptyIterator.isValid());
+	}
+
+	@Test
+	public void testMergeIterator() throws Exception {
+		Assert.assertTrue(MAX_NUM_KEYS <= Byte.MAX_VALUE);
+
+		testMergeIterator(Byte.MAX_VALUE);
+		testMergeIterator(Short.MAX_VALUE);
+	}
+
+	public void testMergeIterator(int maxParallelism) throws Exception {
+		Random random = new Random(1234);
+
+		File tmpDir = CommonTestUtils.createTempDirectory();
+
+		RocksDB rocksDB = RocksDB.open(tmpDir.getAbsolutePath());
+		try {
+			List<Tuple2<RocksIterator, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>();
+			List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount
= new ArrayList<>();
+
+			int totalKeysExpected = 0;
+
+			for (int c = 0; c < NUM_KEY_VAL_STATES; ++c) {
+				ColumnFamilyHandle handle = rocksDB.createColumnFamily(
+						new ColumnFamilyDescriptor(("column-" + c).getBytes()));
+
+				ByteArrayOutputStreamWithPos bos = new ByteArrayOutputStreamWithPos();
+				DataOutputStream dos = new DataOutputStream(bos);
+
+				int numKeys = random.nextInt(MAX_NUM_KEYS + 1);
+
+				for (int i = 0; i < numKeys; ++i) {
+					if (maxParallelism <= Byte.MAX_VALUE) {
+						dos.writeByte(i);
+					} else {
+						dos.writeShort(i);
+					}
+					dos.writeInt(i);
+					byte[] key = bos.toByteArray();
+					byte[] val = new byte[]{42};
+					rocksDB.put(handle, key, val);
+
+					bos.reset();
+				}
+				columnFamilyHandlesWithKeyCount.add(new Tuple2<>(handle, numKeys));
+				totalKeysExpected += numKeys;
+			}
+
+			int id = 0;
+			for (Tuple2<ColumnFamilyHandle, Integer> columnFamilyHandle : columnFamilyHandlesWithKeyCount)
{
+				rocksIteratorsWithKVStateId.add(new Tuple2<>(rocksDB.newIterator(columnFamilyHandle.f0),
id));
+				++id;
+			}
+
+			RocksDBKeyedStateBackend.RocksDBMergeIterator mergeIterator = new RocksDBKeyedStateBackend.RocksDBMergeIterator(rocksIteratorsWithKVStateId,
maxParallelism <= Byte.MAX_VALUE ? 1 : 2);
+
+			int prevKVState = -1;
+			int prevKey = -1;
+			int prevKeyGroup = -1;
+			int totalKeysActual = 0;
+
+			while (mergeIterator.isValid()) {
+				ByteBuffer bb = ByteBuffer.wrap(mergeIterator.key());
+
+				int keyGroup = maxParallelism > Byte.MAX_VALUE ? bb.getShort() : bb.get();
+				int key = bb.getInt();
+
+				Assert.assertTrue(keyGroup >= prevKeyGroup);
+				Assert.assertTrue(key >= prevKey);
+				Assert.assertEquals(prevKeyGroup != keyGroup, mergeIterator.isNewKeyGroup());
+				Assert.assertEquals(prevKVState != mergeIterator.kvStateId(), mergeIterator.isNewKeyValueState());
+
+				prevKeyGroup = keyGroup;
+				prevKVState = mergeIterator.kvStateId();
+
+				//System.out.println(keyGroup + " " + key + " " + mergeIterator.kvStateId());
+				mergeIterator.next();
+				++totalKeysActual;
+			}
+
+			Assert.assertEquals(totalKeysExpected, totalKeysActual);
+
+			for (Tuple2<ColumnFamilyHandle, Integer> handleWithCount : columnFamilyHandlesWithKeyCount)
{
+				rocksDB.dropColumnFamily(handleWithCount.f0);
+			}
+		} finally {
+			rocksDB.close();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 6f4a983..acf6cb8 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -1,322 +1,368 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *    http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.contrib.streaming.state;
-//
-//import org.apache.commons.io.FileUtils;
-//import org.apache.flink.api.common.JobID;
-//import org.apache.flink.api.common.TaskInfo;
-//import org.apache.flink.api.common.state.ValueStateDescriptor;
-//import org.apache.flink.api.common.typeutils.TypeSerializer;
-//import org.apache.flink.api.common.typeutils.base.IntSerializer;
-//import org.apache.flink.runtime.execution.Environment;
-//import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-//import org.apache.flink.runtime.state.AbstractStateBackend;
-//
-//import org.apache.flink.runtime.state.VoidNamespace;
-//import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-//import org.apache.flink.util.OperatingSystem;
-//import org.junit.Assume;
-//import org.junit.Before;
-//import org.junit.Test;
-//
-//import org.rocksdb.ColumnFamilyOptions;
-//import org.rocksdb.CompactionStyle;
-//import org.rocksdb.DBOptions;
-//
-//import java.io.File;
-//import java.util.UUID;
-//
-//import static org.junit.Assert.*;
-//import static org.mockito.Mockito.*;
-//
-///**
-// * Tests for configuring the RocksDB State Backend
-// */
-//@SuppressWarnings("serial")
-//public class RocksDBStateBackendConfigTest {
-//
-//	private static final String TEMP_URI = new File(System.getProperty("java.io.tmpdir")).toURI().toString();
-//
-//	@Before
-//	public void checkOperatingSystem() {
-//		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
-//	}
-//
-//	// ------------------------------------------------------------------------
-//	//  RocksDB local file directory
-//	// ------------------------------------------------------------------------
-//
-//	@Test
-//	public void testSetDbPath() throws Exception {
-//		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-//
-//		assertNull(rocksDbBackend.getDbStoragePaths());
-//
-//		rocksDbBackend.setDbStoragePath("/abc/def");
-//		assertArrayEquals(new String[] { "/abc/def" }, rocksDbBackend.getDbStoragePaths());
-//
-//		rocksDbBackend.setDbStoragePath(null);
-//		assertNull(rocksDbBackend.getDbStoragePaths());
-//
-//		rocksDbBackend.setDbStoragePaths("/abc/def", "/uvw/xyz");
-//		assertArrayEquals(new String[] { "/abc/def", "/uvw/xyz" }, rocksDbBackend.getDbStoragePaths());
-//
-//		//noinspection NullArgumentToVariableArgMethod
-//		rocksDbBackend.setDbStoragePaths(null);
-//		assertNull(rocksDbBackend.getDbStoragePaths());
-//	}
-//
-//	@Test(expected = IllegalArgumentException.class)
-//	public void testSetNullPaths() throws Exception {
-//		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-//		rocksDbBackend.setDbStoragePaths();
-//	}
-//
-//	@Test(expected = IllegalArgumentException.class)
-//	public void testNonFileSchemePath() throws Exception {
-//		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-//		rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition");
-//	}
-//
-//	// ------------------------------------------------------------------------
-//	//  RocksDB local file automatic from temp directories
-//	// ------------------------------------------------------------------------
-//
-//	@Test
-//	public void testUseTempDirectories() throws Exception {
-//		File dir1 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
-//		File dir2 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
-//
-//		File[] tempDirs = new File[] { dir1, dir2 };
-//
-//		try {
-//			assertTrue(dir1.mkdirs());
-//			assertTrue(dir2.mkdirs());
-//
-//			RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-//			assertNull(rocksDbBackend.getDbStoragePaths());
-//
-//			rocksDbBackend.initializeForJob(getMockEnvironment(tempDirs), "foobar", IntSerializer.INSTANCE);
-//			assertArrayEquals(tempDirs, rocksDbBackend.getStoragePaths());
-//		}
-//		finally {
-//			FileUtils.deleteDirectory(dir1);
-//			FileUtils.deleteDirectory(dir2);
-//		}
-//	}
-//
-//	// ------------------------------------------------------------------------
-//	//  RocksDB local file directory initialization
-//	// ------------------------------------------------------------------------
-//
-//	@Test
-//	public void testFailWhenNoLocalStorageDir() throws Exception {
-//		File targetDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
-//		try {
-//			assertTrue(targetDir.mkdirs());
-//
-//			if (!targetDir.setWritable(false, false)) {
-//				System.err.println("Cannot execute 'testFailWhenNoLocalStorageDir' because cannot mark
directory non-writable");
-//				return;
-//			}
-//
-//			RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-//			rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath());
-//
-//			boolean hasFailure = false;
-//			try {
-//				rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", IntSerializer.INSTANCE);
-//			}
-//			catch (Exception e) {
-//				assertTrue(e.getMessage().contains("No local storage directories available"));
-//				assertTrue(e.getMessage().contains(targetDir.getAbsolutePath()));
-//				hasFailure = true;
-//			}
-//			assertTrue("We must see a failure because no storaged directory is feasible.", hasFailure);
-//		}
-//		finally {
-//			//noinspection ResultOfMethodCallIgnored
-//			targetDir.setWritable(true, false);
-//			FileUtils.deleteDirectory(targetDir);
-//		}
-//	}
-//
-//	@Test
-//	public void testContinueOnSomeDbDirectoriesMissing() throws Exception {
-//		File targetDir1 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
-//		File targetDir2 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
-//
-//		try {
-//			assertTrue(targetDir1.mkdirs());
-//			assertTrue(targetDir2.mkdirs());
-//
-//			if (!targetDir1.setWritable(false, false)) {
-//				System.err.println("Cannot execute 'testContinueOnSomeDbDirectoriesMissing' because
cannot mark directory non-writable");
-//				return;
-//			}
-//
-//			RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-//			rocksDbBackend.setDbStoragePaths(targetDir1.getAbsolutePath(), targetDir2.getAbsolutePath());
-//
-//			try {
-//				rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", IntSerializer.INSTANCE);
-//
-//				// actually get a state to see whether we can write to the storage directory
-//				rocksDbBackend.getPartitionedState(
-//						VoidNamespace.INSTANCE,
-//						VoidNamespaceSerializer.INSTANCE,
-//						new ValueStateDescriptor<>("test", String.class, ""));
-//			}
-//			catch (Exception e) {
-//				e.printStackTrace();
-//				fail("Backend initialization failed even though some paths were available");
-//			}
-//		} finally {
-//			//noinspection ResultOfMethodCallIgnored
-//			targetDir1.setWritable(true, false);
-//			FileUtils.deleteDirectory(targetDir1);
-//			FileUtils.deleteDirectory(targetDir2);
-//		}
-//	}
-//
-//	// ------------------------------------------------------------------------
-//	//  RocksDB Options
-//	// ------------------------------------------------------------------------
-//
-//	@Test
-//	public void testPredefinedOptions() throws Exception {
-//		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-//
-//		assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());
-//
-//		rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
-//		assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
-//
-//		DBOptions opt1 = rocksDbBackend.getDbOptions();
-//		DBOptions opt2 = rocksDbBackend.getDbOptions();
-//
-//		assertEquals(opt1, opt2);
-//
-//		ColumnFamilyOptions columnOpt1 = rocksDbBackend.getColumnOptions();
-//		ColumnFamilyOptions columnOpt2 = rocksDbBackend.getColumnOptions();
-//
-//		assertEquals(columnOpt1, columnOpt2);
-//
-//		assertEquals(CompactionStyle.LEVEL, columnOpt1.compactionStyle());
-//	}
-//
-//	@Test
-//	public void testOptionsFactory() throws Exception {
-//		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-//
-//		rocksDbBackend.setOptions(new OptionsFactory() {
-//			@Override
-//			public DBOptions createDBOptions(DBOptions currentOptions) {
-//				return currentOptions;
-//			}
-//
-//			@Override
-//			public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
-//				return currentOptions.setCompactionStyle(CompactionStyle.FIFO);
-//			}
-//		});
-//
-//		assertNotNull(rocksDbBackend.getOptions());
-//		assertEquals(CompactionStyle.FIFO, rocksDbBackend.getColumnOptions().compactionStyle());
-//	}
-//
-//	@Test
-//	public void testPredefinedAndOptionsFactory() throws Exception {
-//		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-//
-//		assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());
-//
-//		rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
-//		rocksDbBackend.setOptions(new OptionsFactory() {
-//			@Override
-//			public DBOptions createDBOptions(DBOptions currentOptions) {
-//				return currentOptions;
-//			}
-//
-//			@Override
-//			public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
-//				return currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
-//			}
-//		});
-//
-//		assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
-//		assertNotNull(rocksDbBackend.getOptions());
-//		assertEquals(CompactionStyle.UNIVERSAL, rocksDbBackend.getColumnOptions().compactionStyle());
-//	}
-//
-//	@Test
-//	public void testPredefinedOptionsEnum() {
-//		for (PredefinedOptions o : PredefinedOptions.values()) {
-//			DBOptions opt = o.createDBOptions();
-//			try {
-//				assertNotNull(opt);
-//			} finally {
-//				opt.dispose();
-//			}
-//		}
-//	}
-//
-//	// ------------------------------------------------------------------------
-//	//  Contained Non-partitioned State Backend
-//	// ------------------------------------------------------------------------
-//
-//	@Test
-//	public void testCallsForwardedToNonPartitionedBackend() throws Exception {
-//		AbstractStateBackend nonPartBackend = mock(AbstractStateBackend.class);
-//		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI, nonPartBackend);
-//
-//		rocksDbBackend.initializeForJob(getMockEnvironment(), "foo", IntSerializer.INSTANCE);
-//		verify(nonPartBackend, times(1)).initializeForJob(any(Environment.class), anyString(),
any(TypeSerializer.class));
-//
-//		rocksDbBackend.disposeAllStateForCurrentJob();
-//		verify(nonPartBackend, times(1)).disposeAllStateForCurrentJob();
-//
-//		rocksDbBackend.close();
-//		verify(nonPartBackend, times(1)).close();
-//	}
-//
-//	// ------------------------------------------------------------------------
-//	//  Utilities
-//	// ------------------------------------------------------------------------
-//
-//	private static Environment getMockEnvironment() {
-//		return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir"))
});
-//	}
-//
-//	private static Environment getMockEnvironment(File[] tempDirs) {
-//		IOManager ioMan = mock(IOManager.class);
-//		when(ioMan.getSpillingDirectories()).thenReturn(tempDirs);
-//
-//		Environment env = mock(Environment.class);
-//		when(env.getJobID()).thenReturn(new JobID());
-//		when(env.getUserClassLoader()).thenReturn(RocksDBStateBackendConfigTest.class.getClassLoader());
-//		when(env.getIOManager()).thenReturn(ioMan);
-//
-//		TaskInfo taskInfo = mock(TaskInfo.class);
-//		when(env.getTaskInfo()).thenReturn(taskInfo);
-//
-//		when(taskInfo.getIndexOfThisSubtask()).thenReturn(0);
-//		return env;
-//	}
-//}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.query.KvStateRegistry;
+
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.HashKeyGroupAssigner;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.util.OperatingSystem;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.junit.rules.TemporaryFolder;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.CompactionStyle;
+import org.rocksdb.DBOptions;
+
+import java.io.File;
+
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.startsWith;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+
/**
 * Tests for configuring the RocksDB State Backend
 */
@SuppressWarnings("serial")
public class RocksDBStateBackendConfigTest {

	// JUnit-managed temporary directory; all folders created through it are
	// removed automatically after each test, so these tests do not leak files.
	@Rule
	public TemporaryFolder tempFolder = new TemporaryFolder();

	@Before
	public void checkOperatingSystem() {
		// the file-permission tricks (setWritable) and RocksDB paths used below
		// do not behave the same way on Windows
		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
	}

	// ------------------------------------------------------------------------
	//  RocksDB local file directory
	// ------------------------------------------------------------------------

	/**
	 * Verifies that db storage paths can be set, cleared and re-set, and that a keyed
	 * backend created afterwards places its instance directory under one of the
	 * configured storage paths.
	 */
	@Test
	public void testSetDbPath() throws Exception {
		String checkpointPath = tempFolder.newFolder().toURI().toString();
		File testDir1 = tempFolder.newFolder();
		File testDir2 = tempFolder.newFolder();

		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);

		// no storage path configured initially
		assertNull(rocksDbBackend.getDbStoragePaths());

		// single path
		rocksDbBackend.setDbStoragePath(testDir1.getAbsolutePath());
		assertArrayEquals(new String[] { testDir1.getAbsolutePath() }, rocksDbBackend.getDbStoragePaths());

		// clearing with null resets to "unconfigured"
		rocksDbBackend.setDbStoragePath(null);
		assertNull(rocksDbBackend.getDbStoragePaths());

		// multiple paths
		rocksDbBackend.setDbStoragePaths(testDir1.getAbsolutePath(), testDir2.getAbsolutePath());
		assertArrayEquals(new String[] { testDir1.getAbsolutePath(), testDir2.getAbsolutePath() }, rocksDbBackend.getDbStoragePaths());

		// no env-provided temp dirs, so the backend must use the configured paths
		Environment env = getMockEnvironment(new File[] {});
		RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend(env,
				env.getJobID(),
				"test_op",
				IntSerializer.INSTANCE,
				new HashKeyGroupAssigner<Integer>(1),
				new KeyGroupRange(0, 0),
				env.getTaskKvStateRegistry());

		// the instance directory must live under one of the two configured paths
		File instanceBasePath = keyedBackend.getInstanceBasePath();
		assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath())));

		//noinspection NullArgumentToVariableArgMethod
		rocksDbBackend.setDbStoragePaths(null);
		assertNull(rocksDbBackend.getDbStoragePaths());
	}

	/** Setting an empty set of storage paths is rejected. */
	@Test(expected = IllegalArgumentException.class)
	public void testSetNullPaths() throws Exception {
		String checkpointPath = tempFolder.newFolder().toURI().toString();
		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
		rocksDbBackend.setDbStoragePaths();
	}

	/** Only file-scheme (local) paths are valid as db storage paths. */
	@Test(expected = IllegalArgumentException.class)
	public void testNonFileSchemePath() throws Exception {
		String checkpointPath = tempFolder.newFolder().toURI().toString();
		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
		rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition");
	}

	// ------------------------------------------------------------------------
	//  RocksDB local file automatic from temp directories
	// ------------------------------------------------------------------------

	/**
	 * This tests whether the RocksDB backends uses the temp directories that are provided
	 * from the {@link Environment} when no db storage path is set.
	 *
	 * @throws Exception
	 */
	@Test
	public void testUseTempDirectories() throws Exception {
		String checkpointPath = tempFolder.newFolder().toURI().toString();
		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);

		File dir1 = tempFolder.newFolder();
		File dir2 = tempFolder.newFolder();

		File[] tempDirs = new File[] { dir1, dir2 };

		// deliberately no setDbStoragePath(s) call — backend must fall back to env temp dirs
		assertNull(rocksDbBackend.getDbStoragePaths());

		Environment env = getMockEnvironment(tempDirs);
		RocksDBKeyedStateBackend<Integer> keyedBackend = (RocksDBKeyedStateBackend<Integer>) rocksDbBackend.createKeyedStateBackend(env,
				env.getJobID(),
				"test_op",
				IntSerializer.INSTANCE,
				new HashKeyGroupAssigner<Integer>(1),
				new KeyGroupRange(0, 0),
				env.getTaskKvStateRegistry());

		File instanceBasePath = keyedBackend.getInstanceBasePath();
		assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath())));
	}

	// ------------------------------------------------------------------------
	//  RocksDB local file directory initialization
	// ------------------------------------------------------------------------

	/**
	 * When the only configured storage directory is not writable, creating the keyed
	 * backend must fail with a message naming the unusable directory.
	 */
	@Test
	public void testFailWhenNoLocalStorageDir() throws Exception {
		String checkpointPath = tempFolder.newFolder().toURI().toString();
		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);
		File targetDir = tempFolder.newFolder();

		try {
			if (!targetDir.setWritable(false, false)) {
				// e.g. running as root — the precondition cannot be established, so skip
				System.err.println("Cannot execute 'testFailWhenNoLocalStorageDir' because cannot mark directory non-writable");
				return;
			}

			rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath());

			boolean hasFailure = false;
			try {
				Environment env = getMockEnvironment();
				rocksDbBackend.createKeyedStateBackend(
						env,
						env.getJobID(),
						"foobar",
						IntSerializer.INSTANCE,
						new HashKeyGroupAssigner<Integer>(1),
						new KeyGroupRange(0, 0),
						new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
			}
			catch (Exception e) {
				// failure must clearly report the problem and the offending path
				assertTrue(e.getMessage().contains("No local storage directories available"));
				assertTrue(e.getMessage().contains(targetDir.getAbsolutePath()));
				hasFailure = true;
			}
			assertTrue("We must see a failure because no storaged directory is feasible.", hasFailure);
		}
		finally {
			// restore writability so the directory can actually be deleted
			//noinspection ResultOfMethodCallIgnored
			targetDir.setWritable(true, false);
			FileUtils.deleteDirectory(targetDir);
		}
	}

	/**
	 * As long as at least one of the configured storage directories is usable,
	 * backend creation must succeed even if others are not writable.
	 */
	@Test
	public void testContinueOnSomeDbDirectoriesMissing() throws Exception {
		File targetDir1 = tempFolder.newFolder();
		File targetDir2 = tempFolder.newFolder();

		String checkpointPath = tempFolder.newFolder().toURI().toString();
		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);

		try {

			if (!targetDir1.setWritable(false, false)) {
				// precondition cannot be established (e.g. running as root) — skip
				System.err.println("Cannot execute 'testContinueOnSomeDbDirectoriesMissing' because cannot mark directory non-writable");
				return;
			}

			// dir1 is unusable, dir2 is fine — creation must still work
			rocksDbBackend.setDbStoragePaths(targetDir1.getAbsolutePath(), targetDir2.getAbsolutePath());

			try {
				Environment env = getMockEnvironment();
				rocksDbBackend.createKeyedStateBackend(
						env,
						env.getJobID(),
						"foobar",
						IntSerializer.INSTANCE,
						new HashKeyGroupAssigner<Integer>(1),
						new KeyGroupRange(0, 0),
						new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
			}
			catch (Exception e) {
				e.printStackTrace();
				fail("Backend initialization failed even though some paths were available");
			}
		} finally {
			// restore writability so the directory can actually be deleted
			//noinspection ResultOfMethodCallIgnored
			targetDir1.setWritable(true, false);
			FileUtils.deleteDirectory(targetDir1);
			FileUtils.deleteDirectory(targetDir2);
		}
	}

	// ------------------------------------------------------------------------
	//  RocksDB Options
	// ------------------------------------------------------------------------

	/**
	 * Checks the default predefined options, that they can be switched, and that the
	 * backend hands out consistent DB/column options derived from them.
	 */
	@Test
	public void testPredefinedOptions() throws Exception {
		String checkpointPath = tempFolder.newFolder().toURI().toString();
		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);

		assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());

		rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
		assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());

		// repeated calls must yield equal options objects
		DBOptions opt1 = rocksDbBackend.getDbOptions();
		DBOptions opt2 = rocksDbBackend.getDbOptions();

		assertEquals(opt1, opt2);

		ColumnFamilyOptions columnOpt1 = rocksDbBackend.getColumnOptions();
		ColumnFamilyOptions columnOpt2 = rocksDbBackend.getColumnOptions();

		assertEquals(columnOpt1, columnOpt2);

		assertEquals(CompactionStyle.LEVEL, columnOpt1.compactionStyle());
	}

	/** A user-supplied {@link OptionsFactory} must be applied to the column options. */
	@Test
	public void testOptionsFactory() throws Exception {
		String checkpointPath = tempFolder.newFolder().toURI().toString();
		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);

		rocksDbBackend.setOptions(new OptionsFactory() {
			@Override
			public DBOptions createDBOptions(DBOptions currentOptions) {
				return currentOptions;
			}

			@Override
			public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
				return currentOptions.setCompactionStyle(CompactionStyle.FIFO);
			}
		});

		assertNotNull(rocksDbBackend.getOptions());
		assertEquals(CompactionStyle.FIFO, rocksDbBackend.getColumnOptions().compactionStyle());
	}

	/**
	 * When both predefined options and an {@link OptionsFactory} are set, the factory
	 * is applied on top of the predefined options.
	 */
	@Test
	public void testPredefinedAndOptionsFactory() throws Exception {
		String checkpointPath = tempFolder.newFolder().toURI().toString();
		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath);

		assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());

		rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
		rocksDbBackend.setOptions(new OptionsFactory() {
			@Override
			public DBOptions createDBOptions(DBOptions currentOptions) {
				return currentOptions;
			}

			@Override
			public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
				return currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
			}
		});

		assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
		assertNotNull(rocksDbBackend.getOptions());
		assertEquals(CompactionStyle.UNIVERSAL, rocksDbBackend.getColumnOptions().compactionStyle());
	}

	/** Every {@link PredefinedOptions} value must produce non-null DB options. */
	@Test
	public void testPredefinedOptionsEnum() {
		for (PredefinedOptions o : PredefinedOptions.values()) {
			DBOptions opt = o.createDBOptions();
			try {
				assertNotNull(opt);
			} finally {
				// native resource — must be released even if the assertion fails
				opt.dispose();
			}
		}
	}

	// ------------------------------------------------------------------------
	//  Contained Non-partitioned State Backend
	// ------------------------------------------------------------------------

	/** Calls on the RocksDB backend must be forwarded to the wrapped non-partitioned backend. */
	@Test
	public void testCallsForwardedToNonPartitionedBackend() throws Exception {
		AbstractStateBackend nonPartBackend = mock(AbstractStateBackend.class);
		String checkpointPath = tempFolder.newFolder().toURI().toString();
		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(checkpointPath, nonPartBackend);

		Environment env = getMockEnvironment();
		rocksDbBackend.createStreamFactory(env.getJobID(), "foobar");

		verify(nonPartBackend, times(1)).createStreamFactory(any(JobID.class), anyString());
	}

	// ------------------------------------------------------------------------
	//  Utilities
	// ------------------------------------------------------------------------

	// Mock environment whose only temp directory is the system temp dir.
	private static Environment getMockEnvironment() {
		return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) });
	}

	// Mock environment exposing the given directories as IOManager spilling
	// directories, with a fresh JobID, a task kv-state registry, and subtask index 0.
	private static Environment getMockEnvironment(File[] tempDirs) {
		IOManager ioMan = mock(IOManager.class);
		when(ioMan.getSpillingDirectories()).thenReturn(tempDirs);

		Environment env = mock(Environment.class);
		when(env.getJobID()).thenReturn(new JobID());
		when(env.getUserClassLoader()).thenReturn(RocksDBStateBackendConfigTest.class.getClassLoader());
		when(env.getIOManager()).thenReturn(ioMan);
		when(env.getTaskKvStateRegistry()).thenReturn(new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()));

		TaskInfo taskInfo = mock(TaskInfo.class);
		when(env.getTaskInfo()).thenReturn(taskInfo);

		when(taskInfo.getIndexOfThisSubtask()).thenReturn(0);
		return env;
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
new file mode 100644
index 0000000..989e868
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AbstractAsyncIOCallable.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.async;
+
+import java.io.Closeable;
+import java.io.IOException;
+
/**
 * The abstract class encapsulates the lifecycle and execution strategy for asynchronous IO operations:
 * open an I/O handle, perform the operation on it, and close the handle. The operation can be
 * stopped concurrently through {@link #stop()}, which closes the I/O handle and thereby unblocks
 * a potentially blocking {@link #performOperation()}.
 *
 * @param <V> return type of the asynchronous call
 * @param <D> type of the IO handle
 */
public abstract class AbstractAsyncIOCallable<V, D extends Closeable> implements StoppableCallbackCallable<V> {

	/**
	 * Set once the I/O handle has been closed — either because {@link #stop()} was called
	 * or because {@link #call()} completed. Volatile so both paths observe it.
	 */
	private volatile boolean stopped;

	/**
	 * Closable handle to IO, e.g. an InputStream
	 */
	private volatile D ioHandle;

	/**
	 * Stores an exception that might happen during close; exposed via {@link #getStopException()}
	 */
	private volatile IOException stopException;


	public AbstractAsyncIOCallable() {
		this.stopped = false;
	}

	/**
	 * This method implements the strategy for the actual IO operation:
	 *
	 * 1) Open the IO handle
	 * 2) Perform IO operation
	 * 3) Close IO handle
	 *
	 * @return Result of the IO operation, e.g. a deserialized object.
	 * @throws Exception exception that happened during the call; an {@link IOException}
	 *                   if the callable was already stopped before the handle was opened.
	 */
	@Override
	public V call() throws Exception {

		// open under lock so a concurrent stop() either happens-before (we fail fast here)
		// or observes the opened handle and can close it to unblock performOperation()
		synchronized (this) {
			if (isStopped()) {
				throw new IOException("Task was already stopped. No I/O handle opened.");
			}

			ioHandle = openIOHandle();
		}

		try {

			return performOperation();

		} finally {
			// always release the handle; note this also marks the callable as stopped
			closeIOHandle();
		}

	}

	/**
	 * Open the IO Handle (e.g. a stream) on which the operation will be performed.
	 *
	 * @return the opened IO handle that implements #Closeable
	 * @throws Exception if opening the handle fails
	 */
	protected abstract D openIOHandle() throws Exception;

	/**
	 * Implements the actual IO operation on the opened IO handle.
	 *
	 * @return Result of the IO operation
	 * @throws Exception if the operation fails
	 */
	protected abstract V performOperation() throws Exception;

	/**
	 * Stops the I/O operation by closing the I/O handle. If an exception is thrown on close, it can be accessed via
	 * #getStopException().
	 */
	@Override
	public void stop() {
		closeIOHandle();
	}

	// Closes the handle exactly once (guarded by the stopped flag under the monitor);
	// close exceptions are stashed in stopException instead of being thrown.
	private synchronized void closeIOHandle() {

		if (!stopped) {
			stopped = true;

			final D handle = ioHandle;
			if (handle != null) {
				try {
					handle.close();
				} catch (IOException ex) {
					stopException = ex;
				}
			}
		}
	}

	/**
	 * Returns the IO handle.
	 * @return the IO handle, or null if it has not been opened yet
	 */
	protected D getIoHandle() {
		return ioHandle;
	}

	/**
	 * Optional callback that subclasses can implement. This is called when the callable method completed, e.g. because
	 * it finished or was stopped.
	 */
	@Override
	public void done() {
		//optional callback hook
	}

	/**
	 * Check if the IO operation is stopped. Note that this also returns true after the
	 * operation completed normally, since {@link #call()} closes the handle on completion.
	 *
	 * @return true if the I/O handle was closed, via stop() or normal completion
	 */
	@Override
	public boolean isStopped() {
		return stopped;
	}

	/**
	 * Returns the exception that might have happened while closing the I/O handle.
	 *
	 * @return potential IOException that happened upon stopping, or null if none occurred
	 */
	@Override
	public IOException getStopException() {
		return stopException;
	}
}

http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
new file mode 100644
index 0000000..13d9057
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncDoneCallback.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.async;
+
/**
 * Callback for an asynchronous operation that is called on termination.
 */
@FunctionalInterface
public interface AsyncDoneCallback {

	/**
	 * Invoked once when the asynchronous operation has terminated.
	 */
	void done();

}

http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
new file mode 100644
index 0000000..560e56a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppable.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.async;
+
+import java.io.IOException;
+
/**
 * An asynchronous operation that can be stopped.
 */
public interface AsyncStoppable {

	/**
	 * Stops the operation.
	 */
	void stop();

	/**
	 * Checks whether the operation is stopped.
	 *
	 * @return true iff the operation is stopped
	 */
	boolean isStopped();

	/**
	 * Delivers an exception that might have happened during {@link #stop()}.
	 *
	 * @return exception that happened during stop, or null if none occurred
	 */
	IOException getStopException();

}

http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
new file mode 100644
index 0000000..8316e4f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/AsyncStoppableTaskWithCallback.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.async;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.concurrent.FutureTask;
+
+/**
+ * @param <V> return type of the callable function
+ */
+public class AsyncStoppableTaskWithCallback<V> extends FutureTask<V> {
+
+	protected final StoppableCallbackCallable<V> stoppableCallbackCallable;
+
+	public AsyncStoppableTaskWithCallback(StoppableCallbackCallable<V> callable) {
+		super(Preconditions.checkNotNull(callable));
+		this.stoppableCallbackCallable = callable;
+	}
+
+	@Override
+	public boolean cancel(boolean mayInterruptIfRunning) {
+		
+		if (mayInterruptIfRunning) {
+			stoppableCallbackCallable.stop();
+		}
+
+		return super.cancel(mayInterruptIfRunning);
+	}
+
+	@Override
+	protected void done() {
+		stoppableCallbackCallable.done();
+	}
+
+	public static <V> AsyncStoppableTaskWithCallback<V> from(StoppableCallbackCallable<V>
callable) {
+		return new AsyncStoppableTaskWithCallback<>(callable);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/StoppableCallbackCallable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/StoppableCallbackCallable.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/StoppableCallbackCallable.java
new file mode 100644
index 0000000..d459316
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/async/StoppableCallbackCallable.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.async;
+
+import java.util.concurrent.Callable;
+
/**
 * A {@link Callable} that can be stopped ({@link AsyncStoppable}) and offers a callback
 * ({@link AsyncDoneCallback}) that is invoked on termination.
 *
 * @param <V> return value of the call operation.
 */
public interface StoppableCallbackCallable<V> extends Callable<V>, AsyncStoppable, AsyncDoneCallback {

}

http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
index 4801d85..6f0a814 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
@@ -71,7 +71,7 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory
{
 	/**
 	 * A {@code CheckpointStateOutputStream} that writes into a byte array.
 	 */
-	public static final class MemoryCheckpointOutputStream extends CheckpointStateOutputStream
{
+	public static class MemoryCheckpointOutputStream extends CheckpointStateOutputStream {
 
 		private final ByteArrayOutputStreamWithPos os = new ByteArrayOutputStreamWithPos();
 
@@ -86,13 +86,13 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory
{
 		}
 
 		@Override
-		public void write(int b) {
+		public void write(int b) throws IOException {
 			os.write(b);
 			isEmpty = false;
 		}
 
 		@Override
-		public void write(byte[] b, int off, int len) {
+		public void write(byte[] b, int off, int len) throws IOException {
 			os.write(b, off, len);
 			isEmpty = false;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/addd0842/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 02579aa..13f650c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -66,6 +66,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Base class for all streaming tasks. A task is the unit of local processing that is deployed
@@ -176,8 +177,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 	private long lastCheckpointSize = 0;
 
+	/** Thread pool for async snapshot workers */
 	private ExecutorService asyncOperationsThreadPool;
 
+	/** Timeout to await the termination of the thread pool in milliseconds */
+	private long threadPoolTerminationTimeout = 0L;
+
 	// ------------------------------------------------------------------------
 	//  Life cycle methods for specific implementations
 	// ------------------------------------------------------------------------
@@ -441,6 +446,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		if (!asyncOperationsThreadPool.isShutdown()) {
 			asyncOperationsThreadPool.shutdownNow();
 		}
+
+		if(threadPoolTerminationTimeout > 0L) {
+			asyncOperationsThreadPool.awaitTermination(threadPoolTerminationTimeout, TimeUnit.MILLISECONDS);
+		}
 	}
 
 	/**
@@ -861,6 +870,15 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		};
 	}
 
+	/**
+	 * Sets a timeout for the async thread pool. Default should always be 0 to avoid blocking
restarts of task.
+	 *
+	 * @param threadPoolTerminationTimeout timeout for the async thread pool in milliseconds
+	 */
+	public void setThreadPoolTerminationTimeout(long threadPoolTerminationTimeout) {
+		this.threadPoolTerminationTimeout = threadPoolTerminationTimeout;
+	}
+
 	// ------------------------------------------------------------------------
 
 	/**


Mime
View raw message