flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [18/27] flink git commit: [FLINK-4381] Refactor State to Prepare For Key-Group State Backends
Date Wed, 31 Aug 2016 17:28:36 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
new file mode 100644
index 0000000..9b308a3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Handle to the non-partitioned states for the operators in an operator chain.
+ */
+public class ChainedStateHandle<T extends StateObject> implements StateObject {
+
+	private static final long serialVersionUID = 1L;
+
+	/** The state handles for all operators in the chain */
+	private final List<? extends T> operatorStateHandles;
+
+	/**
+	 * Wraps a list to the state handles for the operators in a chain. Individual state handles can be null.
+	 *
+	 * @param operatorStateHandles list with the state handles for the states in the operator chain.
+	 */
+	public ChainedStateHandle(List<? extends T> operatorStateHandles) {
+		this.operatorStateHandles = Preconditions.checkNotNull(operatorStateHandles);
+	}
+
+	/**
+	 * Check if there are any states handles present. Notice that this can be true even if {@link #getLength()} is
+	 * greater than zero, because state handles can be null.
+	 *
+	 * @return true if there are no state handles for any operator.
+	 */
+	public boolean isEmpty() {
+		for (T state : operatorStateHandles) {
+			if (state != null) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	/**
+	 * Returns the length of the operator chain. This can be different from the number of operator state handles,
+	 * because the some operators in the chain can have no state and thus their state handle can be null.
+	 *
+	 * @return length of the operator chain
+	 */
+	public int getLength() {
+		return operatorStateHandles.size();
+	}
+
+	/**
+	 * Get the state handle for a single operator in the operator chain by it's index.
+	 *
+	 * @param index the index in the operator chain
+	 * @return state handle to the operator at the given position in the operator chain. can be null.
+	 */
+	public T get(int index) {
+		return operatorStateHandles.get(index);
+	}
+
+	@Override
+	public void discardState() throws Exception {
+		StateUtil.bestEffortDiscardAllStateObjects(operatorStateHandles);
+	}
+
+	@Override
+	public long getStateSize() throws Exception {
+		long sumStateSize = 0;
+
+		if (operatorStateHandles != null) {
+			for (T state : operatorStateHandles) {
+				if (state != null) {
+					sumStateSize += state.getStateSize();
+				}
+			}
+		}
+
+		// State size as sum of all state sizes
+		return sumStateSize;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		ChainedStateHandle<?> that = (ChainedStateHandle<?>) o;
+
+		return operatorStateHandles.equals(that.operatorStateHandles);
+
+	}
+
+	@Override
+	public int hashCode() {
+		return operatorStateHandles.hashCode();
+	}
+
+	public static <T extends StateObject> ChainedStateHandle<T> wrapSingleHandle(T stateHandleToWrap) {
+		return new ChainedStateHandle<T>(Collections.singletonList(stateHandleToWrap));
+	}
+
+	@Override
+	public void close() throws IOException {
+		StateUtil.bestEffortCloseAllStateObjects(operatorStateHandles);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
index 280746d..9ee4b90 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/HashKeyGroupAssigner.java
@@ -56,11 +56,11 @@ public class HashKeyGroupAssigner<K> implements KeyGroupAssigner<K> {
 	}
 
 	@Override
-	public void setup(int numberKeyGroups) {
-		Preconditions.checkArgument(numberKeyGroups > 0, "The number of key groups has to be " +
+	public void setup(int numberOfKeygroups) {
+		Preconditions.checkArgument(numberOfKeygroups > 0, "The number of key groups has to be " +
 			"greater than 0. Use setMaxParallelism() to specify the number of key " +
 			"groups.");
 
-		this.numberKeyGroups = numberKeyGroups;
+		this.numberKeyGroups = numberOfKeygroups;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
new file mode 100644
index 0000000..de42bdb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRange.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Iterator;
+
+/**
+ * This class defines a range of key-group indexes. Key-groups are the granularity into which the keyspace of a job
+ * is partitioned for keyed state-handling in state backends. The boundaries of the range are inclusive.
+ */
+public class KeyGroupRange implements Iterable<Integer>, Serializable {
+
+	/** The empty key-group */
+	public static final KeyGroupRange EMPTY_KEY_GROUP = new KeyGroupRange();
+
+	private final int startKeyGroup;
+	private final int endKeyGroup;
+
+	/**
+	 * Empty KeyGroup Constructor
+	 */
+	private KeyGroupRange() {
+		this.startKeyGroup = 0;
+		this.endKeyGroup = -1;
+	}
+
+	/**
+	 * Defines the range [startKeyGroup, endKeyGroup]
+	 *
+	 * @param startKeyGroup start of the range (inclusive)
+	 * @param endKeyGroup end of the range (inclusive)
+	 */
+	public KeyGroupRange(int startKeyGroup, int endKeyGroup) {
+		Preconditions.checkArgument(startKeyGroup >= 0);
+		Preconditions.checkArgument(startKeyGroup <= endKeyGroup);
+		this.startKeyGroup = startKeyGroup;
+		this.endKeyGroup = endKeyGroup;
+		Preconditions.checkArgument(getNumberOfKeyGroups() >= 0, "Potential overflow detected.");
+	}
+
+
+	/**
+	 * Checks whether or not a single key-group is contained in the range.
+	 *
+	 * @param keyGroup Key-group to check for inclusion.
+	 * @return True, only if the key-group is in the range.
+	 */
+	public boolean contains(int keyGroup) {
+		return keyGroup >= startKeyGroup && keyGroup <= endKeyGroup;
+	}
+
+	/**
+	 * Create a range that represent the intersection between this range and the given range.
+	 *
+	 * @param other A KeyGroupRange to intersect.
+	 * @return Key-group range that is the intersection between this and the given key-group range.
+	 */
+	public KeyGroupRange getIntersection(KeyGroupRange other) {
+		int start = Math.max(startKeyGroup, other.startKeyGroup);
+		int end = Math.min(endKeyGroup, other.endKeyGroup);
+		return start <= end ? new KeyGroupRange(start, end) : EMPTY_KEY_GROUP;
+	}
+
+	/**
+	 *
+	 * @return The number of key-groups in the range
+	 */
+	public int getNumberOfKeyGroups() {
+		return 1 + endKeyGroup - startKeyGroup;
+	}
+
+	/**
+	 *
+	 * @return The first key-group in the range.
+	 */
+	public int getStartKeyGroup() {
+		return startKeyGroup;
+	}
+
+	/**
+	 *
+	 * @return The last key-group in the range.
+	 */
+	public int getEndKeyGroup() {
+		return endKeyGroup;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (!(o instanceof KeyGroupRange)) {
+			return false;
+		}
+
+		KeyGroupRange that = (KeyGroupRange) o;
+		return startKeyGroup == that.startKeyGroup && endKeyGroup == that.endKeyGroup;
+	}
+
+
+	@Override
+	public int hashCode() {
+		int result = startKeyGroup;
+		result = 31 * result + endKeyGroup;
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "KeyGroupRange{" +
+				"startKeyGroup=" + startKeyGroup +
+				", endKeyGroup=" + endKeyGroup +
+				'}';
+	}
+
+	@Override
+	public Iterator<Integer> iterator() {
+		return new KeyGroupIterator();
+	}
+
+	private final class KeyGroupIterator implements Iterator<Integer> {
+
+		public KeyGroupIterator() {
+			this.iteratorPos = 0;
+		}
+
+		private int iteratorPos;
+
+		@Override
+		public boolean hasNext() {
+			return iteratorPos < getNumberOfKeyGroups();
+		}
+
+		@Override
+		public Integer next() {
+			int rv = startKeyGroup + iteratorPos;
+			++iteratorPos;
+			return rv;
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("Unsupported by this iterator!");
+		}
+	}
+
+	/**
+	 * Factory method that also handles creation of empty key-groups.
+	 *
+	 * @param startKeyGroup start of the range (inclusive)
+	 * @param endKeyGroup end of the range (inclusive)
+	 * @return the key-group from start to end or an empty key-group range.
+	 */
+	public static KeyGroupRange of(int startKeyGroup, int endKeyGroup) {
+		return startKeyGroup <= endKeyGroup ? new KeyGroupRange(startKeyGroup, endKeyGroup) : EMPTY_KEY_GROUP;
+	}
+
+	/**
+	 * Computes the range of key-groups that are assigned to a given operator under the given parallelism and maximum
+	 * parallelism.
+	 *
+	 * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
+	 * to go beyond this boundary, this method must perform arithmetic on long values.
+	 *
+	 * @param maxParallelism Maximal parallelism that the job was initially created with.
+	 * @param parallelism    The current parallelism under which the job runs. Must be <= maxParallelism.
+	 * @param operatorIndex  Id of a key-group. 0 <= keyGroupID < maxParallelism.
+	 * @return
+	 */
+	public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
+			int maxParallelism,
+			int parallelism,
+			int operatorIndex) {
+
+		int start = operatorIndex == 0 ? 0 : ((operatorIndex * maxParallelism - 1) / parallelism) + 1;
+		int end = ((operatorIndex + 1) * maxParallelism - 1) / parallelism;
+		return new KeyGroupRange(start, end);
+	}
+
+	/**
+	 * Computes the index of the operator to which a key-group belongs under the given parallelism and maximum
+	 * parallelism.
+	 *
+	 * IMPORTANT: maxParallelism must be <= Short.MAX_VALUE to avoid rounding problems in this method. If we ever want
+	 * to go beyond this boundary, this method must perform arithmetic on long values.
+	 *
+	 * @param maxParallelism Maximal parallelism that the job was initially created with.
+	 *                       0 < parallelism <= maxParallelism <= Short.MAX_VALUE must hold.
+	 * @param parallelism    The current parallelism under which the job runs. Must be <= maxParallelism.
+	 * @param keyGroupId     Id of a key-group. 0 <= keyGroupID < maxParallelism.
+	 * @return The index of the operator to which elements from the given key-group should be routed under the given
+	 * parallelism and maxParallelism.
+	 */
+	public static final int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) {
+		return keyGroupId * parallelism / maxParallelism;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
new file mode 100644
index 0000000..4f0a82b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeOffsets.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Iterator;
+
+
+/**
+ * This class combines a key-group range with offsets that correspond to the key-groups in the range.
+ */
+public class KeyGroupRangeOffsets implements Iterable<Tuple2<Integer, Long>> , Serializable {
+
+	/** the range of key-groups */
+	private final KeyGroupRange keyGroupRange;
+
+	/** the aligned array of offsets for the key-groups */
+	private final long[] offsets;
+
+	/**
+	 * Creates key-group range with offsets from the given key-group range. The order of given offsets must be aligned
+	 * with respect to the key-groups in the range.
+	 *
+	 * @param keyGroupRange The range of key-groups.
+	 * @param offsets The aligned array of offsets for the given key-groups.
+	 */
+	public KeyGroupRangeOffsets(KeyGroupRange keyGroupRange, long[] offsets) {
+		this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
+		this.offsets = Preconditions.checkNotNull(offsets);
+		Preconditions.checkArgument(offsets.length == keyGroupRange.getNumberOfKeyGroups());
+	}
+
+	/**
+	 * Creates key-group range with offsets from the given start key-group to end key-group. The order of given offsets
+	 * must be aligned with respect to the key-groups in the range.
+	 *
+	 * @param rangeStart Start key-group of the range (inclusive)
+	 * @param rangeEnd End key-group of the range (inclusive)
+	 * @param offsets The aligned array of offsets for the given key-groups.
+	 */
+	public KeyGroupRangeOffsets(int rangeStart, int rangeEnd, long[] offsets) {
+		this(KeyGroupRange.of(rangeStart, rangeEnd), offsets);
+	}
+
+	/**
+	 * Creates key-group range with offsets from the given start key-group to end key-group.
+	 * All offsets are initially zero.
+	 *
+	 * @param rangeStart Start key-group of the range (inclusive)
+	 * @param rangeEnd End key-group of the range (inclusive)
+	 */
+	public KeyGroupRangeOffsets(int rangeStart, int rangeEnd) {
+		this(KeyGroupRange.of(rangeStart, rangeEnd));
+	}
+
+	/**
+	 * Creates key-group range with offsets for the given key-group range, where all offsets are initially zero.
+	 *
+	 * @param keyGroupRange The range of key-groups.
+	 */
+	public KeyGroupRangeOffsets(KeyGroupRange keyGroupRange) {
+		this(keyGroupRange, new long[keyGroupRange.getNumberOfKeyGroups()]);
+	}
+
+	/**
+	 * Returns the offset for the given key-group. The key-group must be contained in the range.
+	 *
+	 * @param keyGroup Key-group for which we query the offset. Key-group must be contained in the range.
+	 * @return The offset for the given key-group which must be contained in the range.
+	 */
+	public long getKeyGroupOffset(int keyGroup) {
+		return offsets[computeKeyGroupIndex(keyGroup)];
+	}
+
+	/**
+	 * Sets the offset for the given key-group. The key-group must be contained in the range.
+	 *
+	 * @param keyGroup Key-group for which we set the offset. Must be contained in the range.
+	 * @param offset Offset for the key-group.
+	 */
+	public void setKeyGroupOffset(int keyGroup, long offset) {
+		offsets[computeKeyGroupIndex(keyGroup)] = offset;
+	}
+
+	/**
+	 * Returns a key-group range with offsets which is the intersection of the internal key-group range with the given
+	 * key-group range.
+	 *
+	 * @param keyGroupRange Key-group range to intersect with the internal key-group range.
+	 * @return The key-group range with offsets for the intersection of the internal key-group range with the given
+	 *         key-group range.
+	 */
+	public KeyGroupRangeOffsets getIntersection(KeyGroupRange keyGroupRange) {
+		Preconditions.checkNotNull(keyGroupRange);
+		KeyGroupRange intersection = this.keyGroupRange.getIntersection(keyGroupRange);
+		long[] subOffsets = new long[intersection.getNumberOfKeyGroups()];
+		if(subOffsets.length > 0) {
+			System.arraycopy(
+					offsets,
+					computeKeyGroupIndex(intersection.getStartKeyGroup()),
+					subOffsets,
+					0,
+					subOffsets.length);
+		}
+		return new KeyGroupRangeOffsets(intersection, subOffsets);
+	}
+
+	public KeyGroupRange getKeyGroupRange() {
+		return keyGroupRange;
+	}
+
+	@Override
+	public Iterator<Tuple2<Integer, Long>> iterator() {
+		return new KeyGroupOffsetsIterator();
+	}
+
+	private int computeKeyGroupIndex(int keyGroup) {
+		return keyGroup - keyGroupRange.getStartKeyGroup();
+	}
+
+	/**
+	 * Iterator for the Key-group/Offset pairs.
+	 */
+	private final class KeyGroupOffsetsIterator implements Iterator<Tuple2<Integer, Long>> {
+
+		public KeyGroupOffsetsIterator() {
+			this.keyGroupIterator = keyGroupRange.iterator();
+		}
+
+		private final Iterator<Integer> keyGroupIterator;
+
+		@Override
+		public boolean hasNext() {
+			return keyGroupIterator.hasNext();
+		}
+
+		@Override
+		public Tuple2<Integer, Long> next() {
+			Integer currentKeyGroup = keyGroupIterator.next();
+			Tuple2<Integer,Long> result = new Tuple2<>(
+					currentKeyGroup,
+					offsets[currentKeyGroup - keyGroupRange.getStartKeyGroup()]);
+			return result;
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException("Unsupported by this iterator!");
+		}
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (!(o instanceof KeyGroupRangeOffsets)) {
+			return false;
+		}
+
+		KeyGroupRangeOffsets that = (KeyGroupRangeOffsets) o;
+
+		if (keyGroupRange != null ? !keyGroupRange.equals(that.keyGroupRange) : that.keyGroupRange != null) {
+			return false;
+		}
+		return Arrays.equals(offsets, that.offsets);
+	}
+
+	@Override
+	public int hashCode() {
+		int result = keyGroupRange != null ? keyGroupRange.hashCode() : 0;
+		result = 31 * result + Arrays.hashCode(offsets);
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "KeyGroupRangeOffsets{" +
+				"keyGroupRange=" + keyGroupRange +
+				", offsets=" + Arrays.toString(offsets) +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
new file mode 100644
index 0000000..0a36f92
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A handle to the partitioned stream operator state after it has been checkpointed. This state
+ * consists of a range of key group snapshots. A key group is subset of the available
+ * key space. The key groups are identified by their key group indices.
+ */
+public class KeyGroupsStateHandle implements StateObject {
+
+	private static final long serialVersionUID = -8070326169926626355L;
+
+	/** Range of key-groups with their respective offsets in the stream state */
+	private final KeyGroupRangeOffsets groupRangeOffsets;
+
+	/** Inner stream handle to the actual states of the key-groups in the range */
+	private final StreamStateHandle stateHandle;
+
+	/**
+	 *
+	 * @param groupRangeOffsets range of key-group ids that in the state of this handle
+	 * @param streamStateHandle handle to the actual state of the key-groups
+	 */
+	public KeyGroupsStateHandle(KeyGroupRangeOffsets groupRangeOffsets, StreamStateHandle streamStateHandle) {
+		Preconditions.checkNotNull(groupRangeOffsets);
+		Preconditions.checkNotNull(streamStateHandle);
+
+		this.groupRangeOffsets = groupRangeOffsets;
+		this.stateHandle = streamStateHandle;
+	}
+
+	/**
+	 *
+	 * @return iterable over the key-group range for the key-group state referenced by this handle
+	 */
+	public Iterable<Integer> keyGroups() {
+		return groupRangeOffsets.getKeyGroupRange();
+	}
+
+
+	/**
+	 *
+	 * @param keyGroupId the id of a key-group
+	 * @return true if the provided key-group id is contained in the key-group range of this handle
+	 */
+	public boolean containsKeyGroup(int keyGroupId) {
+		return groupRangeOffsets.getKeyGroupRange().contains(keyGroupId);
+	}
+
+	/**
+	 *
+	 * @param keyGroupId the id of a key-group. the id must be contained in the range of this handle.
+	 * @return offset to the position of data for the provided key-group in the stream referenced by this state handle
+	 */
+	public long getOffsetForKeyGroup(int keyGroupId) {
+		return groupRangeOffsets.getKeyGroupOffset(keyGroupId);
+	}
+
+	/**
+	 *
+	 * @param keyGroupRange a key group range to intersect.
+	 * @return key-group state over a range that is the intersection between this handle's key-group range and the
+	 *          provided key-group range.
+	 */
+	public KeyGroupsStateHandle getKeyGroupIntersection(KeyGroupRange keyGroupRange) {
+		return new KeyGroupsStateHandle(groupRangeOffsets.getIntersection(keyGroupRange), stateHandle);
+	}
+
+	/**
+	 *
+	 * @return the internal key-group range to offsets metadata
+	 */
+	public KeyGroupRangeOffsets getGroupRangeOffsets() {
+		return groupRangeOffsets;
+	}
+
+	/**
+	 *
+	 * @return number of key-groups in the key-group range of this handle
+	 */
+	public int getNumberOfKeyGroups() {
+		return groupRangeOffsets.getKeyGroupRange().getNumberOfKeyGroups();
+	}
+
+	/**
+	 *
+	 * @return the inner stream state handle to the actual key-group states
+	 */
+	public StreamStateHandle getStateHandle() {
+		return stateHandle;
+	}
+
+	@Override
+	public void discardState() throws Exception {
+		stateHandle.discardState();
+	}
+
+	@Override
+	public long getStateSize() throws Exception {
+		return stateHandle.getStateSize();
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+
+		if (!(o instanceof KeyGroupsStateHandle)) {
+			return false;
+		}
+
+		KeyGroupsStateHandle that = (KeyGroupsStateHandle) o;
+
+		if (!groupRangeOffsets.equals(that.groupRangeOffsets)) {
+			return false;
+		}
+		return stateHandle.equals(that.stateHandle);
+
+	}
+
+	@Override
+	public int hashCode() {
+		int result = groupRangeOffsets.hashCode();
+		result = 31 * result + stateHandle.hashCode();
+		return result;
+	}
+
+	@Override
+	public String toString() {
+		return "KeyGroupsStateHandle{" +
+				"groupRangeOffsets=" + groupRangeOffsets +
+				", data=" + stateHandle +
+				'}';
+	}
+
+	@Override
+	public void close() throws IOException {
+		stateHandle.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
deleted file mode 100644
index 4e7531f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * A StateHandle that includes the operator states directly.
- */
-public class LocalStateHandle<T extends Serializable> implements StateHandle<T> {
-
-	private static final long serialVersionUID = 2093619217898039610L;
-
-	private final T state;
-
-	public LocalStateHandle(T state) {
-		this.state = state;
-	}
-
-	@Override
-	public T getState(ClassLoader userCodeClassLoader) {
-		// The object has been deserialized correctly before
-		return state;
-	}
-
-	@Override
-	public void discardState() {}
-
-	@Override
-	public long getStateSize() {
-		return 0;
-	}
-
-	@Override
-	public void close() throws IOException {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java
new file mode 100644
index 0000000..d547624
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStateHandle.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Serializable;
+
+/**
+ * Handle to state that can be read back again via {@link #retrieveState()}.
+ */
+public interface RetrievableStateHandle<T extends Serializable> extends StateObject {
+
+	/**
+	 * Retrieves the object that was previously written to state.
+	 */
+	T retrieveState() throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
new file mode 100644
index 0000000..e3538af
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * Wrapper around a {@link StreamStateHandle} to make the referenced state object retrievable trough a simple get call.
+ * This implementation expects that the object was serialized through default serialization of Java's
+ * {@link java.io.ObjectOutputStream}.
+ *
+ * @param <T> type of the retrievable object which is stored under the wrapped stream handle
+ */
+public class RetrievableStreamStateHandle<T extends Serializable> implements
+		StreamStateHandle, RetrievableStateHandle<T>, Closeable {
+
+	private static final long serialVersionUID = 314567453677355L;
+	/** wrapped inner stream state handle from which we deserialize on retrieval */
+	private final StreamStateHandle wrappedStreamStateHandle;
+
+	public RetrievableStreamStateHandle(StreamStateHandle streamStateHandle) {
+		this.wrappedStreamStateHandle = Preconditions.checkNotNull(streamStateHandle);
+	}
+
+	public RetrievableStreamStateHandle(Path filePath) {
+		Preconditions.checkNotNull(filePath);
+		this.wrappedStreamStateHandle = new FileStateHandle(filePath);
+	}
+
+	@Override
+	public T retrieveState() throws Exception {
+		try (FSDataInputStream in = openInputStream()) {
+			return InstantiationUtil.deserializeObject(in);
+		}
+	}
+
+	@Override
+	public FSDataInputStream openInputStream() throws Exception {
+		return wrappedStreamStateHandle.openInputStream();
+	}
+
+	@Override
+	public void discardState() throws Exception {
+		wrappedStreamStateHandle.discardState();
+	}
+
+	@Override
+	public long getStateSize() throws Exception {
+		return wrappedStreamStateHandle.getStateSize();
+	}
+
+	@Override
+	public void close() throws IOException {
+		wrappedStreamStateHandle.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
index f17eb6e..39e7ed2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackendFactory.java
@@ -20,13 +20,15 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.configuration.Configuration;
 
+import java.io.Serializable;
+
 /**
  * A factory to create a specific state backend. The state backend creation gets a Configuration
  * object that can be used to read further config values.
  * 
  * @param <T> The type of the state backend created.
  */
-public interface StateBackendFactory<T extends AbstractStateBackend> {
+public interface StateBackendFactory<T extends AbstractStateBackend> extends Serializable {
 
 	/**
 	 * Creates the state backend, optionally using the given configuration.

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
new file mode 100644
index 0000000..3c5157e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtil.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.IOException;
+
+/**
+ * Helpers for {@link StateObject} related code.
+ */
+public class StateUtil {
+
+	private StateUtil() {
+		throw new AssertionError();
+	}
+
+	/**
+	 * Iterates through the passed state handles and calls discardState() on each handle that is not null. All
+	 * occurring exceptions are suppressed and collected until the iteration is over and emitted as a single exception.
+	 *
+	 * @param handlesToDiscard State handles to discard. Passed iterable is allowed to deliver null values.
+	 * @throws Exception exception that is a collection of all suppressed exceptions that were caught during iteration
+	 */
+	public static void bestEffortDiscardAllStateObjects(
+			Iterable<? extends StateObject> handlesToDiscard) throws Exception {
+
+		if (handlesToDiscard != null) {
+
+			Exception suppressedExceptions = null;
+
+			for (StateObject state : handlesToDiscard) {
+
+				if (state != null) {
+					try {
+						state.discardState();
+					} catch (Exception ex) {
+						//best effort to still cleanup other states and deliver exceptions in the end
+						if (suppressedExceptions == null) {
+							suppressedExceptions = new Exception(ex);
+						}
+						suppressedExceptions.addSuppressed(ex);
+					}
+				}
+			}
+
+			if (suppressedExceptions != null) {
+				throw suppressedExceptions;
+			}
+		}
+	}
+
+	/**
+	 * Iterates through the passed state handles and calls discardState() on each handle that is not null. All
+	 * occurring exceptions are suppressed and collected until the iteration is over and emitted as a single exception.
+	 *
+	 * @param handlesToDiscard State handles to discard. Passed iterable is allowed to deliver null values.
+	 * @throws Exception exception that is a collection of all suppressed exceptions that were caught during iteration
+	 */
+	public static void bestEffortCloseAllStateObjects(
+			Iterable<? extends StateObject> handlesToDiscard) throws IOException {
+
+		if (handlesToDiscard != null) {
+
+			IOException suppressedExceptions = null;
+
+			for (StateObject state : handlesToDiscard) {
+
+				if (state != null) {
+					try {
+						state.close();
+					} catch (Exception ex) {
+						//best effort to still cleanup other states and deliver exceptions in the end
+						if (suppressedExceptions == null) {
+							suppressedExceptions = new IOException(ex);
+						}
+						suppressedExceptions.addSuppressed(ex);
+					}
+				}
+			}
+
+			if (suppressedExceptions != null) {
+				throw suppressedExceptions;
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
deleted file mode 100644
index b130c70..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-
-/**
- * A collection of utility methods for dealing with operator state.
- */
-public class StateUtils {
-
-	/**
-	 * Utility method to define a common generic bound to be used for setting a
-	 * generic state handle on a generic state carrier.
-	 * 
-	 * This has no impact on runtime, since internally, it performs unchecked
-	 * casts. The purpose is merely to allow the use of generic interfaces
-	 * without resorting to raw types, by giving the compiler a common type
-	 * bound.
-	 * 
-	 * @param op
-	 *            The state carrier operator.
-	 * @param state
-	 *            The state handle.
-	 * @param <T>
-	 *            Type bound for the
-	 */
-	public static <T extends StateHandle<?>> void setOperatorState(StatefulTask<?> op, StateHandle<?> state)
-			throws Exception {
-
-		@SuppressWarnings("unchecked")
-		StatefulTask<T> typedOp = (StatefulTask<T>) op;
-		@SuppressWarnings("unchecked")
-		T typedHandle = (T) state;
-
-		typedOp.setInitialState(typedHandle);
-	}
-
-	// ------------------------------------------------------------------------
-
-	/** Do not instantiate */
-	private StateUtils() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
index 891243b..46e4299 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -18,19 +18,17 @@
 
 package org.apache.flink.runtime.state;
 
-import java.io.InputStream;
-import java.io.Serializable;
+import org.apache.flink.core.fs.FSDataInputStream;
 
 /**
- * A state handle that produces an input stream when resolved.
+ * A {@link StateObject} that represents state that was written to a stream. The data can be read
+ * back via {@link #openInputStream()}.
  */
-public interface StreamStateHandle extends StateHandle<InputStream> {
+public interface StreamStateHandle extends StateObject {
 
 	/**
-	 * Converts this stream state handle into a state handle that de-serializes
-	 * the stream into an object using Java's serialization mechanism.
-	 *
-	 * @return The state handle that automatically de-serializes.
+	 * Returns an {@link FSDataInputStream} that can be used to read back the data that
+	 * was previously written to the stream.
 	 */
-	<T extends Serializable> StateHandle<T> toSerializableHandle();
+	FSDataInputStream openInputStream() throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
deleted file mode 100644
index 0585062..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateHandle.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.AbstractCloseableHandle;
-import org.apache.flink.runtime.state.StateObject;
-
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Base class for state that is stored in a file.
- */
-public abstract class AbstractFileStateHandle extends AbstractCloseableHandle implements StateObject {
-
-	private static final long serialVersionUID = 350284443258002355L;
-
-	/** The path to the file in the filesystem, fully describing the file system */
-	private final Path filePath;
-
-	/** Cached file system handle */
-	private transient FileSystem fs;
-
-	/**
-	 * Creates a new file state for the given file path.
-	 * 
-	 * @param filePath The path to the file that stores the state.
-	 */
-	protected AbstractFileStateHandle(Path filePath) {
-		this.filePath = checkNotNull(filePath);
-	}
-
-	/**
-	 * Gets the path where this handle's state is stored.
-	 * @return The path where this handle's state is stored.
-	 */
-	public Path getFilePath() {
-		return filePath;
-	}
-
-	/**
-	 * Discard the state by deleting the file that stores the state. If the parent directory
-	 * of the state is empty after deleting the state file, it is also deleted.
-	 * 
-	 * @throws Exception Thrown, if the file deletion (not the directory deletion) fails.
-	 */
-	@Override
-	public void discardState() throws Exception {
-		getFileSystem().delete(filePath, false);
-
-		// send a call to delete the checkpoint directory containing the file. This will
-		// fail (and be ignored) when some files still exist
-		try {
-			getFileSystem().delete(filePath.getParent(), false);
-		} catch (IOException ignored) {}
-	}
-
-	/**
-	 * Gets the file system that stores the file state.
-	 * @return The file system that stores the file state.
-	 * @throws IOException Thrown if the file system cannot be accessed.
-	 */
-	protected FileSystem getFileSystem() throws IOException {
-		if (fs == null) {
-			fs = FileSystem.get(filePath.toUri());
-		}
-		return fs;
-	}
-
-	/**
-	 * Returns the file size in bytes.
-	 *
-	 * @return The file size in bytes.
-	 * @throws IOException Thrown if the file system cannot be accessed.
-	 */
-	protected long getFileSize() throws IOException {
-		return getFileSystem().getFileStatus(filePath).getLen();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
index 0692541..51e8b5a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
@@ -38,8 +38,9 @@ import java.util.Map;
  * @param <N> The type of the namespace in the snapshot state.
  * @param <SV> The type of the state value.
  */
-public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>> 
-		extends AbstractFileStateHandle implements KvStateSnapshot<K, N, S, SD, FsStateBackend> {
+public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD extends StateDescriptor<S, ?>>
+		extends FileStateHandle
+		implements KvStateSnapshot<K, N, S, SD, FsStateBackend> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -132,7 +133,7 @@ public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD exte
 	 * @throws IOException Thrown if the file system cannot be accessed.
 	 */
 	@Override
-	public long getStateSize() throws IOException {
-		return getFileSize();
+	public void discardState() throws Exception {
+		super.discardState();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
deleted file mode 100644
index 34a1cb0..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileSerializableStateHandle.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.Serializable;
-
-/**
- * A state handle that points to state stored in a file via Java Serialization.
- * 
- * @param <T> The type of state pointed to by the state handle.
- */
-public class FileSerializableStateHandle<T extends Serializable> extends AbstractFileStateHandle implements StateHandle<T> {
-
-	private static final long serialVersionUID = -657631394290213622L;
-
-	/**
-	 * Creates a new FileSerializableStateHandle pointing to state at the given file path.
-	 * 
-	 * @param filePath The path to the file containing the checkpointed state.
-	 */
-	public FileSerializableStateHandle(Path filePath) {
-		super(filePath);
-	}
-
-	@Override
-	@SuppressWarnings("unchecked")
-	public T getState(ClassLoader classLoader) throws Exception {
-		ensureNotClosed();
-
-		try (FSDataInputStream inStream = getFileSystem().open(getFilePath())) {
-			// make sure any deserialization can be aborted
-			registerCloseable(inStream);
-
-			ObjectInputStream ois = new InstantiationUtil.ClassLoaderObjectInputStream(inStream, classLoader);
-			return (T) ois.readObject();
-		}
-	}
-
-	/**
-	 * Returns the file size in bytes.
-	 *
-	 * @return The file size in bytes.
-	 * @throws IOException Thrown if the file system cannot be accessed.
-	 */
-	@Override
-	public long getStateSize() throws IOException {
-		return getFileSize();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
new file mode 100644
index 0000000..871e56c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractCloseableHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
+import java.io.IOException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link StreamStateHandle} for state that was written to a file stream. The written data is
+ * identifier by the file path. The state can be read again by calling {@link #openInputStream()}.
+ */
+public class FileStateHandle extends AbstractCloseableHandle implements StreamStateHandle {
+
+	private static final long serialVersionUID = 350284443258002355L;
+
+	/**
+	 * The path to the file in the filesystem, fully describing the file system
+	 */
+	private final Path filePath;
+
+	/**
+	 * Cached file system handle
+	 */
+	private transient FileSystem fs;
+
+	/**
+	 * Creates a new file state for the given file path.
+	 *
+	 * @param filePath The path to the file that stores the state.
+	 */
+	public FileStateHandle(Path filePath) {
+		this.filePath = requireNonNull(filePath);
+	}
+
+	/**
+	 * Gets the path where this handle's state is stored.
+	 *
+	 * @return The path where this handle's state is stored.
+	 */
+	public Path getFilePath() {
+		return filePath;
+	}
+
+	@Override
+	public FSDataInputStream openInputStream() throws Exception {
+		ensureNotClosed();
+		FSDataInputStream inputStream = getFileSystem().open(filePath);
+		registerCloseable(inputStream);
+		return inputStream;
+	}
+
+	/**
+	 * Discard the state by deleting the file that stores the state. If the parent directory
+	 * of the state is empty after deleting the state file, it is also deleted.
+	 *
+	 * @throws Exception Thrown, if the file deletion (not the directory deletion) fails.
+	 */
+	@Override
+	public void discardState() throws Exception {
+		getFileSystem().delete(filePath, false);
+
+		// send a call to delete the checkpoint directory containing the file. This will
+		// fail (and be ignored) when some files still exist
+		try {
+			getFileSystem().delete(filePath.getParent(), false);
+		} catch (IOException ignored) {
+		}
+	}
+
+	/**
+	 * Returns the file size in bytes.
+	 *
+	 * @return The file size in bytes.
+	 * @throws IOException Thrown if the file system cannot be accessed.
+	 */
+	@Override
+	public long getStateSize() throws IOException {
+		return getFileSystem().getFileStatus(filePath).getLen();
+	}
+
+	/**
+	 * Gets the file system that stores the file state.
+	 *
+	 * @return The file system that stores the file state.
+	 * @throws IOException Thrown if the file system cannot be accessed.
+	 */
+	private FileSystem getFileSystem() throws IOException {
+		if (fs == null) {
+			fs = FileSystem.get(filePath.toUri());
+		}
+		return fs;
+	}
+
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (!(o instanceof FileStateHandle)) {
+			return false;
+		}
+
+		FileStateHandle that = (FileStateHandle) o;
+		return filePath.equals(that.filePath);
+
+	}
+
+	@Override
+	public int hashCode() {
+		return filePath.hashCode();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
deleted file mode 100644
index 5bfb4ee..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStreamStateHandle.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-
-/**
- * A state handle that points to state in a file system, accessible as an input stream.
- */
-public class FileStreamStateHandle extends AbstractFileStateHandle implements StreamStateHandle {
-
-	private static final long serialVersionUID = -6826990484549987311L;
-
-	/**
-	 * Creates a new FileStreamStateHandle pointing to state at the given file path.
-	 * 
-	 * @param filePath The path to the file containing the checkpointed state.
-	 */
-	public FileStreamStateHandle(Path filePath) {
-		super(filePath);
-	}
-
-	@Override
-	public InputStream getState(ClassLoader userCodeClassLoader) throws Exception {
-		ensureNotClosed();
-
-		InputStream inStream = getFileSystem().open(getFilePath());
-		// make sure the state handle is cancelable
-		registerCloseable(inStream);
-
-		return inStream; 
-	}
-
-	/**
-	 * Returns the file size in bytes.
-	 *
-	 * @return The file size in bytes.
-	 * @throws IOException Thrown if the file system cannot be accessed.
-	 */
-	@Override
-	public long getStateSize() throws IOException {
-		return getFileSize();
-	}
-
-	@Override
-	public <T extends Serializable> StateHandle<T> toSerializableHandle() {
-		FileSerializableStateHandle<T> handle = new FileSerializableStateHandle<>(getFilePath());
-
-		// forward closed status
-		if (isClosed()) {
-			try {
-				handle.close();
-			} catch (IOException e) {
-				// should not happen on a fresh handle, but forward anyways
-				throw new RuntimeException(e);
-			}
-		}
-
-		return handle;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 61cf741..a3f4682 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -32,15 +32,12 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.Arrays;
@@ -294,24 +291,6 @@ public class FsStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
-	public <S extends Serializable> StateHandle<S> checkpointStateSerializable(
-			S state, long checkpointID, long timestamp) throws Exception
-	{
-		checkFileSystemInitialized();
-		
-		Path checkpointDir = createCheckpointDirPath(checkpointID);
-		int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
-
-		FsCheckpointStateOutputStream stream = 
-			new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
-		
-		try (ObjectOutputStream os = new ObjectOutputStream(stream)) {
-			os.writeObject(state);
-			return stream.closeAndGetHandle().toSerializableHandle();
-		}
-	}
-
-	@Override
 	public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
 		checkFileSystemInitialized();
 
@@ -520,6 +499,11 @@ public class FsStateBackend extends AbstractStateBackend {
 			}
 		}
 
+		@Override
+		public void sync() throws IOException {
+			outStream.sync();
+		}
+
 		/**
 		 * If the stream is only closed, we remove the produced file (cleanup through the auto close
 		 * feature, for example). This method throws no exception if the deletion fails, but only
@@ -559,7 +543,7 @@ public class FsStateBackend extends AbstractStateBackend {
 						flush();
 						outStream.close();
 						closed = true;
-						return new FileStreamStateHandle(statePath);
+						return new FileStateHandle(statePath);
 					}
 				}
 				else {

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index ba6de42..a42bec2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -18,28 +18,32 @@
 
 package org.apache.flink.runtime.state.memory;
 
+import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.state.AbstractCloseableHandle;
-import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.InstantiationUtil;
+
+import org.apache.flink.util.Preconditions;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.Serializable;
+import java.util.Arrays;
 
 /**
  * A state handle that contains stream state in a byte array.
  */
-public final class ByteStreamStateHandle extends AbstractCloseableHandle implements StreamStateHandle {
+public class ByteStreamStateHandle extends AbstractCloseableHandle implements StreamStateHandle {
 
 	private static final long serialVersionUID = -5280226231200217594L;
-	
-	/** the state data */
-	private final byte[] data;
+
+	/**
+	 * the state data
+	 */
+	protected final byte[] data;
 
 	/**
 	 * Creates a new ByteStreamStateHandle containing the given data.
-	 * 
+	 *
 	 * @param data The state data.
 	 */
 	public ByteStreamStateHandle(byte[] data) {
@@ -47,17 +51,39 @@ public final class ByteStreamStateHandle extends AbstractCloseableHandle impleme
 	}
 
 	@Override
-	public InputStream getState(ClassLoader userCodeClassLoader) throws Exception {
+	public FSDataInputStream openInputStream() throws Exception {
 		ensureNotClosed();
 
-		ByteArrayInputStream stream = new ByteArrayInputStream(data);
-		registerCloseable(stream);
+		FSDataInputStream inputStream = new FSDataInputStream() {
+			int index = 0;
+
+			@Override
+			public void seek(long desired) throws IOException {
+				Preconditions.checkArgument(desired >= 0 && desired < Integer.MAX_VALUE);
+				index = (int) desired;
+			}
+
+			@Override
+			public long getPos() throws IOException {
+				return index;
+			}
+
+			@Override
+			public int read() throws IOException {
+				return index < data.length ? data[index++] & 0xFF : -1;
+			}
+		};
+		registerCloseable(inputStream);
+		return inputStream;
+	}
 
-		return stream;
+	public byte[] getData() {
+		return data;
 	}
 
 	@Override
-	public void discardState() {}
+	public void discardState() {
+	}
 
 	@Override
 	public long getStateSize() {
@@ -65,19 +91,27 @@ public final class ByteStreamStateHandle extends AbstractCloseableHandle impleme
 	}
 
 	@Override
-	public <T extends Serializable> StateHandle<T> toSerializableHandle() {
-		SerializedStateHandle<T> serializableHandle = new SerializedStateHandle<T>(data);
-
-		// forward the closed status
-		if (isClosed()) {
-			try {
-				serializableHandle.close();
-			} catch (IOException e) {
-				// should not happen on a fresh handle, but forward anyways
-				throw new RuntimeException(e);
-			}
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
 		}
+		if (!(o instanceof ByteStreamStateHandle)) {
+			return false;
+		}
+
+		ByteStreamStateHandle that = (ByteStreamStateHandle) o;
+		return Arrays.equals(data, that.data);
+
+	}
+
+	@Override
+	public int hashCode() {
+		int result = super.hashCode();
+		result = 31 * result + Arrays.hashCode(data);
+		return result;
+	}
 
-		return serializableHandle;
+	public static StreamStateHandle fromSerializable(Serializable value) throws IOException {
+		return new ByteStreamStateHandle(InstantiationUtil.serializeObject(value));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 7b9d21b..af84394 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -27,13 +27,12 @@ import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.Serializable;
 
 /**
  * A {@link AbstractStateBackend} that stores all its data and checkpoints in memory and has no
@@ -104,27 +103,6 @@ public class MemoryStateBackend extends AbstractStateBackend {
 		return new MemFoldingState<>(keySerializer, namespaceSerializer, stateDesc);
 	}
 
-	/**
-	 * Serialized the given state into bytes using Java serialization and creates a state handle that
-	 * can re-create that state.
-	 *
-	 * @param state The state to checkpoint.
-	 * @param checkpointID The ID of the checkpoint.
-	 * @param timestamp The timestamp of the checkpoint.
-	 * @param <S> The type of the state.
-	 *
-	 * @return A state handle that contains the given state serialized as bytes.
-	 * @throws Exception Thrown, if the serialization fails.
-	 */
-	@Override
-	public <S extends Serializable> StateHandle<S> checkpointStateSerializable(
-			S state, long checkpointID, long timestamp) throws Exception
-	{
-		SerializedStateHandle<S> handle = new SerializedStateHandle<>(state);
-		checkSize(handle.getSizeOfSerializedState(), maxStateSize);
-		return new SerializedStateHandle<S>(state);
-	}
-
 	@Override
 	public CheckpointStateOutputStream createCheckpointStateOutputStream(
 			long checkpointID, long timestamp) throws Exception
@@ -177,6 +155,14 @@ public class MemoryStateBackend extends AbstractStateBackend {
 			os.write(b, off, len);
 		}
 
+		@Override
+		public void flush() throws IOException {
+			os.flush();
+		}
+
+		@Override
+		public void sync() throws IOException { }
+
 		// --------------------------------------------------------------------
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
deleted file mode 100644
index 4420470..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/SerializedStateHandle.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.memory;
-
-import org.apache.flink.runtime.state.AbstractCloseableHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.InstantiationUtil;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-/**
- * A state handle that represents its state in serialized form as bytes.
- *
- * @param <T> The type of state represented by this state handle.
- */
-public class SerializedStateHandle<T extends Serializable> extends AbstractCloseableHandle implements StateHandle<T> {
-	
-	private static final long serialVersionUID = 4145685722538475769L;
-
-	/** The serialized data */
-	private final byte[] serializedData;
-	
-	/**
-	 * Creates a new serialized state handle, eagerly serializing the given state object.
-	 * 
-	 * @param value The state object.
-	 * @throws IOException Thrown, if the serialization fails.
-	 */
-	public SerializedStateHandle(T value) throws IOException {
-		this.serializedData = value == null ? null : InstantiationUtil.serializeObject(value);
-	}
-
-	/**
-	 * Creates a new serialized state handle, based in the given already serialized data.
-	 * 
-	 * @param serializedData The serialized data.
-	 */
-	public SerializedStateHandle(byte[] serializedData) {
-		this.serializedData = serializedData;
-	}
-	
-	@Override
-	public T getState(ClassLoader classLoader) throws Exception {
-		if (classLoader == null) {
-			throw new NullPointerException();
-		}
-
-		ensureNotClosed();
-		return serializedData == null ? null : InstantiationUtil.<T>deserializeObject(serializedData, classLoader);
-	}
-
-	/**
-	 * Gets the size of the serialized state.
-	 * @return The size of the serialized state.
-	 */
-	public int getSizeOfSerializedState() {
-		return serializedData.length;
-	}
-
-	/**
-	 * Discarding heap-memory backed state is a no-op, so this method does nothing.
-	 */
-	@Override
-	public void discardState() {}
-
-	@Override
-	public long getStateSize() {
-		return serializedData.length;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 6958784..d54826a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
@@ -36,10 +35,13 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
@@ -240,39 +242,21 @@ public class RuntimeEnvironment implements Environment {
 
 	@Override
 	public void acknowledgeCheckpoint(long checkpointId) {
-		acknowledgeCheckpoint(checkpointId, null);
+		acknowledgeCheckpoint(checkpointId, null, null);
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
-		// try and create a serialized version of the state handle
-		SerializedValue<StateHandle<?>> serializedState;
-		long stateSize;
-
-		if (state == null) {
-			serializedState = null;
-			stateSize = 0;
-		} else {
-			try {
-				serializedState = new SerializedValue<StateHandle<?>>(state);
-			} catch (Exception e) {
-				throw new RuntimeException("Failed to serialize state handle during checkpoint confirmation", e);
-			}
-
-			try {
-				stateSize = state.getStateSize();
-			}
-			catch (Exception e) {
-				throw new RuntimeException("Failed to fetch state handle size", e);
-			}
-		}
-		
+	public void acknowledgeCheckpoint(
+			long checkpointId,
+			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+			List<KeyGroupsStateHandle> keyGroupStateHandles) {
+
 		AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
 				jobId,
 				executionId,
 				checkpointId,
-				serializedState,
-				stateSize);
+				chainedStateHandle,
+				keyGroupStateHandles);
 
 		jobManager.tell(message);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c98d512..73601c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -58,10 +58,11 @@ import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
 import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.StateUtils;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 
+import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -223,9 +224,17 @@ public class Task implements Runnable {
 	/** Serial executor for asynchronous calls (checkpoints, etc), lazily initialized */
 	private volatile ExecutorService asyncCallDispatcher;
 
-	/** The handle to the state that the operator was initialized with. Will be set to null after the
-	 * initialization, to be memory friendly */
-	private volatile SerializedValue<StateHandle<?>> operatorState;
+	/**
+	 * The handle to the chained operator state that the task was initialized with. Will be set
+	 * to null after the initialization, to be memory friendly.
+	 */
+	private volatile ChainedStateHandle<StreamStateHandle> chainedOperatorState;
+
+	/**
+	 * The handle to the key group state that the task was initialized with. Will be set
+	 * to null after the initialization, to be memory friendly.
+	 */
+	private volatile List<KeyGroupsStateHandle> keyGroupStates;
 
 	/** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
 	private long taskCancellationInterval;
@@ -257,8 +266,9 @@ public class Task implements Runnable {
 		this.requiredJarFiles = checkNotNull(tdd.getRequiredJarFiles());
 		this.requiredClasspaths = checkNotNull(tdd.getRequiredClasspaths());
 		this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName());
-		this.operatorState = tdd.getOperatorState();
+		this.chainedOperatorState = tdd.getOperatorState();
 		this.serializedExecutionConfig = checkNotNull(tdd.getSerializedExecutionConfig());
+		this.keyGroupStates = tdd.getKeyGroupState();
 
 		this.taskCancellationInterval = jobConfiguration.getLong(
 			ConfigConstants.TASK_CANCELLATION_INTERVAL_MILLIS,
@@ -538,21 +548,11 @@ public class Task implements Runnable {
 			// the state into the task. the state is non-empty if this is an execution
 			// of a task that failed but had backuped state from a checkpoint
 
-			// get our private reference onto the stack (be safe against concurrent changes)
-			SerializedValue<StateHandle<?>> operatorState = this.operatorState;
-
-			if (operatorState != null) {
+			if (chainedOperatorState != null || keyGroupStates != null) {
 				if (invokable instanceof StatefulTask) {
-					try {
-						StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader);
-						StatefulTask<?> op = (StatefulTask<?>) invokable;
-						StateUtils.setOperatorState(op, state);
-					}
-					catch (Exception e) {
-						throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e);
-					}
-				}
-				else {
+					StatefulTask op = (StatefulTask) invokable;
+					op.setInitialState(chainedOperatorState, keyGroupStates);
+				} else {
 					throw new IllegalStateException("Found operator state for a non-stateful task invokable");
 				}
 			}
@@ -560,8 +560,8 @@ public class Task implements Runnable {
 			// be memory and GC friendly - since the code stays in invoke() for a potentially long time,
 			// we clear the reference to the state handle
 			//noinspection UnusedAssignment
-			operatorState = null;
-			this.operatorState = null;
+			this.chainedOperatorState = null;
+			this.keyGroupStates = null;
 
 			// ----------------------------------------------------------------
 			//  actual task core work
@@ -936,7 +936,7 @@ public class Task implements Runnable {
 			if (invokable instanceof StatefulTask) {
 
 				// build a local closure
-				final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
+				final StatefulTask statefulTask = (StatefulTask) invokable;
 				final String taskName = taskNameWithSubtask;
 
 				Runnable runnable = new Runnable() {
@@ -977,7 +977,7 @@ public class Task implements Runnable {
 			if (invokable instanceof StatefulTask) {
 
 				// build a local closure
-				final StatefulTask<?> statefulTask = (StatefulTask<?>) invokable;
+				final StatefulTask statefulTask = (StatefulTask) invokable;
 				final String taskName = taskNameWithSubtask;
 
 				Runnable runnable = new Runnable() {
@@ -1192,7 +1192,6 @@ public class Task implements Runnable {
 				// reason, we spawn a separate thread that repeatedly interrupts the user code until
 				// it exits
 				while (executer.isAlive()) {
-
 					// build the stack trace of where the thread is stuck, for the log
 					StringBuilder bld = new StringBuilder();
 					StackTraceElement[] stack = executer.getStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index b585fe6..91db564 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
-import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.filesystem.FileSystemStateStorageHelper;
 import org.apache.flink.util.ConfigurationUtil;
 import org.slf4j.Logger;
@@ -228,7 +228,7 @@ public class ZooKeeperUtils {
 
 		checkNotNull(configuration, "Configuration");
 
-		StateStorageHelper<SubmittedJobGraph> stateStorage = createFileSystemStateStorage(configuration, "submittedJobGraph");
+		RetrievableStateStorageHelper<SubmittedJobGraph> stateStorage = createFileSystemStateStorage(configuration, "submittedJobGraph");
 
 		// ZooKeeper submitted jobs root dir
 		String zooKeeperSubmittedJobsPath = ConfigurationUtil.getStringWithDeprecatedKeys(
@@ -266,7 +266,7 @@ public class ZooKeeperUtils {
 				ConfigConstants.DEFAULT_ZOOKEEPER_CHECKPOINTS_PATH,
 				ConfigConstants.ZOOKEEPER_CHECKPOINTS_PATH);
 
-		StateStorageHelper<CompletedCheckpoint> stateStorage = createFileSystemStateStorage(
+		RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage = createFileSystemStateStorage(
 			configuration,
 			"completedCheckpoint");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java
new file mode 100644
index 0000000..1434f74
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/RetrievableStateStorageHelper.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+
+import java.io.Serializable;
+
+/**
+ * State storage helper which is used by {@link ZooKeeperStateHandleStore} to persiste state before
+ * the state handle is written to ZooKeeper.
+ *
+ * @param <T> The type of the data that can be stored by this storage helper.
+ */
+public interface RetrievableStateStorageHelper<T extends Serializable> {
+
+	/**
+	 * Stores the given state and returns a state handle to it.
+	 *
+	 * @param state State to be stored
+	 * @return State handle to the stored state
+	 * @throws Exception
+	 */
+	RetrievableStateHandle<T> store(T state) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
deleted file mode 100644
index 36fb849..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/StateStorageHelper.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.zookeeper;
-
-import org.apache.flink.runtime.state.StateHandle;
-
-import java.io.Serializable;
-
-/**
- * State storage helper which is used by {@link ZooKeeperStateHandleStore} to persiste state before
- * the state handle is written to ZooKeeper.
- *
- * @param <T>
- */
-public interface StateStorageHelper<T extends Serializable> {
-
-	/**
-	 * Stores the given state and returns a state handle to it.
-	 *
-	 * @param state State to be stored
-	 * @return State handle to the stored state
-	 * @throws Exception
-	 */
-	StateHandle<T> store(T state) throws Exception;
-}


Mime
View raw message