flink-commits mailing list archives

From aljos...@apache.org
Subject [12/27] flink git commit: [FLINK-3761] Refactor State Backends/Make Keyed State Key-Group Aware
Date Wed, 31 Aug 2016 17:28:30 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index 657c57e..6f4a983 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -1,322 +1,322 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.contrib.streaming.state;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.TaskInfo;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.util.OperatingSystem;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.rocksdb.ColumnFamilyOptions;
-import org.rocksdb.CompactionStyle;
-import org.rocksdb.DBOptions;
-
-import java.io.File;
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
-/**
- * Tests for configuring the RocksDB State Backend 
- */
-@SuppressWarnings("serial")
-public class RocksDBStateBackendConfigTest {
-	
-	private static final String TEMP_URI = new File(System.getProperty("java.io.tmpdir")).toURI().toString();
-
-	@Before
-	public void checkOperatingSystem() {
-		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
-	}
-
-	// ------------------------------------------------------------------------
-	//  RocksDB local file directory
-	// ------------------------------------------------------------------------
-	
-	@Test
-	public void testSetDbPath() throws Exception {
-		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-		
-		assertNull(rocksDbBackend.getDbStoragePaths());
-		
-		rocksDbBackend.setDbStoragePath("/abc/def");
-		assertArrayEquals(new String[] { "/abc/def" }, rocksDbBackend.getDbStoragePaths());
-
-		rocksDbBackend.setDbStoragePath(null);
-		assertNull(rocksDbBackend.getDbStoragePaths());
-
-		rocksDbBackend.setDbStoragePaths("/abc/def", "/uvw/xyz");
-		assertArrayEquals(new String[] { "/abc/def", "/uvw/xyz" }, rocksDbBackend.getDbStoragePaths());
-
-		//noinspection NullArgumentToVariableArgMethod
-		rocksDbBackend.setDbStoragePaths(null);
-		assertNull(rocksDbBackend.getDbStoragePaths());
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testSetNullPaths() throws Exception {
-		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-		rocksDbBackend.setDbStoragePaths();
-	}
-
-	@Test(expected = IllegalArgumentException.class)
-	public void testNonFileSchemePath() throws Exception {
-		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-		rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition");
-	}
-
-	// ------------------------------------------------------------------------
-	//  RocksDB local file automatic from temp directories
-	// ------------------------------------------------------------------------
-	
-	@Test
-	public void testUseTempDirectories() throws Exception {
-		File dir1 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
-		File dir2 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
-
-		File[] tempDirs = new File[] { dir1, dir2 };
-		
-		try {
-			assertTrue(dir1.mkdirs());
-			assertTrue(dir2.mkdirs());
-
-			RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-			assertNull(rocksDbBackend.getDbStoragePaths());
-			
-			rocksDbBackend.initializeForJob(getMockEnvironment(tempDirs), "foobar", IntSerializer.INSTANCE);
-			assertArrayEquals(tempDirs, rocksDbBackend.getStoragePaths());
-		}
-		finally {
-			FileUtils.deleteDirectory(dir1);
-			FileUtils.deleteDirectory(dir2);
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  RocksDB local file directory initialization
-	// ------------------------------------------------------------------------
-
-	@Test
-	public void testFailWhenNoLocalStorageDir() throws Exception {
-		File targetDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
-		try {
-			assertTrue(targetDir.mkdirs());
-			
-			if (!targetDir.setWritable(false, false)) {
-				System.err.println("Cannot execute 'testFailWhenNoLocalStorageDir' because cannot mark directory non-writable");
-				return;
-			}
-			
-			RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-			rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath());
-
-			boolean hasFailure = false;
-			try {
-				rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", IntSerializer.INSTANCE);
-			}
-			catch (Exception e) {
-				assertTrue(e.getMessage().contains("No local storage directories available"));
-				assertTrue(e.getMessage().contains(targetDir.getAbsolutePath()));
-				hasFailure = true;
-			}
-			assertTrue("We must see a failure because no storaged directory is feasible.", hasFailure);
-		}
-		finally {
-			//noinspection ResultOfMethodCallIgnored
-			targetDir.setWritable(true, false);
-			FileUtils.deleteDirectory(targetDir);
-		}
-	}
-
-	@Test
-	public void testContinueOnSomeDbDirectoriesMissing() throws Exception {
-		File targetDir1 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
-		File targetDir2 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
-		
-		try {
-			assertTrue(targetDir1.mkdirs());
-			assertTrue(targetDir2.mkdirs());
-
-			if (!targetDir1.setWritable(false, false)) {
-				System.err.println("Cannot execute 'testContinueOnSomeDbDirectoriesMissing' because cannot mark directory non-writable");
-				return;
-			}
-	
-			RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-			rocksDbBackend.setDbStoragePaths(targetDir1.getAbsolutePath(), targetDir2.getAbsolutePath());
-	
-			try {
-				rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", IntSerializer.INSTANCE);
-
-				// actually get a state to see whether we can write to the storage directory
-				rocksDbBackend.getPartitionedState(
-						VoidNamespace.INSTANCE,
-						VoidNamespaceSerializer.INSTANCE,
-						new ValueStateDescriptor<>("test", String.class, ""));
-			}
-			catch (Exception e) {
-				e.printStackTrace();
-				fail("Backend initialization failed even though some paths were available");
-			}
-		} finally {
-			//noinspection ResultOfMethodCallIgnored
-			targetDir1.setWritable(true, false);
-			FileUtils.deleteDirectory(targetDir1);
-			FileUtils.deleteDirectory(targetDir2);
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	//  RocksDB Options
-	// ------------------------------------------------------------------------
-	
-	@Test
-	public void testPredefinedOptions() throws Exception {
-		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-		
-		assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());
-		
-		rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
-		assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
-
-		DBOptions opt1 = rocksDbBackend.getDbOptions();
-		DBOptions opt2 = rocksDbBackend.getDbOptions();
-		
-		assertEquals(opt1, opt2);
-
-		ColumnFamilyOptions columnOpt1 = rocksDbBackend.getColumnOptions();
-		ColumnFamilyOptions columnOpt2 = rocksDbBackend.getColumnOptions();
-
-		assertEquals(columnOpt1, columnOpt2);
-
-		assertEquals(CompactionStyle.LEVEL, columnOpt1.compactionStyle());
-	}
-
-	@Test
-	public void testOptionsFactory() throws Exception {
-		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-		
-		rocksDbBackend.setOptions(new OptionsFactory() {
-			@Override
-			public DBOptions createDBOptions(DBOptions currentOptions) {
-				return currentOptions;
-			}
-
-			@Override
-			public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
-				return currentOptions.setCompactionStyle(CompactionStyle.FIFO);
-			}
-		});
-		
-		assertNotNull(rocksDbBackend.getOptions());
-		assertEquals(CompactionStyle.FIFO, rocksDbBackend.getColumnOptions().compactionStyle());
-	}
-
-	@Test
-	public void testPredefinedAndOptionsFactory() throws Exception {
-		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
-
-		assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());
-
-		rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
-		rocksDbBackend.setOptions(new OptionsFactory() {
-			@Override
-			public DBOptions createDBOptions(DBOptions currentOptions) {
-				return currentOptions;
-			}
-
-			@Override
-			public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
-				return currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
-			}
-		});
-		
-		assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
-		assertNotNull(rocksDbBackend.getOptions());
-		assertEquals(CompactionStyle.UNIVERSAL, rocksDbBackend.getColumnOptions().compactionStyle());
-	}
-
-	@Test
-	public void testPredefinedOptionsEnum() {
-		for (PredefinedOptions o : PredefinedOptions.values()) {
-			DBOptions opt = o.createDBOptions();
-			try {
-				assertNotNull(opt);
-			} finally {
-				opt.dispose();
-			}
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Contained Non-partitioned State Backend
-	// ------------------------------------------------------------------------
-	
-	@Test
-	public void testCallsForwardedToNonPartitionedBackend() throws Exception {
-		AbstractStateBackend nonPartBackend = mock(AbstractStateBackend.class);
-		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI, nonPartBackend);
-
-		rocksDbBackend.initializeForJob(getMockEnvironment(), "foo", IntSerializer.INSTANCE);
-		verify(nonPartBackend, times(1)).initializeForJob(any(Environment.class), anyString(), any(TypeSerializer.class));
-
-		rocksDbBackend.disposeAllStateForCurrentJob();
-		verify(nonPartBackend, times(1)).disposeAllStateForCurrentJob();
-		
-		rocksDbBackend.close();
-		verify(nonPartBackend, times(1)).close();
-	}
-	
-	// ------------------------------------------------------------------------
-	//  Utilities
-	// ------------------------------------------------------------------------
-
-	private static Environment getMockEnvironment() {
-		return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) });
-	}
-	
-	private static Environment getMockEnvironment(File[] tempDirs) {
-		IOManager ioMan = mock(IOManager.class);
-		when(ioMan.getSpillingDirectories()).thenReturn(tempDirs);
-		
-		Environment env = mock(Environment.class);
-		when(env.getJobID()).thenReturn(new JobID());
-		when(env.getUserClassLoader()).thenReturn(RocksDBStateBackendConfigTest.class.getClassLoader());
-		when(env.getIOManager()).thenReturn(ioMan);
-
-		TaskInfo taskInfo = mock(TaskInfo.class);
-		when(env.getTaskInfo()).thenReturn(taskInfo);
-
-		when(taskInfo.getIndexOfThisSubtask()).thenReturn(0);
-		return env;
-	}
-}
+///*
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements.  See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership.  The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License.  You may obtain a copy of the License at
+// *
+// *    http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.flink.contrib.streaming.state;
+//
+//import org.apache.commons.io.FileUtils;
+//import org.apache.flink.api.common.JobID;
+//import org.apache.flink.api.common.TaskInfo;
+//import org.apache.flink.api.common.state.ValueStateDescriptor;
+//import org.apache.flink.api.common.typeutils.TypeSerializer;
+//import org.apache.flink.api.common.typeutils.base.IntSerializer;
+//import org.apache.flink.runtime.execution.Environment;
+//import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+//import org.apache.flink.runtime.state.AbstractStateBackend;
+//
+//import org.apache.flink.runtime.state.VoidNamespace;
+//import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+//import org.apache.flink.util.OperatingSystem;
+//import org.junit.Assume;
+//import org.junit.Before;
+//import org.junit.Test;
+//
+//import org.rocksdb.ColumnFamilyOptions;
+//import org.rocksdb.CompactionStyle;
+//import org.rocksdb.DBOptions;
+//
+//import java.io.File;
+//import java.util.UUID;
+//
+//import static org.junit.Assert.*;
+//import static org.mockito.Mockito.*;
+//
+///**
+// * Tests for configuring the RocksDB State Backend
+// */
+//@SuppressWarnings("serial")
+//public class RocksDBStateBackendConfigTest {
+//
+//	private static final String TEMP_URI = new File(System.getProperty("java.io.tmpdir")).toURI().toString();
+//
+//	@Before
+//	public void checkOperatingSystem() {
+//		Assume.assumeTrue("This test can't run successfully on Windows.", !OperatingSystem.isWindows());
+//	}
+//
+//	// ------------------------------------------------------------------------
+//	//  RocksDB local file directory
+//	// ------------------------------------------------------------------------
+//
+//	@Test
+//	public void testSetDbPath() throws Exception {
+//		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
+//
+//		assertNull(rocksDbBackend.getDbStoragePaths());
+//
+//		rocksDbBackend.setDbStoragePath("/abc/def");
+//		assertArrayEquals(new String[] { "/abc/def" }, rocksDbBackend.getDbStoragePaths());
+//
+//		rocksDbBackend.setDbStoragePath(null);
+//		assertNull(rocksDbBackend.getDbStoragePaths());
+//
+//		rocksDbBackend.setDbStoragePaths("/abc/def", "/uvw/xyz");
+//		assertArrayEquals(new String[] { "/abc/def", "/uvw/xyz" }, rocksDbBackend.getDbStoragePaths());
+//
+//		//noinspection NullArgumentToVariableArgMethod
+//		rocksDbBackend.setDbStoragePaths(null);
+//		assertNull(rocksDbBackend.getDbStoragePaths());
+//	}
+//
+//	@Test(expected = IllegalArgumentException.class)
+//	public void testSetNullPaths() throws Exception {
+//		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
+//		rocksDbBackend.setDbStoragePaths();
+//	}
+//
+//	@Test(expected = IllegalArgumentException.class)
+//	public void testNonFileSchemePath() throws Exception {
+//		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
+//		rocksDbBackend.setDbStoragePath("hdfs:///some/path/to/perdition");
+//	}
+//
+//	// ------------------------------------------------------------------------
+//	//  RocksDB local file automatic from temp directories
+//	// ------------------------------------------------------------------------
+//
+//	@Test
+//	public void testUseTempDirectories() throws Exception {
+//		File dir1 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
+//		File dir2 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
+//
+//		File[] tempDirs = new File[] { dir1, dir2 };
+//
+//		try {
+//			assertTrue(dir1.mkdirs());
+//			assertTrue(dir2.mkdirs());
+//
+//			RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
+//			assertNull(rocksDbBackend.getDbStoragePaths());
+//
+//			rocksDbBackend.initializeForJob(getMockEnvironment(tempDirs), "foobar", IntSerializer.INSTANCE);
+//			assertArrayEquals(tempDirs, rocksDbBackend.getStoragePaths());
+//		}
+//		finally {
+//			FileUtils.deleteDirectory(dir1);
+//			FileUtils.deleteDirectory(dir2);
+//		}
+//	}
+//
+//	// ------------------------------------------------------------------------
+//	//  RocksDB local file directory initialization
+//	// ------------------------------------------------------------------------
+//
+//	@Test
+//	public void testFailWhenNoLocalStorageDir() throws Exception {
+//		File targetDir = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
+//		try {
+//			assertTrue(targetDir.mkdirs());
+//
+//			if (!targetDir.setWritable(false, false)) {
+//				System.err.println("Cannot execute 'testFailWhenNoLocalStorageDir' because cannot mark directory non-writable");
+//				return;
+//			}
+//
+//			RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
+//			rocksDbBackend.setDbStoragePath(targetDir.getAbsolutePath());
+//
+//			boolean hasFailure = false;
+//			try {
+//				rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", IntSerializer.INSTANCE);
+//			}
+//			catch (Exception e) {
+//				assertTrue(e.getMessage().contains("No local storage directories available"));
+//				assertTrue(e.getMessage().contains(targetDir.getAbsolutePath()));
+//				hasFailure = true;
+//			}
+//			assertTrue("We must see a failure because no storaged directory is feasible.", hasFailure);
+//		}
+//		finally {
+//			//noinspection ResultOfMethodCallIgnored
+//			targetDir.setWritable(true, false);
+//			FileUtils.deleteDirectory(targetDir);
+//		}
+//	}
+//
+//	@Test
+//	public void testContinueOnSomeDbDirectoriesMissing() throws Exception {
+//		File targetDir1 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
+//		File targetDir2 = new File(System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString());
+//
+//		try {
+//			assertTrue(targetDir1.mkdirs());
+//			assertTrue(targetDir2.mkdirs());
+//
+//			if (!targetDir1.setWritable(false, false)) {
+//				System.err.println("Cannot execute 'testContinueOnSomeDbDirectoriesMissing' because cannot mark directory non-writable");
+//				return;
+//			}
+//
+//			RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
+//			rocksDbBackend.setDbStoragePaths(targetDir1.getAbsolutePath(), targetDir2.getAbsolutePath());
+//
+//			try {
+//				rocksDbBackend.initializeForJob(getMockEnvironment(), "foobar", IntSerializer.INSTANCE);
+//
+//				// actually get a state to see whether we can write to the storage directory
+//				rocksDbBackend.getPartitionedState(
+//						VoidNamespace.INSTANCE,
+//						VoidNamespaceSerializer.INSTANCE,
+//						new ValueStateDescriptor<>("test", String.class, ""));
+//			}
+//			catch (Exception e) {
+//				e.printStackTrace();
+//				fail("Backend initialization failed even though some paths were available");
+//			}
+//		} finally {
+//			//noinspection ResultOfMethodCallIgnored
+//			targetDir1.setWritable(true, false);
+//			FileUtils.deleteDirectory(targetDir1);
+//			FileUtils.deleteDirectory(targetDir2);
+//		}
+//	}
+//
+//	// ------------------------------------------------------------------------
+//	//  RocksDB Options
+//	// ------------------------------------------------------------------------
+//
+//	@Test
+//	public void testPredefinedOptions() throws Exception {
+//		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
+//
+//		assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());
+//
+//		rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
+//		assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
+//
+//		DBOptions opt1 = rocksDbBackend.getDbOptions();
+//		DBOptions opt2 = rocksDbBackend.getDbOptions();
+//
+//		assertEquals(opt1, opt2);
+//
+//		ColumnFamilyOptions columnOpt1 = rocksDbBackend.getColumnOptions();
+//		ColumnFamilyOptions columnOpt2 = rocksDbBackend.getColumnOptions();
+//
+//		assertEquals(columnOpt1, columnOpt2);
+//
+//		assertEquals(CompactionStyle.LEVEL, columnOpt1.compactionStyle());
+//	}
+//
+//	@Test
+//	public void testOptionsFactory() throws Exception {
+//		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
+//
+//		rocksDbBackend.setOptions(new OptionsFactory() {
+//			@Override
+//			public DBOptions createDBOptions(DBOptions currentOptions) {
+//				return currentOptions;
+//			}
+//
+//			@Override
+//			public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
+//				return currentOptions.setCompactionStyle(CompactionStyle.FIFO);
+//			}
+//		});
+//
+//		assertNotNull(rocksDbBackend.getOptions());
+//		assertEquals(CompactionStyle.FIFO, rocksDbBackend.getColumnOptions().compactionStyle());
+//	}
+//
+//	@Test
+//	public void testPredefinedAndOptionsFactory() throws Exception {
+//		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI);
+//
+//		assertEquals(PredefinedOptions.DEFAULT, rocksDbBackend.getPredefinedOptions());
+//
+//		rocksDbBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED);
+//		rocksDbBackend.setOptions(new OptionsFactory() {
+//			@Override
+//			public DBOptions createDBOptions(DBOptions currentOptions) {
+//				return currentOptions;
+//			}
+//
+//			@Override
+//			public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
+//				return currentOptions.setCompactionStyle(CompactionStyle.UNIVERSAL);
+//			}
+//		});
+//
+//		assertEquals(PredefinedOptions.SPINNING_DISK_OPTIMIZED, rocksDbBackend.getPredefinedOptions());
+//		assertNotNull(rocksDbBackend.getOptions());
+//		assertEquals(CompactionStyle.UNIVERSAL, rocksDbBackend.getColumnOptions().compactionStyle());
+//	}
+//
+//	@Test
+//	public void testPredefinedOptionsEnum() {
+//		for (PredefinedOptions o : PredefinedOptions.values()) {
+//			DBOptions opt = o.createDBOptions();
+//			try {
+//				assertNotNull(opt);
+//			} finally {
+//				opt.dispose();
+//			}
+//		}
+//	}
+//
+//	// ------------------------------------------------------------------------
+//	//  Contained Non-partitioned State Backend
+//	// ------------------------------------------------------------------------
+//
+//	@Test
+//	public void testCallsForwardedToNonPartitionedBackend() throws Exception {
+//		AbstractStateBackend nonPartBackend = mock(AbstractStateBackend.class);
+//		RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(TEMP_URI, nonPartBackend);
+//
+//		rocksDbBackend.initializeForJob(getMockEnvironment(), "foo", IntSerializer.INSTANCE);
+//		verify(nonPartBackend, times(1)).initializeForJob(any(Environment.class), anyString(), any(TypeSerializer.class));
+//
+//		rocksDbBackend.disposeAllStateForCurrentJob();
+//		verify(nonPartBackend, times(1)).disposeAllStateForCurrentJob();
+//
+//		rocksDbBackend.close();
+//		verify(nonPartBackend, times(1)).close();
+//	}
+//
+//	// ------------------------------------------------------------------------
+//	//  Utilities
+//	// ------------------------------------------------------------------------
+//
+//	private static Environment getMockEnvironment() {
+//		return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) });
+//	}
+//
+//	private static Environment getMockEnvironment(File[] tempDirs) {
+//		IOManager ioMan = mock(IOManager.class);
+//		when(ioMan.getSpillingDirectories()).thenReturn(tempDirs);
+//
+//		Environment env = mock(Environment.class);
+//		when(env.getJobID()).thenReturn(new JobID());
+//		when(env.getUserClassLoader()).thenReturn(RocksDBStateBackendConfigTest.class.getClassLoader());
+//		when(env.getIOManager()).thenReturn(ioMan);
+//
+//		TaskInfo taskInfo = mock(TaskInfo.class);
+//		when(env.getTaskInfo()).thenReturn(taskInfo);
+//
+//		when(taskInfo.getIndexOfThisSubtask()).thenReturn(0);
+//		return env;
+//	}
+//}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 57f906e..9222f0b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -18,25 +18,23 @@
 
 package org.apache.flink.contrib.streaming.state;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.state.StateBackendTestBase;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.util.OperatingSystem;
 import org.junit.Assume;
 import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
 import java.io.IOException;
-import java.util.UUID;
 
 /**
  * Tests for the partitioned state part of {@link RocksDBStateBackend}.
  */
 public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBackend> {
 
-	private File dbDir;
-	private File chkDir;
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
 
 	@Before
 	public void checkOperatingSystem() {
@@ -45,19 +43,10 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 
 	@Override
 	protected RocksDBStateBackend getStateBackend() throws IOException {
-		dbDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "state");
-		chkDir = new File(new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()), "snapshots");
-
-		RocksDBStateBackend backend = new RocksDBStateBackend(chkDir.getAbsoluteFile().toURI(), new MemoryStateBackend());
-		backend.setDbStoragePath(dbDir.getAbsolutePath());
+		String dbPath = tempFolder.newFolder().getAbsolutePath();
+		String checkpointPath = tempFolder.newFolder().toURI().toString();
+		RocksDBStateBackend backend = new RocksDBStateBackend(checkpointPath, new FsStateBackend(checkpointPath));
+		backend.setDbStoragePath(dbPath);
 		return backend;
 	}
-
-	@Override
-	protected void cleanup() {
-		try {
-			FileUtils.deleteDirectory(dbDir);
-			FileUtils.deleteDirectory(chkDir);
-		} catch (IOException ignore) {}
-	}
 }
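
The test above now leans on JUnit's TemporaryFolder rule instead of hand-rolled directory cleanup, which is why the cleanup() override could be dropped. For reference, a minimal self-contained sketch of that rule pattern (class and test names are illustrative, not from the patch):

    import java.io.File;
    import java.io.IOException;

    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.rules.TemporaryFolder;

    public class TemporaryFolderSketch {

        @Rule
        public TemporaryFolder tempFolder = new TemporaryFolder();

        @Test
        public void usesFreshDirectory() throws IOException {
            // newFolder() creates a unique directory; the rule deletes it after the test
            File dir = tempFolder.newFolder();
        }
    }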

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
index eb04038..5df1337 100644
--- a/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
+++ b/flink-contrib/flink-storm-examples/src/test/java/org/apache/flink/storm/tests/StormFieldsGroupingITCase.java
@@ -61,12 +61,22 @@ public class StormFieldsGroupingITCase extends StreamingProgramTestBase {
 		List<String> actualResults = new ArrayList<>();
 		readAllResultLines(actualResults, resultPath, new String[0], false);
 
+		//remove potential operator id prefix
+		for(int i = 0; i < actualResults.size(); ++i) {
+			String s = actualResults.get(i);
+			if(s.contains(">")) {
+				s = s.substring(s.indexOf(">") + 2);
+				actualResults.set(i, s);
+			}
+		}
+
 		Assert.assertEquals(expectedResults.size(),actualResults.size());
 		Collections.sort(actualResults);
 		Collections.sort(expectedResults);
+		System.out.println(actualResults);
 		for(int i=0; i< actualResults.size(); ++i) {
 			//compare against actual results with removed prefex (as it depends e.g. on the hash function used)
-			Assert.assertEquals(expectedResults.get(i), actualResults.get(i).substring(3));
+			Assert.assertEquals(expectedResults.get(i), actualResults.get(i));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
index 6e316e7..d59ff04 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/wrappers/BoltWrapper.java
@@ -294,7 +294,7 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements
 	}
 
 	@Override
-	public void dispose() {
+	public void dispose() throws Exception {
 		super.dispose();
 		this.bolt.cleanup();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index 2ebb917..c15b5f6 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -327,7 +327,7 @@ public class BoltWrapperTest extends AbstractTest {
 
 	private static final class TestBolt implements IRichBolt {
 		private static final long serialVersionUID = 7278692872260138758L;
-		private OutputCollector collector;
+		private transient OutputCollector collector;
 
 		@SuppressWarnings("rawtypes")
 		@Override
@@ -366,7 +366,7 @@ public class BoltWrapperTest extends AbstractTest {
 
 	public static StreamTask<?, ?> createMockStreamTask(ExecutionConfig execConfig) {
 		Environment env = mock(Environment.class);
-		when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 0, 1, 0));
+		when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 1, 0, 1, 0));
 		when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());
 		when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
index ac87e74..5627ca8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java
@@ -31,16 +31,20 @@ public class TaskInfo {
 
 	private final String taskName;
 	private final String taskNameWithSubtasks;
+	private final int numberOfKeyGroups;
 	private final int indexOfSubtask;
 	private final int numberOfParallelSubtasks;
 	private final int attemptNumber;
 
-	public TaskInfo(String taskName, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) {
+	public TaskInfo(String taskName, int numberOfKeyGroups, int indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) {
 		checkArgument(indexOfSubtask >= 0, "Task index must be a non-negative number.");
+		checkArgument(numberOfKeyGroups >= 1, "Max parallelism must be a positive number.");
+		checkArgument(numberOfKeyGroups >= numberOfParallelSubtasks, "Max parallelism must be >= than parallelism.");
 		checkArgument(numberOfParallelSubtasks >= 1, "Parallelism must be a positive number.");
 		checkArgument(indexOfSubtask < numberOfParallelSubtasks, "Task index must be less than parallelism.");
 		checkArgument(attemptNumber >= 0, "Attempt number must be a non-negative number.");
 		this.taskName = checkNotNull(taskName, "Task Name must not be null.");
+		this.numberOfKeyGroups = numberOfKeyGroups;
 		this.indexOfSubtask = indexOfSubtask;
 		this.numberOfParallelSubtasks = numberOfParallelSubtasks;
 		this.attemptNumber = attemptNumber;
@@ -57,6 +61,13 @@ public class TaskInfo {
 	}
 
 	/**
+	 * Gets the number of key groups aka the max parallelism aka the max number of subtasks.
+	 */
+	public int getNumberOfKeyGroups() {
+		return numberOfKeyGroups;
+	}
+
+	/**
 	 * Gets the number of this parallel subtask. The numbering starts from 0 and goes up to
 	 * parallelism-1 (parallelism as returned by {@link #getNumberOfParallelSubtasks()}).
 	 *
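
For orientation amid the many call-site updates below: TaskInfo now takes the number of key groups (the max parallelism) as its second constructor argument, before the subtask index. An illustrative sketch of the new call shape, with made-up values (max parallelism 128, parallelism 4, subtask 0, attempt 0):

    import org.apache.flink.api.common.TaskInfo;

    public class TaskInfoSketch {
        public static void main(String[] args) {
            // taskName, numberOfKeyGroups, indexOfSubtask, numberOfParallelSubtasks, attemptNumber
            TaskInfo taskInfo = new TaskInfo("my-task", 128, 0, 4, 0);
            System.out.println(taskInfo.getNumberOfKeyGroups()); // prints 128
        }
    }

Note that the constructor enforces numberOfKeyGroups >= numberOfParallelSubtasks, which is why each four-argument call site in the diffs below gains a leading max-parallelism value at least as large as the parallelism.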

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
index 913b205..d9240fe 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/CollectionExecutor.java
@@ -183,7 +183,7 @@ public class CollectionExecutor {
 		GenericDataSinkBase<IN> typedSink = (GenericDataSinkBase<IN>) sink;
 
 		// build the runtime context and compute broadcast variables, if necessary
-		TaskInfo taskInfo = new TaskInfo(typedSink.getName(), 0, 1, 0);
+		TaskInfo taskInfo = new TaskInfo(typedSink.getName(), 1, 0, 1, 0);
 		RuntimeUDFContext ctx;
 
 		MetricGroup metrics = new UnregisteredMetricsGroup();
@@ -203,7 +203,7 @@ public class CollectionExecutor {
 		@SuppressWarnings("unchecked")
 		GenericDataSourceBase<OUT, ?> typedSource = (GenericDataSourceBase<OUT, ?>) source;
 		// build the runtime context and compute broadcast variables, if necessary
-		TaskInfo taskInfo = new TaskInfo(typedSource.getName(), 0, 1, 0);
+		TaskInfo taskInfo = new TaskInfo(typedSource.getName(), 1, 0, 1, 0);
 		
 		RuntimeUDFContext ctx;
 
@@ -230,7 +230,7 @@ public class CollectionExecutor {
 		SingleInputOperator<IN, OUT, ?> typedOp = (SingleInputOperator<IN, OUT, ?>) operator;
 		
 		// build the runtime context and compute broadcast variables, if necessary
-		TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0);
+		TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 1, 0, 1, 0);
 		RuntimeUDFContext ctx;
 
 		MetricGroup metrics = new UnregisteredMetricsGroup();
@@ -270,7 +270,7 @@ public class CollectionExecutor {
 		DualInputOperator<IN1, IN2, OUT, ?> typedOp = (DualInputOperator<IN1, IN2, OUT, ?>) operator;
 		
 		// build the runtime context and compute broadcast variables, if necessary
-		TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 0, 1, 0);
+		TaskInfo taskInfo = new TaskInfo(typedOp.getName(), 1, 0, 1, 0);
 		RuntimeUDFContext ctx;
 
 		MetricGroup metrics = new UnregisteredMetricsGroup();

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
index a6becf7..0318d1f 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FSDataOutputStream.java
@@ -29,6 +29,8 @@ import java.io.OutputStream;
 @Public
 public abstract class FSDataOutputStream extends OutputStream {
 
+	public abstract long getPos() throws IOException;
+
 	public abstract void flush() throws IOException;
 
 	public abstract void sync() throws IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
index 54ec8dd..c3b793d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/local/LocalDataOutputStream.java
@@ -90,4 +90,9 @@ public class LocalDataOutputStream extends FSDataOutputStream {
 	public void sync() throws IOException {
 		fos.getFD().sync();
 	}
+
+	@Override
+	public long getPos() throws IOException {
+		return fos.getChannel().position();
+	}
 }
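
Taken together, the two hunks above add position tracking to Flink's file system output streams: FSDataOutputStream now declares an abstract getPos(), and LocalDataOutputStream answers it from the underlying FileChannel. A hedged sketch of the pattern this enables, recording the offset and length of a block as it is written (the helper is illustrative, not from the patch):

    import java.io.IOException;

    import org.apache.flink.core.fs.FSDataOutputStream;

    public class GetPosSketch {
        /** Writes a block and returns its {offset, length} within the stream. */
        static long[] writeBlock(FSDataOutputStream out, byte[] block) throws IOException {
            long start = out.getPos();  // byte offset where this block begins
            out.write(block);           // write(byte[]) is inherited from java.io.OutputStream
            return new long[] { start, out.getPos() - start };
        }
    }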

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
new file mode 100644
index 0000000..285e016
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/ByteArrayOutputStreamWithPos.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.memory;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+/**
+ * Un-synchronized copy of Java's ByteArrayOutputStream that also exposes the current position.
+ */
+public class ByteArrayOutputStreamWithPos extends OutputStream {
+
+	/**
+	 * The buffer where data is stored.
+	 */
+	protected byte[] buf;
+
+	/**
+	 * The number of valid bytes in the buffer.
+	 */
+	protected int count;
+
+	/**
+	 * Creates a new byte array output stream. The buffer capacity is
+	 * initially 32 bytes, though its size increases if necessary.
+	 */
+	public ByteArrayOutputStreamWithPos() {
+		this(32);
+	}
+
+	/**
+	 * Creates a new byte array output stream, with a buffer capacity of
+	 * the specified size, in bytes.
+	 *
+	 * @param size the initial size.
+	 * @throws IllegalArgumentException if size is negative.
+	 */
+	public ByteArrayOutputStreamWithPos(int size) {
+		if (size < 0) {
+			throw new IllegalArgumentException("Negative initial size: "
+					+ size);
+		}
+		buf = new byte[size];
+	}
+
+	/**
+	 * Increases the capacity if necessary to ensure that it can hold
+	 * at least the number of elements specified by the minimum
+	 * capacity argument.
+	 *
+	 * @param minCapacity the desired minimum capacity
+	 * @throws OutOfMemoryError if {@code minCapacity < 0}.  This is
+	 *                          interpreted as a request for the unsatisfiably large capacity
+	 *                          {@code (long) Integer.MAX_VALUE + (minCapacity - Integer.MAX_VALUE)}.
+	 */
+	private void ensureCapacity(int minCapacity) {
+		// overflow-conscious code
+		if (minCapacity - buf.length > 0) {
+			grow(minCapacity);
+		}
+	}
+
+	/**
+	 * Increases the capacity to ensure that it can hold at least the
+	 * number of elements specified by the minimum capacity argument.
+	 *
+	 * @param minCapacity the desired minimum capacity
+	 */
+	private void grow(int minCapacity) {
+		// overflow-conscious code
+		int oldCapacity = buf.length;
+		int newCapacity = oldCapacity << 1;
+		if (newCapacity - minCapacity < 0) {
+			newCapacity = minCapacity;
+		}
+		if (newCapacity < 0) {
+			if (minCapacity < 0) { // overflow
+				throw new OutOfMemoryError();
+			}
+			newCapacity = Integer.MAX_VALUE;
+		}
+		buf = Arrays.copyOf(buf, newCapacity);
+	}
+
+	/**
+	 * Writes the specified byte to this byte array output stream.
+	 *
+	 * @param b the byte to be written.
+	 */
+	public void write(int b) {
+		ensureCapacity(count + 1);
+		buf[count] = (byte) b;
+		count += 1;
+	}
+
+	/**
+	 * Writes <code>len</code> bytes from the specified byte array
+	 * starting at offset <code>off</code> to this byte array output stream.
+	 *
+	 * @param b   the data.
+	 * @param off the start offset in the data.
+	 * @param len the number of bytes to write.
+	 */
+	public void write(byte[] b, int off, int len) {
+		if ((off < 0) || (off > b.length) || (len < 0) ||
+				((off + len) - b.length > 0)) {
+			throw new IndexOutOfBoundsException();
+		}
+		ensureCapacity(count + len);
+		System.arraycopy(b, off, buf, count, len);
+		count += len;
+	}
+
+	/**
+	 * Writes the complete contents of this byte array output stream to
+	 * the specified output stream argument, as if by calling the output
+	 * stream's write method using <code>out.write(buf, 0, count)</code>.
+	 *
+	 * @param out the output stream to which to write the data.
+	 * @throws IOException if an I/O error occurs.
+	 */
+	public void writeTo(OutputStream out) throws IOException {
+		out.write(buf, 0, count);
+	}
+
+	/**
+	 * Resets the <code>count</code> field of this byte array output
+	 * stream to zero, so that all currently accumulated output in the
+	 * output stream is discarded. The output stream can be used again,
+	 * reusing the already allocated buffer space.
+	 *
+	 * @see java.io.ByteArrayInputStream#count
+	 */
+	public void reset() {
+		count = 0;
+	}
+
+	/**
+	 * Creates a newly allocated byte array. Its size is the current
+	 * size of this output stream and the valid contents of the buffer
+	 * have been copied into it.
+	 *
+	 * @return the current contents of this output stream, as a byte array.
+	 * @see java.io.ByteArrayOutputStream#size()
+	 */
+	public byte toByteArray()[] {
+		return Arrays.copyOf(buf, count);
+	}
+
+	/**
+	 * Returns the current size of the buffer.
+	 *
+	 * @return the value of the <code>count</code> field, which is the number
+	 * of valid bytes in this output stream.
+	 * @see java.io.ByteArrayOutputStream#count
+	 */
+	public int size() {
+		return count;
+	}
+
+	/**
+	 * Converts the buffer's contents into a string decoding bytes using the
+	 * platform's default character set. The length of the new <tt>String</tt>
+	 * is a function of the character set, and hence may not be equal to the
+	 * size of the buffer.
+	 * <p>
+	 * <p> This method always replaces malformed-input and unmappable-character
+	 * sequences with the default replacement string for the platform's
+	 * default character set. The {@linkplain java.nio.charset.CharsetDecoder}
+	 * class should be used when more control over the decoding process is
+	 * required.
+	 *
+	 * @return String decoded from the buffer's contents.
+	 * @since JDK1.1
+	 */
+	public String toString() {
+		return new String(buf, 0, count);
+	}
+
+	/**
+	 * Converts the buffer's contents into a string by decoding the bytes using
+	 * the named {@link java.nio.charset.Charset charset}. The length of the new
+	 * <tt>String</tt> is a function of the charset, and hence may not be equal
+	 * to the length of the byte array.
+	 * <p>
+	 * <p> This method always replaces malformed-input and unmappable-character
+	 * sequences with this charset's default replacement string. The {@link
+	 * java.nio.charset.CharsetDecoder} class should be used when more control
+	 * over the decoding process is required.
+	 *
+	 * @param charsetName the name of a supported
+	 *                    {@link java.nio.charset.Charset charset}
+	 * @return String decoded from the buffer's contents.
+	 * @throws UnsupportedEncodingException If the named charset is not supported
+	 * @since JDK1.1
+	 */
+	public String toString(String charsetName)
+			throws UnsupportedEncodingException {
+		return new String(buf, 0, count, charsetName);
+	}
+
+	/**
+	 * Creates a newly allocated string. Its size is the current size of
+	 * the output stream and the valid contents of the buffer have been
+	 * copied into it. Each character <i>c</i> in the resulting string is
+	 * constructed from the corresponding element <i>b</i> in the byte
+	 * array such that:
+	 * <blockquote><pre>
+	 *     c == (char)(((hibyte &amp; 0xff) &lt;&lt; 8) | (b &amp; 0xff))
+	 * </pre></blockquote>
+	 *
+	 * @param hibyte the high byte of each resulting Unicode character.
+	 * @return the current contents of the output stream, as a string.
+	 * @see java.io.ByteArrayOutputStream#size()
+	 * @see java.io.ByteArrayOutputStream#toString(String)
+	 * @see java.io.ByteArrayOutputStream#toString()
+	 * @deprecated This method does not properly convert bytes into characters.
+	 * As of JDK&nbsp;1.1, the preferred way to do this is via the
+	 * <code>toString(String enc)</code> method, which takes an encoding-name
+	 * argument, or the <code>toString()</code> method, which uses the
+	 * platform's default character encoding.
+	 */
+	@Deprecated
+	public String toString(int hibyte) {
+		return new String(buf, hibyte, 0, count);
+	}
+
+	/**
+	 * Closing a <tt>ByteArrayOutputStream</tt> has no effect. The methods in
+	 * this class can be called after the stream has been closed without
+	 * generating an <tt>IOException</tt>.
+	 */
+	public void close() throws IOException {
+	}
+
+	/**
+	 * Returns the read/write offset position for the stream.
+	 * @return the current position in the stream.
+	 */
+	public int getPosition() {
+		return count;
+	}
+
+	/**
+	 * Sets the read/write offset position for the stream.
+	 *
+	 * @param position the position to which the offset in the stream shall be set. Must be < getEndPosition
+	 */
+	public void setPosition(int position) {
+		Preconditions.checkArgument(position < getEndPosition(), "Position out of bounds.");
+		count = position;
+	}
+
+	/**
+	 * Returns the size of the internal buffer, which is the current end position for all setPosition calls.
+	 * @return size of the internal buffer
+	 */
+	public int getEndPosition() {
+		return buf.length;
+	}
+}
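
The new class mirrors java.io.ByteArrayOutputStream but exposes getPosition()/setPosition(), so a writer can reserve a slot, keep writing, then seek back and patch it. An illustrative usage sketch (not part of the patch): write a one-byte length placeholder, write the payload, then back-patch the length.

    import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;

    public class PositionPatchSketch {
        public static void main(String[] args) {
            ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos(64);
            int lengthSlot = out.getPosition();      // remember where the length byte goes
            out.write(0);                            // reserve one byte for the length
            out.write(new byte[] { 1, 2, 3 }, 0, 3); // payload
            int end = out.getPosition();
            out.setPosition(lengthSlot);
            out.write(end - lengthSlot - 1);         // back-patch the payload length (3)
            out.setPosition(end);                    // restore the write position
            byte[] result = out.toByteArray();       // yields {3, 1, 2, 3}
        }
    }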

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
index 4cd2a64..7c5878d 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/functions/util/RuntimeUDFContextTest.java
@@ -38,7 +38,7 @@ import org.junit.Test;
 
 public class RuntimeUDFContextTest {
 
-	private final TaskInfo taskInfo = new TaskInfo("test name", 1, 3, 0);
+	private final TaskInfo taskInfo = new TaskInfo("test name", 3, 1, 3, 0);
 
 	@Test
 	public void testBroadcastVariableNotFound() {

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
index c3cbb58..fc3fb1a 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichInputFormatTest.java
@@ -41,7 +41,7 @@ public class RichInputFormatTest {
 	@Test
 	public void testCheckRuntimeContextAccess() {
 		final SerializedInputFormat<Value> inputFormat = new SerializedInputFormat<Value>();
-		final TaskInfo taskInfo = new TaskInfo("test name", 1, 3, 0);
+		final TaskInfo taskInfo = new TaskInfo("test name", 3, 1, 3, 0);
 		inputFormat.setRuntimeContext(
 				new RuntimeUDFContext(
 						taskInfo, getClass().getClassLoader(), new ExecutionConfig(),

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
index 4c303a6..95f8497 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/RichOutputFormatTest.java
@@ -41,7 +41,7 @@ public class RichOutputFormatTest {
 	@Test
 	public void testCheckRuntimeContextAccess() {
 		final SerializedOutputFormat<Value> inputFormat = new SerializedOutputFormat<Value>();
-		final TaskInfo taskInfo = new TaskInfo("test name", 1, 3, 0);
+		final TaskInfo taskInfo = new TaskInfo("test name", 3, 1, 3, 0);
 		
 		inputFormat.setRuntimeContext(new RuntimeUDFContext(
 				taskInfo, getClass().getClassLoader(), new ExecutionConfig(),

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
index 71bb102..b952c58 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSinkBaseTest.java
@@ -93,7 +93,7 @@ public class GenericDataSinkBaseTest implements java.io.Serializable {
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
 			final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
-			final TaskInfo taskInfo = new TaskInfo("test_sink", 0, 1, 0);
+			final TaskInfo taskInfo = new TaskInfo("test_sink", 1, 0, 1, 0);
 			executionConfig.disableObjectReuse();
 			in.reset();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
index 2dabe48..9a2b877 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/GenericDataSourceBaseTest.java
@@ -79,7 +79,7 @@ public class GenericDataSourceBaseTest implements java.io.Serializable {
 
 			final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
 			final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
-			final TaskInfo taskInfo = new TaskInfo("test_source", 0, 1, 0);
+			final TaskInfo taskInfo = new TaskInfo("test_source", 1, 0, 1, 0);
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
index f125c4b..232f510 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/FlatMapOperatorCollectionTest.java
@@ -77,7 +77,7 @@ public class FlatMapOperatorCollectionTest implements Serializable {
 		} else {
 			executionConfig.enableObjectReuse();
 		}
-		final TaskInfo taskInfo = new TaskInfo("Test UDF", 0, 4, 0);
+		final TaskInfo taskInfo = new TaskInfo("Test UDF", 4, 0, 4, 0);
 		// run on collections
 		final List<String> result = getTestFlatMapOperator(udf)
 				.executeOnCollections(input,

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
index 8befcb9..72f2f2e 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
@@ -121,7 +121,7 @@ public class InnerJoinOperatorBaseTest implements Serializable {
 
 
 		try {
-			final TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 0);
+			final TaskInfo taskInfo = new TaskInfo(taskName, 1, 0, 1, 0);
 			final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
 			final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
index d79e2a5..1b2af7c 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/MapOperatorTest.java
@@ -112,7 +112,7 @@ public class MapOperatorTest implements java.io.Serializable {
 			List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6"));
 			final HashMap<String, Accumulator<?, ?>> accumulatorMap = new HashMap<String, Accumulator<?, ?>>();
 			final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
-			final TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 0);
+			final TaskInfo taskInfo = new TaskInfo(taskName, 1, 0, 1, 0);
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
index 83c194a..e709152 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/operators/base/PartitionMapOperatorTest.java
@@ -83,7 +83,7 @@ public class PartitionMapOperatorTest implements java.io.Serializable {
 			
 			List<String> input = new ArrayList<String>(asList("1", "2", "3", "4", "5", "6"));
 
-			final TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 0);
+			final TaskInfo taskInfo = new TaskInfo(taskName, 1, 0, 1, 0);
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
index fd7bf5d..df40998 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -20,17 +20,16 @@ package org.apache.flink.hdfstests;
 
 import org.apache.commons.io.FileUtils;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.StateBackendTestBase;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
 import org.apache.hadoop.conf.Configuration;
@@ -51,25 +50,23 @@ import java.util.UUID;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
-	
+
 	private static File TEMP_DIR;
-	
+
 	private static String HDFS_ROOT_URI;
-	
+
 	private static MiniDFSCluster HDFS_CLUSTER;
-	
+
 	private static FileSystem FS;
-	
+
 	// ------------------------------------------------------------------------
 	//  startup / shutdown
 	// ------------------------------------------------------------------------
-	
+
 	@BeforeClass
 	public static void createHDFS() {
 		try {
@@ -82,7 +79,7 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 
 			HDFS_ROOT_URI = "hdfs://" + HDFS_CLUSTER.getURI().getHost() + ":"
 					+ HDFS_CLUSTER.getNameNodePort() + "/";
-			
+
 			FS = FileSystem.get(new URI(HDFS_ROOT_URI));
 		}
 		catch (Exception e) {
@@ -109,11 +106,6 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 
 	}
 
-	@Override
-	protected void cleanup() throws Exception {
-		FileSystem.get(stateBaseURI).delete(new Path(stateBaseURI), true);
-	}
-
 	// ------------------------------------------------------------------------
 	//  Tests
 	// ------------------------------------------------------------------------
@@ -132,60 +124,19 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 	public void testReducingStateRestoreWithWrongSerializers() {}
 
 	@Test
-	public void testSetupAndSerialization() {
-		try {
-			URI baseUri = new URI(HDFS_ROOT_URI + UUID.randomUUID().toString());
-			
-			FsStateBackend originalBackend = new FsStateBackend(baseUri);
-
-			assertFalse(originalBackend.isInitialized());
-			assertEquals(baseUri, originalBackend.getBasePath().toUri());
-			assertNull(originalBackend.getCheckpointDirectory());
-
-			// serialize / copy the backend
-			FsStateBackend backend = CommonTestUtils.createCopySerializable(originalBackend);
-			assertFalse(backend.isInitialized());
-			assertEquals(baseUri, backend.getBasePath().toUri());
-			assertNull(backend.getCheckpointDirectory());
-
-			// no file operations should be possible right now
-			try {
-				FsStateBackend.FsCheckpointStateOutputStream out = backend.createCheckpointStateOutputStream(
-						2L,
-						System.currentTimeMillis());
-
-				out.write(1);
-				out.closeAndGetHandle();
-				fail("should fail with an exception");
-			} catch (IllegalStateException e) {
-				// supreme!
-			}
-
-			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "dummy", IntSerializer.INSTANCE);
-			assertNotNull(backend.getCheckpointDirectory());
-
-			Path checkpointDir = backend.getCheckpointDirectory();
-			assertTrue(FS.exists(checkpointDir));
-			assertTrue(isDirectoryEmpty(checkpointDir));
+	public void testStateOutputStream() {
+		URI basePath = randomHdfsFileUri();
 
-			backend.disposeAllStateForCurrentJob();
-			assertNull(backend.getCheckpointDirectory());
+		try {
+			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(basePath, 15));
+			JobID jobId = new JobID();
 
-			assertTrue(isDirectoryEmpty(baseUri));
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
 
-	@Test
-	public void testStateOutputStream() {
-		try {
-			FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(randomHdfsFileUri(), 15));
-			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "dummy", IntSerializer.INSTANCE);
+			CheckpointStreamFactory streamFactory = backend.createStreamFactory(jobId, "test_op");
 
-			Path checkpointDir = backend.getCheckpointDirectory();
+			// we know how FsCheckpointStreamFactory is implemented so we know where it
+			// will store checkpoints
+			Path checkpointPath = new Path(new Path(basePath), jobId.toString());
 
 			byte[] state1 = new byte[1274673];
 			byte[] state2 = new byte[1];
@@ -200,12 +151,12 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 
 			long checkpointId = 97231523452L;
 
-			FsStateBackend.FsCheckpointStateOutputStream stream1 =
-					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
-			FsStateBackend.FsCheckpointStateOutputStream stream2 =
-					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
-			FsStateBackend.FsCheckpointStateOutputStream stream3 =
-					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+			CheckpointStreamFactory.CheckpointStateOutputStream stream1 =
+					streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+			CheckpointStreamFactory.CheckpointStateOutputStream stream2 =
+					streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+			CheckpointStreamFactory.CheckpointStateOutputStream stream3 =
+					streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
 
 			stream1.write(state1);
 			stream2.write(state2);
@@ -217,15 +168,15 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 
 			// use with try-with-resources
 			FileStateHandle handle4;
-			try (AbstractStateBackend.CheckpointStateOutputStream stream4 =
-						 backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) {
+			try (CheckpointStreamFactory.CheckpointStateOutputStream stream4 =
+						 streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) {
 				stream4.write(state4);
 				handle4 = (FileStateHandle) stream4.closeAndGetHandle();
 			}
 
 			// close before accessing handle
-			AbstractStateBackend.CheckpointStateOutputStream stream5 =
-					backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
+			CheckpointStreamFactory.CheckpointStateOutputStream stream5 =
+					streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
 			stream5.write(state4);
 			stream5.close();
 			try {
@@ -237,7 +188,7 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 
 			validateBytesInStream(handle1.openInputStream(), state1);
 			handle1.discardState();
-			assertFalse(isDirectoryEmpty(checkpointDir));
+			assertFalse(isDirectoryEmpty(checkpointPath));
 			ensureFileDeleted(handle1.getFilePath());
 
 			validateBytesInStream(handle2.openInputStream(), state2);
@@ -248,7 +199,7 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 
 			validateBytesInStream(handle4.openInputStream(), state4);
 			handle4.discardState();
-			assertTrue(isDirectoryEmpty(checkpointDir));
+			assertTrue(isDirectoryEmpty(checkpointPath));
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -270,7 +221,7 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 	private static boolean isDirectoryEmpty(URI directory) {
 		return isDirectoryEmpty(new Path(directory));
 	}
-	
+
 	private static boolean isDirectoryEmpty(Path directory) {
 		try {
 			FileStatus[] nested = FS.listStatus(directory);
@@ -293,14 +244,14 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 
 	private static void validateBytesInStream(InputStream is, byte[] data) throws IOException {
 		byte[] holder = new byte[data.length];
-		
+
 		int pos = 0;
 		int read;
 		while (pos < holder.length && (read = is.read(holder, pos, holder.length - pos)) != -1) {
 			pos += read;
 		}
-			
-		assertEquals("not enough data", holder.length, pos); 
+
+		assertEquals("not enough data", holder.length, pos);
 		assertEquals("too much data", -1, is.read());
 		assertArrayEquals("wrong data", data, holder);
 	}

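Note: the test no longer obtains checkpoint output streams from the backend directly; it first asks the backend for a CheckpointStreamFactory. The removed testSetupAndSerialization relied on initializeForJob()/getCheckpointDirectory(), which this refactoring eliminates. A condensed sketch of the new write path, assembled from the hunk above (basePath, checkpointId, and the operator name are test values):

    FsStateBackend backend = new FsStateBackend(basePath, 15);
    CheckpointStreamFactory streamFactory = backend.createStreamFactory(new JobID(), "test_op");

    CheckpointStreamFactory.CheckpointStateOutputStream out =
            streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
    out.write(state);
    // handles returned by the factory's streams are file-backed here
    FileStateHandle handle = (FileStateHandle) out.closeAndGetHandle();
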
http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
index 2682584..a4426e0 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/CoGroupOperatorCollectionTest.java
@@ -77,7 +77,7 @@ public class CoGroupOperatorCollectionTest implements Serializable {
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			final HashMap<String, Accumulator<?, ?>> accumulators = new HashMap<String, Accumulator<?, ?>>();
 			final HashMap<String, Future<Path>> cpTasks = new HashMap<>();
-			final TaskInfo taskInfo = new TaskInfo("Test UDF", 0, 4, 0);
+			final TaskInfo taskInfo = new TaskInfo("Test UDF", 4, 0, 4, 0);
 			final RuntimeContext ctx = new RuntimeUDFContext(
 					taskInfo, null, executionConfig, cpTasks, accumulators, new UnregisteredMetricsGroup());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
index c5a247a..d0784a8 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/GroupReduceOperatorTest.java
@@ -165,7 +165,7 @@ public class GroupReduceOperatorTest implements java.io.Serializable {
 					Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
 					Integer>("bar", 4)));
 
-			final TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 0);
+			final TaskInfo taskInfo = new TaskInfo(taskName, 1, 0, 1, 0);
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			executionConfig.disableObjectReuse();

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
index 89574a8..ef33ac0 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/InnerJoinOperatorBaseTest.java
@@ -107,7 +107,7 @@ public class InnerJoinOperatorBaseTest implements Serializable {
 		));
 
 		try {
-			final TaskInfo taskInfo = new TaskInfo("op", 0, 1, 0);
+			final TaskInfo taskInfo = new TaskInfo("op", 1, 0, 1, 0);
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			
 			executionConfig.disableObjectReuse();

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
index 150854d..9427d6f 100644
--- a/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/common/operators/base/ReduceOperatorTest.java
@@ -145,7 +145,7 @@ public class ReduceOperatorTest implements java.io.Serializable {
 					Integer>("foo", 3), new Tuple2<String, Integer>("bar", 2), new Tuple2<String,
 					Integer>("bar", 4)));
 
-			final TaskInfo taskInfo = new TaskInfo(taskName, 0, 1, 0);
+			final TaskInfo taskInfo = new TaskInfo(taskName, 1, 0, 1, 0);
 
 			ExecutionConfig executionConfig = new ExecutionConfig();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 624db0d..5ac638e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -520,7 +520,7 @@ public class NFA<T> implements Serializable {
 		public void serialize(NFA<T> record, DataOutputView target) throws IOException {
 			ObjectOutputStream oos = new ObjectOutputStream(new DataOutputViewStream(target));
 			oos.writeObject(record);
-			oos.close();
+			oos.flush();
 		}
 
 		@Override

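Note: NFA serialization now flushes the ObjectOutputStream instead of closing it. Closing would propagate to the wrapped DataOutputViewStream and hence to the caller's target view, while flush() pushes the buffered object bytes through and leaves the target usable; this rationale is inferred from the change, not stated in the commit. The resulting method:

    public void serialize(NFA<T> record, DataOutputView target) throws IOException {
        ObjectOutputStream oos = new ObjectOutputStream(new DataOutputViewStream(target));
        oos.writeObject(record);
        oos.flush();  // do not close: that would close the underlying target view
    }
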
http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index e3f924c..09773a2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -92,6 +92,8 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 	@Override
 	@SuppressWarnings("unchecked")
 	public void open() throws Exception {
+		super.open();
+
 		if (keys == null) {
 			keys = new HashSet<>();
 		}

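Note: open() now chains to super.open() first, presumably so that the initialization added to the base operator by this refactoring (such as keyed state setup) runs before the CEP-specific bookkeeping. The pattern, as it reads after the patch:

    @Override
    public void open() throws Exception {
        super.open();  // let the base stream operator initialize first
        if (keys == null) {
            keys = new HashSet<>();
        }
        // ... operator-specific setup continues here
    }
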
http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 54c1477..52a02d1 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
@@ -83,16 +84,15 @@ public class CEPOperatorTest extends TestLogger {
 			}
 		};
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
-			new KeyedCEPPatternOperator<>(
-				Event.createTypeSerializer(),
-				false,
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new KeyedOneInputStreamOperatorTestHarness<>(
+				new KeyedCEPPatternOperator<>(
+					Event.createTypeSerializer(),
+					false,
+					keySelector,
+					IntSerializer.INSTANCE,
+					new NFAFactory()),
 				keySelector,
-				IntSerializer.INSTANCE,
-			new NFAFactory())
-		);
-
-		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
+				BasicTypeInfo.INT_TYPE_INFO);
 
 		harness.open();
 
@@ -206,15 +206,15 @@ public class CEPOperatorTest extends TestLogger {
 			}
 		};
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new KeyedOneInputStreamOperatorTestHarness<>(
 				new KeyedCEPPatternOperator<>(
 						Event.createTypeSerializer(),
 						false,
 						keySelector,
 						IntSerializer.INSTANCE,
-						new NFAFactory()));
-
-		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
+						new NFAFactory()),
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
 
 		harness.open();
 
@@ -228,15 +228,16 @@ public class CEPOperatorTest extends TestLogger {
 		// simulate snapshot/restore with some elements in internal sorting queue
 		StreamStateHandle snapshot = harness.snapshot(0, 0);
 
-		harness = new OneInputStreamOperatorTestHarness<>(
+		harness = new KeyedOneInputStreamOperatorTestHarness<>(
 				new KeyedCEPPatternOperator<>(
 						Event.createTypeSerializer(),
 						false,
 						keySelector,
 						IntSerializer.INSTANCE,
-						new NFAFactory()));
+						new NFAFactory()),
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
 
-		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
 		harness.setup();
 		harness.restore(snapshot);
 		harness.open();
@@ -252,15 +253,16 @@ public class CEPOperatorTest extends TestLogger {
 		// simulate snapshot/restore with empty element queue but NFA state
 		StreamStateHandle snapshot2 = harness.snapshot(1, 1);
 
-		harness = new OneInputStreamOperatorTestHarness<>(
+		harness = new KeyedOneInputStreamOperatorTestHarness<>(
 				new KeyedCEPPatternOperator<>(
 						Event.createTypeSerializer(),
 						false,
 						keySelector,
 						IntSerializer.INSTANCE,
-						new NFAFactory()));
+						new NFAFactory()),
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
 
-		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
 		harness.setup();
 		harness.restore(snapshot2);
 		harness.open();
@@ -309,16 +311,17 @@ public class CEPOperatorTest extends TestLogger {
 			}
 		};
 
-		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new OneInputStreamOperatorTestHarness<>(
+		OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness = new KeyedOneInputStreamOperatorTestHarness<>(
 				new KeyedCEPPatternOperator<>(
 						Event.createTypeSerializer(),
 						false,
 						keySelector,
 						IntSerializer.INSTANCE,
-						new NFAFactory()));
+						new NFAFactory()),
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
 
 		harness.setStateBackend(rocksDBStateBackend);
-		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
 
 		harness.open();
 
@@ -332,19 +335,21 @@ public class CEPOperatorTest extends TestLogger {
 		// simulate snapshot/restore with some elements in internal sorting queue
 		StreamStateHandle snapshot = harness.snapshot(0, 0);
 
-		harness = new OneInputStreamOperatorTestHarness<>(
+		harness = new KeyedOneInputStreamOperatorTestHarness<>(
 				new KeyedCEPPatternOperator<>(
 						Event.createTypeSerializer(),
 						false,
 						keySelector,
 						IntSerializer.INSTANCE,
-						new NFAFactory()));
+						new NFAFactory()),
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
 
 		rocksDBStateBackend =
 				new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
 		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 		harness.setStateBackend(rocksDBStateBackend);
-		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
+
 		harness.setup();
 		harness.restore(snapshot);
 		harness.open();
@@ -360,19 +365,20 @@ public class CEPOperatorTest extends TestLogger {
 		// simulate snapshot/restore with empty element queue but NFA state
 		StreamStateHandle snapshot2 = harness.snapshot(1, 1);
 
-		harness = new OneInputStreamOperatorTestHarness<>(
+		harness = new KeyedOneInputStreamOperatorTestHarness<>(
 				new KeyedCEPPatternOperator<>(
 						Event.createTypeSerializer(),
 						false,
 						keySelector,
 						IntSerializer.INSTANCE,
-						new NFAFactory()));
+						new NFAFactory()),
+				keySelector,
+				BasicTypeInfo.INT_TYPE_INFO);
 
 		rocksDBStateBackend =
 				new RocksDBStateBackend(rocksDbBackups, new MemoryStateBackend());
 		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 		harness.setStateBackend(rocksDBStateBackend);
-		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
 		harness.setup();
 		harness.restore(snapshot2);
 		harness.open();

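Note: in all four test cases above, the key selector and key type move into the KeyedOneInputStreamOperatorTestHarness constructor, and the separate harness.configureForKeyedStream(...) call disappears. The new construction, exactly as used in the hunks:

    OneInputStreamOperatorTestHarness<Event, Map<String, Event>> harness =
            new KeyedOneInputStreamOperatorTestHarness<>(
                    new KeyedCEPPatternOperator<>(
                            Event.createTypeSerializer(),
                            false,
                            keySelector,
                            IntSerializer.INSTANCE,
                            new NFAFactory()),
                    keySelector,
                    BasicTypeInfo.INT_TYPE_INFO);
    harness.open();
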
http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e78e203..e751e08 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -46,6 +46,7 @@ import scala.concurrent.Future;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -821,14 +822,10 @@ public class CheckpointCoordinator {
 						}
 
 						KeyGroupRange subtaskKeyGroupIds = keyGroupPartitions.get(i);
-						List<KeyGroupsStateHandle> subtaskKeyGroupStates = new ArrayList<>();
 
-						for (KeyGroupsStateHandle storedKeyGroup : taskState.getKeyGroupStates()) {
-							KeyGroupsStateHandle intersection = storedKeyGroup.getKeyGroupIntersection(subtaskKeyGroupIds);
-							if(intersection.getNumberOfKeyGroups() > 0) {
-								subtaskKeyGroupStates.add(intersection);
-							}
-						}
+						List<KeyGroupsStateHandle> subtaskKeyGroupStates = getKeyGroupsStateHandles(
+								taskState.getKeyGroupStates(),
+								subtaskKeyGroupIds);
 
 						Execution currentExecutionAttempt = executionJobVertex
 							.getTaskVertices()[i]
@@ -852,6 +849,27 @@ public class CheckpointCoordinator {
 	}
 
 	/**
+	 * Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
+	 * key group index for the given subtask {@link KeyGroupRange}.
+	 *
+	 * <p>This is publicly visible to be used in tests.
+	 */
+	public static List<KeyGroupsStateHandle> getKeyGroupsStateHandles(
+			Collection<KeyGroupsStateHandle> allKeyGroupsHandles,
+			KeyGroupRange subtaskKeyGroupIds) {
+
+		List<KeyGroupsStateHandle> subtaskKeyGroupStates = new ArrayList<>();
+
+		for (KeyGroupsStateHandle storedKeyGroup : allKeyGroupsHandles) {
+			KeyGroupsStateHandle intersection = storedKeyGroup.getKeyGroupIntersection(subtaskKeyGroupIds);
+			if(intersection.getNumberOfKeyGroups() > 0) {
+				subtaskKeyGroupStates.add(intersection);
+			}
+		}
+		return subtaskKeyGroupStates;
+	}
+
+	/**
 	 * Groups the available set of key groups into key group partitions. A key group partition is
 	 * the set of key groups which is assigned to the same task. Each set of the returned list
 	 * constitutes a key group partition.

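Note: the per-subtask filtering loop is extracted into a public static helper so that tests can reuse it, as the new javadoc says. Restore-time usage, as in the hunk above:

    List<KeyGroupsStateHandle> subtaskKeyGroupStates =
            CheckpointCoordinator.getKeyGroupsStateHandles(
                    taskState.getKeyGroupStates(),   // all key-group handles stored for the task
                    subtaskKeyGroupIds);             // key group range assigned to this subtask
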
http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 8849e93..ca976e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -59,6 +59,9 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	/** The task's name. */
 	private final String taskName;
 
+	/** The number of key groups aka the max parallelism aka the max number of subtasks. */
+	private final int numberOfKeyGroups;
+
 	/** The task's index in the subtask group. */
 	private final int indexInSubtaskGroup;
 
@@ -110,6 +113,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			ExecutionAttemptID executionId,
 			SerializedValue<ExecutionConfig> serializedExecutionConfig,
 			String taskName,
+			int numberOfKeyGroups,
 			int indexInSubtaskGroup,
 			int numberOfSubtasks,
 			int attemptNumber,
@@ -135,6 +139,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		this.executionId = checkNotNull(executionId);
 		this.serializedExecutionConfig = checkNotNull(serializedExecutionConfig);
 		this.taskName = checkNotNull(taskName);
+		this.numberOfKeyGroups = numberOfKeyGroups;
 		this.indexInSubtaskGroup = indexInSubtaskGroup;
 		this.numberOfSubtasks = numberOfSubtasks;
 		this.attemptNumber = attemptNumber;
@@ -157,6 +162,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		ExecutionAttemptID executionId,
 		SerializedValue<ExecutionConfig> serializedExecutionConfig,
 		String taskName,
+		int numberOfKeyGroups,
 		int indexInSubtaskGroup,
 		int numberOfSubtasks,
 		int attemptNumber,
@@ -176,6 +182,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			executionId,
 			serializedExecutionConfig,
 			taskName,
+			numberOfKeyGroups,
 			indexInSubtaskGroup,
 			numberOfSubtasks,
 			attemptNumber,
@@ -227,6 +234,13 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	}
 
 	/**
+	 * Returns the task's number of key groups.
+	 */
+	public int getNumberOfKeyGroups() {
+		return numberOfKeyGroups;
+	}
+
+	/**
 	 * Returns the task's index in the subtask group.
 	 *
 	 * @return the task's index in the subtask group
@@ -253,7 +267,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	 * Returns the {@link TaskInfo} object for the subtask
 	 */
 	public TaskInfo getTaskInfo() {
-		return new TaskInfo(taskName, indexInSubtaskGroup, numberOfSubtasks, attemptNumber);
+		return new TaskInfo(taskName, numberOfKeyGroups, indexInSubtaskGroup, numberOfSubtasks, attemptNumber);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index f3a8b6d..b215394 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -682,6 +682,7 @@ public class ExecutionVertex {
 			executionId,
 			serializedConfig,
 			getTaskName(),
+			getMaxParallelism(),
 			subTaskIndex,
 			getTotalNumberOfParallelSubtasks(),
 			attemptNumber,

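Note: these two hunks show how the key-group count reaches the runtime: ExecutionVertex passes getMaxParallelism() into the TaskDeploymentDescriptor as numberOfKeyGroups, and the descriptor in turn builds the enriched TaskInfo:

    // in TaskDeploymentDescriptor (from the hunk above)
    public TaskInfo getTaskInfo() {
        return new TaskInfo(taskName, numberOfKeyGroups, indexInSubtaskGroup, numberOfSubtasks, attemptNumber);
    }
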
http://git-wip-us.apache.org/repos/asf/flink/blob/4809f536/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
index b0ff4b3..d6fbc19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopDataOutputStream.java
@@ -51,6 +51,11 @@ public class HadoopDataOutputStream extends FSDataOutputStream {
 	}
 
 	@Override
+	public long getPos() throws IOException {
+		return fdos.getPos();
+	}
+
+	@Override
 	public void flush() throws IOException {
 		if (HFLUSH_METHOD != null) {
 			try {


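Note: the Hadoop wrapper now exposes the current write position by delegating to the wrapped Hadoop output stream; presumably the new checkpoint stream path needs getPos() on Flink's FSDataOutputStream abstraction, though the commit does not say so explicitly. The override is a one-line delegate:

    @Override
    public long getPos() throws IOException {
        return fdos.getPos();  // position reported by the underlying Hadoop stream
    }
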