flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [6/8] flink git commit: [FLINK-4844] Partitionable Raw Keyed/Operator State
Date Thu, 20 Oct 2016 14:15:24 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
index 3e2d713..1ad41ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStateHandle.java
@@ -38,8 +38,8 @@ public class OperatorStateHandle implements StreamStateHandle {
 	private final StreamStateHandle delegateStateHandle;
 
 	public OperatorStateHandle(
-			StreamStateHandle delegateStateHandle,
-			Map<String, long[]> stateNameToPartitionOffsets) {
+			Map<String, long[]> stateNameToPartitionOffsets,
+			StreamStateHandle delegateStateHandle) {
 
 		this.delegateStateHandle = Preconditions.checkNotNull(delegateStateHandle);
 		this.stateNameToPartitionOffsets = Preconditions.checkNotNull(stateNameToPartitionOffsets);

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableCheckpointStateOutputStream.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableCheckpointStateOutputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableCheckpointStateOutputStream.java
deleted file mode 100644
index 065f9c2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionableCheckpointStateOutputStream.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.util.Preconditions;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-
-public class PartitionableCheckpointStateOutputStream extends FSDataOutputStream {
-
-	private final Map<String, long[]> stateNameToPartitionOffsets;
-	private final CheckpointStreamFactory.CheckpointStateOutputStream delegate;
-
-	public PartitionableCheckpointStateOutputStream(CheckpointStreamFactory.CheckpointStateOutputStream delegate) {
-		this.delegate = Preconditions.checkNotNull(delegate);
-		this.stateNameToPartitionOffsets = new HashMap<>();
-	}
-
-	@Override
-	public long getPos() throws IOException {
-		return delegate.getPos();
-	}
-
-	@Override
-	public void flush() throws IOException {
-		delegate.flush();
-	}
-
-	@Override
-	public void sync() throws IOException {
-		delegate.sync();
-	}
-
-	@Override
-	public void write(int b) throws IOException {
-		delegate.write(b);
-	}
-
-	@Override
-	public void write(byte[] b) throws IOException {
-		delegate.write(b);
-	}
-
-	@Override
-	public void write(byte[] b, int off, int len) throws IOException {
-		delegate.write(b, off, len);
-	}
-
-	@Override
-	public void close() throws IOException {
-		delegate.close();
-	}
-
-	public OperatorStateHandle closeAndGetHandle() throws IOException {
-		StreamStateHandle streamStateHandle = delegate.closeAndGetHandle();
-		return new OperatorStateHandle(streamStateHandle, stateNameToPartitionOffsets);
-	}
-
-	public void startNewPartition(String stateName) throws IOException {
-		long[] offs = stateNameToPartitionOffsets.get(stateName);
-		if (offs == null) {
-			offs = new long[1];
-		} else {
-			//TODO maybe we can use some primitive array list here instead of an array to avoid resize on each call.
-			offs = Arrays.copyOf(offs, offs.length + 1);
-		}
-
-		offs[offs.length - 1] = getPos();
-		stateNameToPartitionOffsets.put(stateName, offs);
-	}
-
-	public static PartitionableCheckpointStateOutputStream wrap(
-			CheckpointStreamFactory.CheckpointStateOutputStream stream) {
-		return new PartitionableCheckpointStateOutputStream(stream);
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotProvider.java
deleted file mode 100644
index c47fedd..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotProvider.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import java.util.concurrent.RunnableFuture;
-
-/**
- * Interface for operations that can perform snapshots of their state.
- *
- * @param <S> Generic type of the state object that is created as handle to snapshots.
- */
-public interface SnapshotProvider<S extends StateObject> {
-
-	/**
-	 * Operation that writes a snapshot into a stream that is provided by the given {@link CheckpointStreamFactory} and
-	 * returns a @{@link RunnableFuture} that gives a state handle to the snapshot. It is up to the implementation if
-	 * the operation is performed synchronous or asynchronous. In the later case, the returned Runnable must be executed
-	 * first before obtaining the handle.
-	 *
-	 * @param checkpointId  The ID of the checkpoint.
-	 * @param timestamp     The timestamp of the checkpoint.
-	 * @param streamFactory The factory that we can use for writing our state to streams.
-	 * @return A runnable future that will yield a {@link StateObject}.
-	 */
-	RunnableFuture<S> snapshot(
-			long checkpointId,
-			long timestamp,
-			CheckpointStreamFactory streamFactory) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
new file mode 100644
index 0000000..2aa282d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Interface for components that can write snapshots of their state to checkpoint streams.
+ *
+ * @param <S> Generic type of the state object that is returned as handle to a snapshot.
+ */
+public interface Snapshotable<S extends StateObject> {
+
+	/**
+	 * Writes a snapshot into a stream that is provided by the given {@link CheckpointStreamFactory} and returns a
+	 * {@link RunnableFuture} that gives a state handle to the snapshot. It is up to the implementation whether the
+	 * operation is performed synchronously or asynchronously. In the latter case, the returned future must be run
+	 * before the handle can be obtained from it.
+	 *
+	 * @param checkpointId  The ID of the checkpoint.
+	 * @param timestamp     The timestamp of the checkpoint.
+	 * @param streamFactory The factory used to create the streams that the state is written to.
+	 * @return A runnable future that will yield a {@link StateObject} handle to the snapshot.
+	 */
+	RunnableFuture<S> snapshot(
+			long checkpointId,
+			long timestamp,
+			CheckpointStreamFactory streamFactory) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContext.java
new file mode 100644
index 0000000..a066739
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContext.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * This interface provides a context in which operators can initialize by registering managed state (i.e. state that
+ * is managed by state backends) or by iterating over streams of state partitions written as raw state in a previous
+ * snapshot.
+ *
+ * <p>
+ * As with managed state (see {@link ManagedInitializationContext}), raw operator state is
+ * available to all operators, while raw keyed state is only available for operators after keyBy.
+ *
+ * <p>
+ * For the purpose of initialization, the context signals if all state is empty (new operator) or if any state was
+ * restored from a previous execution of this operator.
+ *
+ */
+@PublicEvolving
+public interface StateInitializationContext extends FunctionInitializationContext {
+
+	/**
+	 * Returns an iterable to obtain input streams for previously stored raw operator state partitions that are
+	 * assigned to this operator.
+	 */
+	Iterable<StatePartitionStreamProvider> getRawOperatorStateInputs();
+
+	/**
+	 * Returns an iterable to obtain input streams for previously stored raw keyed state partitions that are assigned
+	 * to this operator. Raw keyed state is only available for keyed operators (i.e. after keyBy).
+	 */
+	Iterable<KeyGroupStatePartitionStreamProvider> getRawKeyedStateInputs();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
new file mode 100644
index 0000000..8fbde05
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.OperatorStateStore;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * Default implementation of {@link StateInitializationContext} that opens raw state streams lazily.
+ */
+public class StateInitializationContextImpl implements StateInitializationContext {
+
+	/** Closable registry to participate in the operator's cancel/close methods */
+	private final ClosableRegistry closableRegistry;
+
+	/** Signal whether any state to restore was found */
+	private final boolean restored;
+
+	private final OperatorStateStore operatorStateStore; // returned by getManagedOperatorStateStore()
+	private final Collection<OperatorStateHandle> operatorStateHandles; // raw operator state; null is treated as empty
+
+	private final KeyedStateStore keyedStateStore; // null for non-keyed operators
+	private final Collection<KeyGroupsStateHandle> keyGroupsStateHandles; // raw keyed state; null is treated as empty
+
+	private final Iterable<KeyGroupStatePartitionStreamProvider> keyedStateIterable; // null iff keyGroupsStateHandles is null
+
+	public StateInitializationContextImpl(
+			boolean restored,
+			OperatorStateStore operatorStateStore,
+			KeyedStateStore keyedStateStore,
+			Collection<KeyGroupsStateHandle> keyGroupsStateHandles,
+			Collection<OperatorStateHandle> operatorStateHandles,
+			ClosableRegistry closableRegistry) {
+
+		this.restored = restored;
+		this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
+		this.operatorStateStore = operatorStateStore;
+		this.keyedStateStore = keyedStateStore;
+		this.operatorStateHandles = operatorStateHandles;
+		this.keyGroupsStateHandles = keyGroupsStateHandles;
+
+		this.keyedStateIterable = keyGroupsStateHandles == null ?
+				null
+				: new Iterable<KeyGroupStatePartitionStreamProvider>() {
+			@Override
+			public Iterator<KeyGroupStatePartitionStreamProvider> iterator() {
+				return new KeyGroupStreamIterator(getKeyGroupsStateHandles().iterator(), getClosableRegistry());
+			}
+		};
+	}
+
+	@Override
+	public boolean isRestored() {
+		return restored;
+	}
+
+	public Collection<OperatorStateHandle> getOperatorStateHandles() {
+		return operatorStateHandles;
+	}
+
+	public Collection<KeyGroupsStateHandle> getKeyGroupsStateHandles() {
+		return keyGroupsStateHandles;
+	}
+
+	public ClosableRegistry getClosableRegistry() {
+		return closableRegistry;
+	}
+
+	@Override
+	public Iterable<StatePartitionStreamProvider> getRawOperatorStateInputs() {
+		if (null != operatorStateHandles) {
+			return new Iterable<StatePartitionStreamProvider>() {
+				@Override
+				public Iterator<StatePartitionStreamProvider> iterator() {
+					return new OperatorStateStreamIterator(
+							DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME,
+							getOperatorStateHandles().iterator(), getClosableRegistry());
+				}
+			};
+		} else {
+			return Collections.emptyList();
+		}
+	}
+
+	@Override
+	public Iterable<KeyGroupStatePartitionStreamProvider> getRawKeyedStateInputs() {
+		if (null == keyedStateStore) {
+			throw new IllegalStateException("Attempt to access keyed state from non-keyed operator.");
+		}
+
+		if (null != keyGroupsStateHandles) {
+			return keyedStateIterable;
+		} else {
+			return Collections.emptyList();
+		}
+	}
+
+	@Override
+	public OperatorStateStore getManagedOperatorStateStore() {
+		return operatorStateStore;
+	}
+
+	@Override
+	public KeyedStateStore getManagedKeyedStateStore() {
+		return keyedStateStore;
+	}
+
+	public void close() {
+		IOUtils.closeQuietly(closableRegistry);
+	}
+
+	private static class KeyGroupStreamIterator implements Iterator<KeyGroupStatePartitionStreamProvider> {
+		// Yields one provider per (key group, offset) pair of every handle; each handle's stream is opened lazily.
+		private final Iterator<KeyGroupsStateHandle> stateHandleIterator;
+		private final ClosableRegistry closableRegistry;
+
+		private KeyGroupsStateHandle currentStateHandle;
+		private FSDataInputStream currentStream; // null until next() opens it for currentStateHandle
+		private Iterator<Tuple2<Integer, Long>> currentOffsetsIterator; // null until a non-empty handle is found
+
+		public KeyGroupStreamIterator(
+				Iterator<KeyGroupsStateHandle> stateHandleIterator, ClosableRegistry closableRegistry) {
+
+			this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator);
+			this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
+		}
+
+		@Override
+		public boolean hasNext() {
+			if (null != currentOffsetsIterator && currentOffsetsIterator.hasNext()) {
+				return true;
+			} else { // advance to the next handle that contains at least one key group
+				while (stateHandleIterator.hasNext()) {
+					currentStateHandle = stateHandleIterator.next();
+					if (currentStateHandle.getNumberOfKeyGroups() > 0) {
+						currentOffsetsIterator = currentStateHandle.getGroupRangeOffsets().iterator();
+						closableRegistry.unregisterClosable(currentStream);
+						IOUtils.closeQuietly(currentStream);
+						currentStream = null;
+						return true;
+					}
+				}
+				return false;
+			}
+		}
+
+		private void openStream() throws IOException {
+			FSDataInputStream stream = currentStateHandle.openInputStream();
+			closableRegistry.registerClosable(stream);
+			currentStream = stream;
+		}
+
+		@Override
+		public KeyGroupStatePartitionStreamProvider next() {
+			Tuple2<Integer, Long> keyGroupOffset = currentOffsetsIterator.next();
+			try {
+				if (null == currentStream) {
+					openStream();
+				}
+				currentStream.seek(keyGroupOffset.f1); // f1 is the stream offset of key group f0
+				return new KeyGroupStatePartitionStreamProvider(currentStream, keyGroupOffset.f0);
+			} catch (IOException ioex) { // hand the failure to the provider instead of throwing here
+				return new KeyGroupStatePartitionStreamProvider(ioex, keyGroupOffset.f0);
+			}
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("Read only Iterator");
+		}
+	}
+
+	private static class OperatorStateStreamIterator implements Iterator<StatePartitionStreamProvider> {
+		// Yields one provider per partition offset of stateName in every handle; each handle's stream is opened lazily.
+		private final String stateName; // TODO: only a single named raw state is supported, so this could be dropped
+
+		private final Iterator<OperatorStateHandle> stateHandleIterator;
+		private final ClosableRegistry closableRegistry;
+
+		private OperatorStateHandle currentStateHandle;
+		private FSDataInputStream currentStream; // null until next() opens it for currentStateHandle
+		private long[] offsets; // partition offsets in the current handle; null until a handle with offsets is found
+		private int offPos; // index into offsets of the next partition to return
+
+		public OperatorStateStreamIterator(
+				String stateName,
+				Iterator<OperatorStateHandle> stateHandleIterator,
+				ClosableRegistry closableRegistry) {
+
+			this.stateName = Preconditions.checkNotNull(stateName);
+			this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator);
+			this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
+		}
+
+		@Override
+		public boolean hasNext() {
+			if (null != offsets && offPos < offsets.length) {
+				return true;
+			} else { // advance to the next handle that has partitions for stateName
+				while (stateHandleIterator.hasNext()) {
+					currentStateHandle = stateHandleIterator.next();
+					long[] handleOffsets = currentStateHandle.getStateNameToPartitionOffsets().get(stateName);
+					if (null != handleOffsets && handleOffsets.length > 0) {
+						// remember this handle's offsets; its stream is opened lazily in next()
+						this.offsets = handleOffsets;
+						this.offPos = 0;
+						// release the previous handle's stream, if any
+						closableRegistry.unregisterClosable(currentStream);
+						IOUtils.closeQuietly(currentStream);
+						currentStream = null;
+
+						return true;
+					}
+				}
+				return false;
+			}
+		}
+
+		private void openStream() throws IOException {
+			FSDataInputStream stream = currentStateHandle.openInputStream();
+			closableRegistry.registerClosable(stream);
+			currentStream = stream;
+		}
+
+		@Override
+		public StatePartitionStreamProvider next() {
+			long offset = offsets[offPos++];
+			try {
+				if (null == currentStream) {
+					openStream();
+				}
+				currentStream.seek(offset);
+
+				return new StatePartitionStreamProvider(currentStream);
+			} catch (IOException ioex) { // hand the failure to the provider instead of throwing here
+				return new StatePartitionStreamProvider(ioex);
+			}
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("Read only Iterator");
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java
new file mode 100644
index 0000000..8b07da8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StatePartitionStreamProvider.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.runtime.util.NonClosingStreamDecorator;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * This class provides access to input streams that contain data of one state partition of a partitionable state.
+ *
+ * <p>TODO: use bounded streams that fail fast if the partition limit is exceeded on corrupted reads.
+ */
+@PublicEvolving
+public class StatePartitionStreamProvider {
+
+	/** A ready-made stream that contains data for one state partition; null if creating the stream failed */
+	private final InputStream stream;
+
+	/** Holds the exception that happened when trying to create the stream; null if creation succeeded */
+	private final IOException creationException;
+
+	public StatePartitionStreamProvider(IOException creationException) {
+		this.creationException = Preconditions.checkNotNull(creationException);
+		this.stream = null;
+	}
+
+	public StatePartitionStreamProvider(InputStream stream) {
+		this.stream = new NonClosingStreamDecorator(Preconditions.checkNotNull(stream)); // presumably keeps consumers from closing the shared stream - verify
+		this.creationException = null;
+	}
+
+
+	/**
+	 * Returns a stream with the data of one state partition, or throws an IOException wrapping the creation failure.
+	 */
+	public InputStream getStream() throws IOException {
+		if (creationException != null) {
+			throw new IOException(creationException);
+		}
+		return stream;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java
new file mode 100644
index 0000000..4dbbeaf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContext.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * This interface provides a context in which operators that use managed (i.e. state that is managed by state
+ * backends) or raw (i.e. the operator can write its own state streams) state can perform a snapshot.
+ */
+@PublicEvolving
+public interface StateSnapshotContext extends FunctionSnapshotContext {
+
+	/**
+	 * Returns an output stream for raw keyed state.
+	 */
+	KeyedStateCheckpointOutputStream getRawKeyedOperatorStateOutput() throws Exception;
+
+	/**
+	 * Returns an output stream for raw operator state.
+	 */
+	OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Exception;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
new file mode 100644
index 0000000..d632529
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Default implementation of {@link StateSnapshotContext} that writes raw state synchronously.
+ */
+public class StateSnapshotContextSynchronousImpl implements StateSnapshotContext {
+	
+	private final long checkpointId;
+	private final long checkpointTimestamp;
+	
+	/** Factory for the checkpointing streams; null when created through the testing constructor */
+	private final CheckpointStreamFactory streamFactory;
+	
+	/** Key group range of the operator that created this context; EMPTY_KEY_GROUP_RANGE for non-keyed operators */
+	private final KeyGroupRange keyGroupRange;
+
+	/**
+	 * Registry for opened streams to participate in the lifecycle of the stream task. Hence, this registry should be
+	 * obtained from and managed by the stream task.
+	 */
+	private final ClosableRegistry closableRegistry;
+
+	private KeyedStateCheckpointOutputStream keyedStateCheckpointOutputStream; // opened on first request
+	private OperatorStateCheckpointOutputStream operatorStateCheckpointOutputStream; // opened on first request
+
+	@VisibleForTesting
+	public StateSnapshotContextSynchronousImpl(long checkpointId, long checkpointTimestamp) {
+		this.checkpointId = checkpointId;
+		this.checkpointTimestamp = checkpointTimestamp;
+		this.streamFactory = null; // raw state outputs cannot be opened through this constructor
+		this.keyGroupRange = KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
+		this.closableRegistry = null;
+	}
+
+
+	public StateSnapshotContextSynchronousImpl(
+			long checkpointId,
+			long checkpointTimestamp,
+			CheckpointStreamFactory streamFactory,
+			KeyGroupRange keyGroupRange,
+			ClosableRegistry closableRegistry) {
+
+		this.checkpointId = checkpointId;
+		this.checkpointTimestamp = checkpointTimestamp;
+		this.streamFactory = Preconditions.checkNotNull(streamFactory);
+		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
+		this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
+	}
+
+	@Override
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	@Override
+	public long getCheckpointTimestamp() {
+		return checkpointTimestamp;
+	}
+
+	private CheckpointStreamFactory.CheckpointStateOutputStream openAndRegisterNewStream() throws Exception {
+		CheckpointStreamFactory.CheckpointStateOutputStream cout =
+				streamFactory.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
+
+		closableRegistry.registerClosable(cout); // ties the stream to the stream task's lifecycle
+		return cout;
+	}
+
+	@Override
+	public KeyedStateCheckpointOutputStream getRawKeyedOperatorStateOutput() throws Exception {
+		if (null == keyedStateCheckpointOutputStream) {
+			Preconditions.checkState(keyGroupRange != KeyGroupRange.EMPTY_KEY_GROUP_RANGE, "Not a keyed operator");
+			keyedStateCheckpointOutputStream = new KeyedStateCheckpointOutputStream(openAndRegisterNewStream(), keyGroupRange);
+		}
+		return keyedStateCheckpointOutputStream;
+	}
+
+	@Override
+	public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Exception {
+		if (null == operatorStateCheckpointOutputStream) {
+			operatorStateCheckpointOutputStream = new OperatorStateCheckpointOutputStream(openAndRegisterNewStream());
+		}
+		return operatorStateCheckpointOutputStream;
+	}
+
+	public RunnableFuture<KeyGroupsStateHandle> getKeyedStateStreamFuture() throws IOException {
+		return closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream);
+	}
+
+	public RunnableFuture<OperatorStateHandle> getOperatorStateStreamFuture() throws IOException {
+		return closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream);
+	}
+
+	private <T extends StreamStateHandle> RunnableFuture<T> closeAndUnregisterStreamToObtainStateHandle(
+			NonClosingCheckpointOutputStream<T> stream) throws IOException {
+		if (null == stream) { // the corresponding raw state output was never requested
+			return null;
+		}
+
+		closableRegistry.unregisterClosable(stream.getDelegate()); // from here on, closeAndGetHandle() closes it
+
+		// for now we only support synchronous writing
+		return new DoneFuture<>(stream.closeAndGetHandle());
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
new file mode 100644
index 0000000..ecd6399
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.util.CollectionUtil;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class encapsulates all state handles for a task.
+ */
+public class TaskStateHandles implements Serializable {
+
+	public static final TaskStateHandles EMPTY = new TaskStateHandles();
+
+	private static final long serialVersionUID = 267686583583579359L;
+
+	/** State handle with the (non-partitionable) legacy operator state*/
+	@Deprecated
+	private final ChainedStateHandle<StreamStateHandle> legacyOperatorState;
+
+	/** Collection of handles which represent the managed keyed state of the head operator */
+	private final Collection<KeyGroupsStateHandle> managedKeyedState;
+
+	/** Collection of handles which represent the raw/streamed keyed state of the head operator */
+	private final Collection<KeyGroupsStateHandle> rawKeyedState;
+
+	/** Outer list represents the operator chain, each collection holds handles for managed state of a single operator */
+	private final List<Collection<OperatorStateHandle>> managedOperatorState;
+
+	/** Outer list represents the operator chain, each collection holds handles for raw/streamed state of a single operator */
+	private final List<Collection<OperatorStateHandle>> rawOperatorState;
+
+	public TaskStateHandles() {
+		this(null, null, null, null, null);
+	}
+
+	public TaskStateHandles(SubtaskState checkpointStateHandles) {
+		this(checkpointStateHandles.getLegacyOperatorState(),
+				transform(checkpointStateHandles.getManagedOperatorState()),
+				transform(checkpointStateHandles.getRawOperatorState()),
+				transform(checkpointStateHandles.getManagedKeyedState()),
+				transform(checkpointStateHandles.getRawKeyedState()));
+	}
+
+	public TaskStateHandles(
+			ChainedStateHandle<StreamStateHandle> legacyOperatorState,
+			List<Collection<OperatorStateHandle>> managedOperatorState,
+			List<Collection<OperatorStateHandle>> rawOperatorState,
+			Collection<KeyGroupsStateHandle> managedKeyedState,
+			Collection<KeyGroupsStateHandle> rawKeyedState) {
+
+		this.legacyOperatorState = legacyOperatorState;
+		this.managedKeyedState = managedKeyedState;
+		this.rawKeyedState = rawKeyedState;
+		this.managedOperatorState = managedOperatorState;
+		this.rawOperatorState = rawOperatorState;
+	}
+
+	@Deprecated
+	public ChainedStateHandle<StreamStateHandle> getLegacyOperatorState() {
+		return legacyOperatorState;
+	}
+
+	public Collection<KeyGroupsStateHandle> getManagedKeyedState() {
+		return managedKeyedState;
+	}
+
+	public Collection<KeyGroupsStateHandle> getRawKeyedState() {
+		return rawKeyedState;
+	}
+
+	public List<Collection<OperatorStateHandle>> getRawOperatorState() {
+		return rawOperatorState;
+	}
+
+	public List<Collection<OperatorStateHandle>> getManagedOperatorState() {
+		return managedOperatorState;
+	}
+
+	public boolean hasState() {
+		return !ChainedStateHandle.isNullOrEmpty(legacyOperatorState)
+				|| !CollectionUtil.isNullOrEmpty(managedKeyedState)
+				|| !CollectionUtil.isNullOrEmpty(rawKeyedState)
+				|| !CollectionUtil.isNullOrEmpty(rawOperatorState)
+				|| !CollectionUtil.isNullOrEmpty(managedOperatorState);
+	}
+
+	private static List<Collection<OperatorStateHandle>> transform(ChainedStateHandle<OperatorStateHandle> in) {
+		if (null == in) {
+			return Collections.emptyList();
+		}
+		List<Collection<OperatorStateHandle>> out = new ArrayList<>(in.getLength());
+		for (int i = 0; i < in.getLength(); ++i) {
+			OperatorStateHandle osh = in.get(i);
+			out.add(osh != null ? Collections.singletonList(osh) : null);
+		}
+		return out;
+	}
+
+	private static List<KeyGroupsStateHandle> transform(KeyGroupsStateHandle in) {
+		return in == null ? Collections.<KeyGroupsStateHandle>emptyList() : Collections.singletonList(in);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		TaskStateHandles that = (TaskStateHandles) o;
+
+		if (legacyOperatorState != null ?
+				!legacyOperatorState.equals(that.legacyOperatorState)
+				: that.legacyOperatorState != null) {
+			return false;
+		}
+		if (managedKeyedState != null ?
+				!managedKeyedState.equals(that.managedKeyedState)
+				: that.managedKeyedState != null) {
+			return false;
+		}
+		if (rawKeyedState != null ?
+				!rawKeyedState.equals(that.rawKeyedState)
+				: that.rawKeyedState != null) {
+			return false;
+		}
+
+		if (rawOperatorState != null ?
+				!rawOperatorState.equals(that.rawOperatorState)
+				: that.rawOperatorState != null) {
+			return false;
+		}
+		return managedOperatorState != null ?
+				managedOperatorState.equals(that.managedOperatorState)
+				: that.managedOperatorState == null;
+	}
+
+	@Override
+	public int hashCode() {
+		int result = legacyOperatorState != null ? legacyOperatorState.hashCode() : 0;
+		result = 31 * result + (managedKeyedState != null ? managedKeyedState.hashCode() : 0);
+		result = 31 * result + (rawKeyedState != null ? rawKeyedState.hashCode() : 0);
+		result = 31 * result + (managedOperatorState != null ? managedOperatorState.hashCode() : 0);
+		result = 31 * result + (rawOperatorState != null ? rawOperatorState.hashCode() : 0);
+		return result;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
new file mode 100644
index 0000000..71026c6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/UserFacingListState.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.state.ListState;
+
+import java.util.Collections;
+
+/**
+ * List state adapter for user code: reading never yields {@code null}. If the wrapped state
+ * returns {@code null} from {@code get()}, an empty iterable is handed out instead.
+ *
+ * @param <T> The type of elements in the list state.
+ */
+class UserFacingListState<T> implements ListState<T> {
+
+	/** The wrapped state to which all calls are forwarded. */
+	private final ListState<T> wrapped;
+
+	/** Returned from {@link #get()} whenever the wrapped state returns {@code null}. */
+	private final Iterable<T> empty = Collections.emptyList();
+
+	UserFacingListState(ListState<T> originalState) {
+		this.wrapped = originalState;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Iterable<T> get() throws Exception {
+		final Iterable<T> stored = wrapped.get();
+		if (stored == null) {
+			return empty;
+		}
+		return stored;
+	}
+
+	@Override
+	public void add(T value) throws Exception {
+		wrapped.add(value);
+	}
+
+	@Override
+	public void clear() {
+		wrapped.clear();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index e027632..4e15cd5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.List;
+import java.util.Collection;
 
 /**
  * The file state backend is a state backend that stores the state of streaming jobs in a file system.
@@ -199,7 +199,7 @@ public class FsStateBackend extends AbstractStateBackend {
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			List<KeyGroupsStateHandle> restoredState,
+			Collection<KeyGroupsStateHandle> restoredState,
 			TaskKvStateRegistry kvStateRegistry) throws Exception {
 		return new HeapKeyedStateBackend<>(
 				kvStateRegistry,

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index b283494..56be46f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -50,8 +50,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.RunnableFuture;
 
@@ -94,7 +94,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 			ClassLoader userCodeClassLoader,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			List<KeyGroupsStateHandle> restoredState) throws Exception {
+			Collection<KeyGroupsStateHandle> restoredState) throws Exception {
 		super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange);
 
 		LOG.info("Initializing heap keyed state backend from snapshot.");
@@ -248,7 +248,7 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	}
 
 	@SuppressWarnings({"unchecked"})
-	private void restorePartitionedState(List<KeyGroupsStateHandle> state) throws Exception {
+	private void restorePartitionedState(Collection<KeyGroupsStateHandle> state) throws Exception {
 
 		int numRegisteredKvStates = 0;
 		Map<Integer, String> kvStatesById = new HashMap<>();
@@ -259,13 +259,10 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 				continue;
 			}
 
-			FSDataInputStream fsDataInputStream = null;
+			FSDataInputStream fsDataInputStream = keyGroupsHandle.openInputStream();
+			cancelStreamRegistry.registerClosable(fsDataInputStream);
 
 			try {
-
-				fsDataInputStream = keyGroupsHandle.openInputStream();
-				cancelStreamRegistry.registerClosable(fsDataInputStream);
-
 				DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(fsDataInputStream);
 
 				int numKvStates = inView.readShort();

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
index 028f8c8..30de638 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemCheckpointStreamFactory.java
@@ -127,6 +127,10 @@ public class MemCheckpointStreamFactory implements CheckpointStreamFactory {
 			return os.getPosition();
 		}
 
+		/**
+		 * Returns the current value of this stream's {@code closed} flag.
+		 *
+		 * @return {@code true} if the flag is set.
+		 */
+		public boolean isClosed() {
+			final boolean isCurrentlyClosed = this.closed;
+			return isCurrentlyClosed;
+		}
+
 		/**
 		 * Closes the stream and returns the byte array containing the stream's data.
 		 * @return The byte array containing the stream's data.

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 1772dbe..33f03ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.Collection;
 
 /**
  * A {@link AbstractStateBackend} that stores all its data and checkpoints in memory and has no
@@ -100,7 +100,7 @@ public class MemoryStateBackend extends AbstractStateBackend {
 			TypeSerializer<K> keySerializer,
 			int numberOfKeyGroups,
 			KeyGroupRange keyGroupRange,
-			List<KeyGroupsStateHandle> restoredState,
+			Collection<KeyGroupsStateHandle> restoredState,
 			TaskKvStateRegistry kvStateRegistry) throws Exception {
 
 		return new HeapKeyedStateBackend<>(

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
index 6f1bf7b..38defcc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
@@ -20,11 +20,11 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
-import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -43,7 +43,7 @@ public class ActorGatewayCheckpointResponder implements CheckpointResponder {
 			JobID jobID,
 			ExecutionAttemptID executionAttemptID,
 			CheckpointMetaData checkpointMetaData,
-			CheckpointStateHandles checkpointStateHandles) {
+			SubtaskState checkpointStateHandles) {
 
 		AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
 				jobID, executionAttemptID, checkpointMetaData,

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
index 4fa20e6..7dbb76c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.state.CheckpointStateHandles;
 
 /**
  * Responder for checkpoint acknowledge and decline messages in the {@link Task}.
@@ -35,7 +35,7 @@ public interface CheckpointResponder {
 	 *             Job ID of the running job
 	 * @param executionAttemptID
 	 *             Execution attempt ID of the running task
-	 * @param checkpointStateHandles
+	 * @param subtaskState
 	 *             State handles for the checkpoint
 	 * @param checkpointMetaData
 	 *             Meta data for this checkpoint
@@ -45,7 +45,7 @@ public interface CheckpointResponder {
 		JobID jobID,
 		ExecutionAttemptID executionAttemptID,
 		CheckpointMetaData checkpointMetaData,
-		CheckpointStateHandles checkpointStateHandles);
+		SubtaskState subtaskState);
 
 	/**
 	 * Declines the given checkpoint.

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index f6720e7..fa69a60 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -26,6 +26,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -36,7 +37,6 @@ import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.CheckpointStateHandles;
 
 import java.util.Map;
 import java.util.concurrent.Future;
@@ -245,7 +245,7 @@ public class RuntimeEnvironment implements Environment {
 	@Override
 	public void acknowledgeCheckpoint(
 			CheckpointMetaData checkpointMetaData,
-			CheckpointStateHandles checkpointStateHandles) {
+			SubtaskState checkpointStateHandles) {
 
 
 		checkpointResponder.acknowledgeCheckpoint(

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 02a41b5..bd522bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -25,16 +25,11 @@ import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.io.network.PartitionState;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -46,23 +41,24 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-
+import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
@@ -70,7 +66,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URL;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -233,18 +228,10 @@ public class Task implements Runnable, TaskActions {
 	private volatile ExecutorService asyncCallDispatcher;
 
 	/**
-	 * The handle to the chained operator state that the task was initialized with. Will be set
+	 * The handles to the states that the task was initialized with. Will be set
 	 * to null after the initialization, to be memory friendly.
 	 */
-	private volatile ChainedStateHandle<StreamStateHandle> chainedOperatorState;
-
-	/**
-	 * The handle to the key group state that the task was initialized with. Will be set
-	 * to null after the initialization, to be memory friendly.
-	 */
-	private volatile List<KeyGroupsStateHandle> keyGroupStates;
-
-	private volatile List<Collection<OperatorStateHandle>> partitionableOperatorState;
+	private volatile TaskStateHandles taskStateHandles;
 
 	/** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
 	private long taskCancellationInterval;
@@ -280,10 +267,8 @@ public class Task implements Runnable, TaskActions {
 		this.requiredJarFiles = checkNotNull(tdd.getRequiredJarFiles());
 		this.requiredClasspaths = checkNotNull(tdd.getRequiredClasspaths());
 		this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName());
-		this.chainedOperatorState = tdd.getOperatorState();
 		this.serializedExecutionConfig = checkNotNull(tdd.getSerializedExecutionConfig());
-		this.keyGroupStates = tdd.getKeyGroupState();
-		this.partitionableOperatorState = tdd.getPartitionableOperatorState();
+		this.taskStateHandles = tdd.getTaskStateHandles();
 
 		this.taskCancellationInterval = jobConfiguration.getLong(
 			ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
@@ -570,20 +555,19 @@ public class Task implements Runnable, TaskActions {
 			// the state into the task. the state is non-empty if this is an execution
 			// of a task that failed but had backuped state from a checkpoint
 
-			if (chainedOperatorState != null || keyGroupStates != null || partitionableOperatorState != null) {
+			if (null != taskStateHandles) {
 				if (invokable instanceof StatefulTask) {
 					StatefulTask op = (StatefulTask) invokable;
-					op.setInitialState(chainedOperatorState, keyGroupStates, partitionableOperatorState);
+					op.setInitialState(taskStateHandles);
 				} else {
 					throw new IllegalStateException("Found operator state for a non-stateful task invokable");
 				}
+				// be memory and GC friendly - since the code stays in invoke() for a potentially long time,
+				// we clear the reference to the state handle
+				//noinspection UnusedAssignment
+				taskStateHandles = null;
 			}
 
-			// be memory and GC friendly - since the code stays in invoke() for a potentially long time,
-			// we clear the reference to the state handle
-			//noinspection UnusedAssignment
-			this.chainedOperatorState = null;
-			this.keyGroupStates = null;
 
 			// ----------------------------------------------------------------
 			//  actual task core work

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
index 27d958a..ce52eac 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/IntArrayList.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.util;
 
+import java.util.Arrays;
 import java.util.NoSuchElementException;
 
 /**
@@ -69,6 +70,10 @@ public class IntArrayList {
 		}
 	}
 
+	/**
+	 * Copies the list's elements into a new array.
+	 *
+	 * @return A freshly allocated array of length {@code size} holding the first {@code size} elements.
+	 */
+	public int[] toArray() {
+		final int length = this.size;
+		return Arrays.copyOf(this.array, length);
+	}
+
 	public static final IntArrayList EMPTY = new IntArrayList(0) {
 		
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java
index e653209..f2d9556 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/LongArrayList.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.util;
 
+import java.util.Arrays;
+
 /**
  * Minimal implementation of an array-backed list of longs
  */
@@ -61,6 +63,10 @@ public class LongArrayList {
 	public boolean isEmpty() {
 		return (size==0);
 	}
+
+	/**
+	 * Copies the list's elements into a new array.
+	 *
+	 * @return A freshly allocated array of length {@code size} holding the first {@code size} elements.
+	 */
+	public long[] toArray() {
+		final int length = this.size;
+		return Arrays.copyOf(this.array, length);
+	}
 	
 	private void grow(int length) {
 		if(length > array.length) {

http://git-wip-us.apache.org/repos/asf/flink/blob/cab9cd44/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java
new file mode 100644
index 0000000..ba7bc79
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/NonClosingStreamDecorator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Decorator for input streams that ignores calls to {@link InputStream#close()}.
+ *
+ * <p>Every other operation ({@code read}, {@code skip}, {@code available}, {@code mark},
+ * {@code reset}, {@code markSupported}) is forwarded to the wrapped stream. Apart from not
+ * closing the delegate, the decorator therefore behaves exactly like the stream it wraps.
+ */
+public class NonClosingStreamDecorator extends InputStream {
+
+	/** The wrapped stream; it is never closed through this decorator. */
+	private final InputStream delegate;
+
+	/**
+	 * Creates a decorator around the given stream.
+	 *
+	 * @param delegate The stream to which all calls except {@link #close()} are forwarded.
+	 */
+	public NonClosingStreamDecorator(InputStream delegate) {
+		this.delegate = delegate;
+	}
+
+	@Override
+	public int read() throws IOException {
+		return delegate.read();
+	}
+
+	@Override
+	public int read(byte[] b) throws IOException {
+		return delegate.read(b);
+	}
+
+	@Override
+	public int read(byte[] b, int off, int len) throws IOException {
+		return delegate.read(b, off, len);
+	}
+
+	@Override
+	public long skip(long n) throws IOException {
+		return delegate.skip(n);
+	}
+
+	@Override
+	public int available() throws IOException {
+		// must forward: InputStream#available() always returns 0 and would hide the delegate's bytes
+		return delegate.available();
+	}
+
+	@Override
+	public void close() throws IOException {
+		// intentionally a no-op: the wrapped stream is left open
+	}
+
+	@Override
+	public void mark(int readlimit) {
+		// forward so that mark/reset behave like the wrapped stream (the base class does nothing here)
+		delegate.mark(readlimit);
+	}
+
+	@Override
+	public void reset() throws IOException {
+		// forward: the base class implementation always throws
+		delegate.reset();
+	}
+
+	@Override
+	public boolean markSupported() {
+		// report the wrapped stream's capability instead of the base class's constant false
+		return delegate.markSupported();
+	}
+}
\ No newline at end of file


Mime
View raw message