flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [04/12] flink git commit: [streaming] Initial rework of the operator state interfaces
Date Thu, 25 Jun 2015 17:21:37 GMT
[streaming] Initial rework of the operator state interfaces


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7e24580
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7e24580
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7e24580

Branch: refs/heads/master
Commit: a7e24580a2178b27ea77e7327c39ad7e75cac0a3
Parents: d42c732
Author: Gyula Fora <gyfora@apache.org>
Authored: Sun May 17 20:33:38 2015 +0200
Committer: Gyula Fora <gyfora@apache.org>
Committed: Thu Jun 25 16:38:06 2015 +0200

----------------------------------------------------------------------
 .../api/common/functions/RichMapFunction.java   |   2 +-
 .../api/common/functions/RuntimeContext.java    |  47 +++++
 .../util/AbstractRuntimeUDFContext.java         |  12 ++
 .../flink/api/common/state/OperatorState.java   |  67 ++++++
 .../api/common/state/StateCheckpointer.java     |  73 +++++++
 .../flink/runtime/state/LocalStateHandle.java   |  19 +-
 .../runtime/state/PartitionedStateHandle.java   |  47 +++++
 .../runtime/state/PartitionedStateStore.java    |  47 +++++
 .../apache/flink/runtime/state/StateUtils.java  |  40 ++--
 .../api/checkpoint/CheckpointCommitter.java     |   7 +-
 .../api/datastream/IterativeDataStream.java     |   2 +
 .../api/datastream/StreamProjection.java        |   3 +-
 .../api/functions/source/FileReadFunction.java  |   8 +-
 .../api/functions/source/SourceFunction.java    |   4 +-
 .../flink/streaming/api/graph/StreamConfig.java |  21 +-
 .../api/operators/AbstractStreamOperator.java   |   6 +-
 .../operators/AbstractUdfStreamOperator.java    |  68 +++---
 .../api/operators/StatefulStreamOperator.java   |   4 +-
 .../streaming/api/operators/StreamOperator.java |   8 +-
 .../streaming/api/state/BasicCheckpointer.java  |  37 ++++
 .../streaming/api/state/EagerStateStore.java    |  86 ++++++++
 .../streaming/api/state/LazyStateStore.java     | 117 ++++++++++
 .../state/PartitionedStreamOperatorState.java   | 126 +++++++++++
 .../api/state/StreamOperatorState.java          | 100 +++++++++
 .../runtime/io/BlockingQueueBroker.java         |   4 +-
 .../runtime/io/StreamRecordWriter.java          |   6 +-
 .../runtime/tasks/OneInputStreamTask.java       |   4 +-
 .../runtime/tasks/StreamIterationHead.java      |   1 -
 .../streaming/runtime/tasks/StreamTask.java     |  22 +-
 .../runtime/tasks/StreamingRuntimeContext.java  |  30 ++-
 .../runtime/tasks/TwoInputStreamTask.java       |   1 -
 .../serialization/DeserializationSchema.java    |   4 +-
 .../api/state/StatefulFunctionTest.java         | 211 +++++++++++++++++++
 .../runtime/tasks/SourceStreamTaskTest.java     |  68 +++---
 .../flink/streaming/util/MockCoContext.java     |   7 +-
 .../flink/streaming/util/MockContext.java       |   7 +-
 .../StreamCheckpointingITCase.java              | 118 ++++-------
 .../ProcessFailureStreamingRecoveryITCase.java  |  79 ++-----
 38 files changed, 1244 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
index 2005be0..7adb25b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RichMapFunction.java
@@ -34,7 +34,7 @@ import org.apache.flink.api.common.functions.RichFunction;
 public abstract class RichMapFunction<IN, OUT> extends AbstractRichFunction implements MapFunction<IN, OUT> {
 
 	private static final long serialVersionUID = 1L;
-
+	
 	@Override
 	public abstract OUT map(IN value) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index f68d2b0..a3b8f65 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -29,6 +29,8 @@ import org.apache.flink.api.common.accumulators.Histogram;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
 
 /**
  * A RuntimeContext contains information about the context in which functions are executed. Each parallel instance
@@ -160,4 +162,49 @@ public interface RuntimeContext {
 	 * @return The distributed cache of the worker executing this instance.
 	 */
 	DistributedCache getDistributedCache();
+	
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Returns the {@link OperatorState} of this operator instance, which can be
+	 * used to store and update user state in a fault tolerant fashion. The
+	 * state will be initialized by the provided default value, and the
+	 * {@link StateCheckpointer} will be used to draw the state snapshots.
+	 * 
+	 * <p>
+	 * When storing a {@link Serializable} state the user can omit the
+	 * {@link StateCheckpointer} in which case the full state will be written as
+	 * the snapshot.
+	 * </p>
+	 * 
+	 * @param defaultState
+	 *            Default value for the operator state. This will be returned
+	 *            the first time {@link OperatorState#getState()} (for every
+	 *            state partition) is called before
+	 *            {@link OperatorState#updateState(Object)}.
+	 * @param checkpointer
+	 *            The {@link StateCheckpointer} that will be used to draw
+	 *            snapshots from the user state.
+	 * @return The {@link OperatorState} for this instance.
+	 */
+	<S,C extends Serializable> OperatorState<S> getOperatorState(S defaultState, StateCheckpointer<S,C> checkpointer);
+
+	/**
+	 * Returns the {@link OperatorState} of this operator instance, which can be
+	 * used to store and update user state in a fault tolerant fashion. The
+	 * state will be initialized by the provided default value.
+	 * 
+	 * <p>
+	 * When storing a non-{@link Serializable} state the user needs to specify a
+	 * {@link StateCheckpointer} for drawing snapshots.
+	 * </p>
+	 * 
+	 * @param defaultState
+	 *            Default value for the operator state. This will be returned
+	 *            the first time {@link OperatorState#getState()} (for every
+	 *            state partition) is called before
+	 *            {@link OperatorState#updateState(Object)}.
+	 * @return The {@link OperatorState} for this instance.
+	 */
+	<S extends Serializable> OperatorState<S> getOperatorState(S defaultState);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 735fe8e..413565b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -33,6 +33,8 @@ import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.cache.DistributedCache;
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
 import org.apache.flink.core.fs.Path;
 
 /**
@@ -170,4 +172,14 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 		}
 		return (Accumulator<V, A>) accumulator;
 	}
+	
+	@Override
+	public <S, C extends Serializable> OperatorState<S> getOperatorState(S defaultState, StateCheckpointer<S, C> checkpointer) {
+		throw new UnsupportedOperationException("Operator state is only accessible for streaming operators.");
+	}
+
+	@Override
+	public <S extends Serializable> OperatorState<S> getOperatorState(S defaultState) {
+		throw new UnsupportedOperationException("Operator state is only accessible for streaming operators.");
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
new file mode 100644
index 0000000..5b3fa05
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.api.common.functions.MapFunction;
+
+/**
+ * Base class for all streaming operator states. It can represent both
+ * partitioned (when state partitioning is defined in the program) or
+ * non-partitioned user states.
+ * 
+ * State can be accessed and manipulated using the {@link #getState()} and
+ * {@link #updateState(T)} methods. These calls are only valid in the
+ * transformation call the operator represents, for instance inside
+ * {@link MapFunction#map()} and invalid in
+ * {@link #open(org.apache.flink.configuration.Configuration)} or
+ * {@link #close()}.
+ * 
+ * @param <T>
+ *            Type of the operator state
+ */
+public interface OperatorState<T> {
+
+	/**
+	 * Gets the current state for the operator. When the state is not
+	 * partitioned the returned state is the same for all inputs. If state
+	 * partitioning is applied the state returned depends on the current
+	 * operator input, as the operator maintains an independent state for each
+	 * partitions.
+	 * 
+	 * <p>
+	 * {@link #getState()} returns <code>null</code> if there is no state stored
+	 * in the operator. This is the expected behaviour before initializing the
+	 * state with {@link #updateState(T)}.
+	 * </p>
+	 * 
+	 * @return The operator state corresponding to the current input.
+	 */
+	T getState();
+
+	/**
+	 * Updates the operator state accessible by {@link #getState()} to the given
+	 * value. The next time {@link #getState()} is called (for the same state
+	 * partition) the returned state will represent the updated value.
+	 * 
+	 * @param state
+	 *            The updated state.
+	 */
+	void updateState(T state);
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
new file mode 100644
index 0000000..488e308
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateCheckpointer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import java.io.Serializable;
+
+/**
+ * Basic interface for creating {@link OperatorState} snapshots in stateful
+ * streaming programs.
+ * 
+ * The user needs to implement the {@link #snapshotState(S, long, long)} and
+ * {@link #restoreState(C)} methods that will be called to create and restore
+ * state snapshots of the given states.
+ * 
+ * <p>
+ * Note that the {@link OperatorState} is <i>synchronously</i> checkpointed.
+ * While the state is written, the state cannot be accessed or modified so the
+ * function needs not return a copy of its state, but may return a reference to
+ * its state.
+ * </p>
+ * 
+ * @param <S>
+ *            Type of the operator state.
+ * @param <C>
+ *            Type of the snapshot that will be persisted.
+ */
+public interface StateCheckpointer<S, C extends Serializable> {
+
+	/**
+	 * Takes a snapshot of a given operator state. The snapshot returned will be
+	 * persisted in the state backend for this job and restored upon failure.
+	 * This method is called for all state partitions in case of partitioned
+	 * state when creating a checkpoint.
+	 * 
+	 * @param state
+	 *            The state for which the snapshot needs to be taken
+	 * @param checkpointId
+	 *            The ID of the checkpoint.
+	 * @param checkpointTimestamp
+	 *            The timestamp of the checkpoint, as derived by
+	 *            System.currentTimeMillis() on the JobManager.
+	 * 
+	 * @return A snapshot of the operator state.
+	 */
+	public C snapshotState(S state, long checkpointId, long checkpointTimestamp);
+
+	/**
+	 * Restores the operator states from a given snapshot. The restores state
+	 * will be loaded back to the function. In case of partitioned state, each
+	 * partition is restored independently.
+	 * 
+	 * @param stateSnapshot
+	 *            The state snapshot that needs to be restored.
+	 * @return The state corresponding to the snapshot.
+	 */
+	public S restoreState(C stateSnapshot);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
index a53d8da..5ba372d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/LocalStateHandle.java
@@ -23,36 +23,33 @@ import java.io.Serializable;
 /**
  * A StateHandle that includes the operator states directly.
  */
-public class LocalStateHandle implements StateHandle<Serializable> {
+public class LocalStateHandle<T extends Serializable> implements StateHandle<T> {
 
 	private static final long serialVersionUID = 2093619217898039610L;
 
-	private final Serializable state;
+	private final T state;
 
-	public LocalStateHandle(Serializable state) {
+	public LocalStateHandle(T state) {
 		this.state = state;
 	}
 
 	@Override
-	public Serializable getState() {
+	public T getState() {
 		return state;
 	}
 
 	@Override
 	public void discardState() throws Exception {
 	}
-	
-	public static LocalStateHandleProvider createProvider(){
-		return new LocalStateHandleProvider();
-	}
 
-	private static class LocalStateHandleProvider implements StateHandleProvider<Serializable> {
+	public static class LocalStateHandleProvider<R extends Serializable> implements
+			StateHandleProvider<R> {
 
 		private static final long serialVersionUID = 4665419208932921425L;
 
 		@Override
-		public LocalStateHandle createStateHandle(Serializable state) {
-			return new LocalStateHandle(state);
+		public LocalStateHandle<R> createStateHandle(R state) {
+			return new LocalStateHandle<R>(state);
 		}
 
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java
new file mode 100644
index 0000000..4119df1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateHandle.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class PartitionedStateHandle implements
+		StateHandle<Map<Serializable, StateHandle<Serializable>>> {
+
+	private static final long serialVersionUID = 7505365403501402100L;
+
+	Map<Serializable, StateHandle<Serializable>> handles;
+
+	public PartitionedStateHandle(Map<Serializable, StateHandle<Serializable>> handles) {
+		this.handles = handles;
+	}
+
+	@Override
+	public Map<Serializable, StateHandle<Serializable>> getState() throws Exception {
+		return handles;
+	}
+
+	@Override
+	public void discardState() throws Exception {
+		for (StateHandle<Serializable> handle : handles.values()) {
+			handle.discardState();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateStore.java
new file mode 100644
index 0000000..8b73edf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PartitionedStateStore.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Interface for storing and accessing partitioned state. The interface is
+ * designed in a way that allows implementations for lazily state access.
+ * 
+ * @param <S>
+ *            Type of the state.
+ * @param <C>
+ *            Type of the state snapshot.
+ */
+public interface PartitionedStateStore<S, C extends Serializable> {
+
+	S getStateForKey(Serializable key) throws Exception;
+
+	void setStateForKey(Serializable key, S state);
+
+	Map<Serializable, S> getPartitionedState() throws Exception;
+
+	Map<Serializable, StateHandle<C>> snapshotStates(long checkpointId, long checkpointTimestamp) throws Exception;
+
+	void restoreStates(Map<Serializable, StateHandle<C>> snapshots) throws Exception;
+
+	boolean containsKey(Serializable key);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
index fbd76ba..7977e09 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.state;
 
+import java.util.List;
+
 import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
 
 /**
@@ -26,20 +28,23 @@ import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
 public class StateUtils {
 
 	/**
-	 * Utility method to define a common generic bound to be used for setting a generic state
-	 * handle on a generic state carrier.
+	 * Utility method to define a common generic bound to be used for setting a
+	 * generic state handle on a generic state carrier.
 	 * 
-	 * This has no impact on runtime, since internally, it performs
-	 * unchecked casts. The purpose is merely to allow the use of generic interfaces without resorting
-	 * to raw types, by giving the compiler a common type bound. 
+	 * This has no impact on runtime, since internally, it performs unchecked
+	 * casts. The purpose is merely to allow the use of generic interfaces
+	 * without resorting to raw types, by giving the compiler a common type
+	 * bound.
 	 * 
-	 * @param op The state carrier operator.
-	 * @param state The state handle.
-	 * @param <T> Type bound for the  
+	 * @param op
+	 *            The state carrier operator.
+	 * @param state
+	 *            The state handle.
+	 * @param <T>
+	 *            Type bound for the
 	 */
-	public static <T extends StateHandle<?>> void setOperatorState(OperatorStateCarrier<?> op, StateHandle<?> state) 
-				throws Exception
-	{
+	public static <T extends StateHandle<?>> void setOperatorState(OperatorStateCarrier<?> op,
+			StateHandle<?> state) throws Exception {
 		@SuppressWarnings("unchecked")
 		OperatorStateCarrier<T> typedOp = (OperatorStateCarrier<T>) op;
 		@SuppressWarnings("unchecked")
@@ -47,10 +52,15 @@ public class StateUtils {
 
 		typedOp.setInitialState(typedHandle);
 	}
-	
-	
+
+	public static List<PartitionedStateHandle> rePartitionHandles(
+			List<PartitionedStateHandle> handles, int numPartitions) {
+		return null;
+	}
+
 	// ------------------------------------------------------------------------
-	
+
 	/** Do not instantiate */
-	private StateUtils() {}
+	private StateUtils() {
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java
index a95b540..82ef6f3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.streaming.api.checkpoint;
 
+import java.io.Serializable;
+
+import org.apache.flink.runtime.state.StateHandle;
+
 /**
  * This interface must be implemented by functions/operations that want to receive
  * a commit notification once a checkpoint has been completely acknowledged by all
@@ -32,6 +36,7 @@ public interface CheckpointCommitter {
 	 * fail any more.
 	 * 
 	 * @param checkpointId The ID of the checkpoint that has been completed.
+	 * @param checkPointedState Handle to the state that was checkpointed with this checkpoint id.
 	 */
-	void commitCheckpoint(long checkpointId);
+	void commitCheckpoint(long checkpointId, StateHandle<Serializable> checkPointedState);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index 6a48b6a..2d70d49 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -17,6 +17,8 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+
 /**
  * The iterative data stream represents the start of an iteration in a
  * {@link DataStream}.

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
index 16e9deb..149d7a8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/StreamProjection.java
@@ -17,7 +17,6 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
@@ -48,6 +47,8 @@ import org.apache.flink.api.java.tuple.Tuple9;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.streaming.api.operators.StreamProject;
 
+import com.google.common.base.Preconditions;
+
 public class StreamProjection<IN> {
 
 	private DataStream<IN> dataStream;

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
index 945f953..4f859e8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/FileReadFunction.java
@@ -17,6 +17,10 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.URI;
+
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -24,10 +28,6 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.util.Collector;
 
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.URI;
-
 public class FileReadFunction implements FlatMapFunction<Tuple3<String, Long, Long>, String> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index 17ca34d..921a33b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.streaming.api.functions.source;
 
-import org.apache.flink.api.common.functions.Function;
-
 import java.io.Serializable;
 
+import org.apache.flink.api.common.functions.Function;
+
 /**
  * Base interface for all stream data sources in Flink. The contract of a stream source
  * is the following: When the source should start emitting elements the {@link #run} method

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 329b4dd..0784582 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
@@ -59,6 +60,7 @@ public class StreamConfig implements Serializable {
 	private static final String OUT_STREAM_EDGES = "outStreamEdges";
 	private static final String IN_STREAM_EDGES = "inStreamEdges";
 	private static final String STATEHANDLE_PROVIDER = "stateHandleProvider";
+	private static final String STATE_PARTITIONER = "statePartitioner";
 
 	// DEFAULT VALUES
 	private static final long DEFAULT_TIMEOUT = 100;
@@ -381,7 +383,6 @@ public class StreamConfig implements Serializable {
 	}
 	
 	public void setStateHandleProvider(StateHandleProvider<?> provider) {
-
 		try {
 			InstantiationUtil.writeObjectToConfig(provider, this.config, STATEHANDLE_PROVIDER);
 		} catch (IOException e) {
@@ -398,6 +399,24 @@ public class StreamConfig implements Serializable {
 			throw new StreamTaskException("Could not instantiate statehandle provider.", e);
 		}
 	}
+	
+	public void setStatePartitioner(KeySelector<?, Serializable> partitioner) {
+		try {
+			InstantiationUtil.writeObjectToConfig(partitioner, this.config, STATE_PARTITIONER);
+		} catch (IOException e) {
+			throw new StreamTaskException("Could not serialize state partitioner.", e);
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	public KeySelector<?, Serializable> getStatePartitioner(ClassLoader cl) {
+		try {
+			return (KeySelector<?, Serializable>) InstantiationUtil
+					.readObjectFromConfig(this.config, STATE_PARTITIONER, cl);
+		} catch (Exception e) {
+			throw new StreamTaskException("Could not instantiate state partitioner.", e);
+		}
+	}
 
 	public void setChainStart() {
 		config.setBoolean(IS_CHAINED_VERTEX, true);

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a365587..40f61d9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -19,8 +19,8 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 /**
  * Base class for operators that do not contain a user-defined function.
@@ -31,7 +31,7 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
 
 	private static final long serialVersionUID = 1L;
 
-	protected transient RuntimeContext runtimeContext;
+	protected transient StreamingRuntimeContext runtimeContext;
 
 	protected transient ExecutionConfig executionConfig;
 
@@ -43,7 +43,7 @@ public abstract class AbstractStreamOperator<OUT> implements StreamOperator<OUT>
 	protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
 
 	@Override
-	public void setup(Output<OUT> output, RuntimeContext runtimeContext) {
+	public void setup(Output<OUT> output, StreamingRuntimeContext runtimeContext) {
 		this.output = output;
 		this.executionConfig = runtimeContext.getExecutionConfig();
 		this.runtimeContext = runtimeContext;

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 90b2b2f..cbcbcee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -18,20 +18,25 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import java.io.Serializable;
+import java.util.Map;
+
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-
-import java.io.Serializable;
+import org.apache.flink.streaming.api.state.StreamOperatorState;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
 /**
- * This is used as the base class for operators that have a user-defined function.
+ * This is used as the base class for operators that have a user-defined
+ * function.
  * 
- * @param <OUT> The output type of the operator
- * @param <F> The type of the user function
+ * @param <OUT>
+ *            The output type of the operator
+ * @param <F>
+ *            The type of the user function
  */
 public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serializable> extends AbstractStreamOperator<OUT> implements StatefulStreamOperator<OUT> {
 
@@ -44,7 +49,7 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 	}
 
 	@Override
-	public final void setup(Output<OUT> output, RuntimeContext runtimeContext) {
+	public void setup(Output<OUT> output, StreamingRuntimeContext runtimeContext) {
 		super.setup(output, runtimeContext);
 		FunctionUtils.setFunctionRuntimeContext(userFunction, runtimeContext);
 	}
@@ -57,35 +62,37 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 	}
 
 	@Override
-	public void close() throws Exception{
+	public void close() throws Exception {
 		super.close();
 		FunctionUtils.closeFunction(userFunction);
 	}
 
+	@SuppressWarnings("unchecked")
 	public void restoreInitialState(Serializable state) throws Exception {
-		if (userFunction instanceof Checkpointed) {
-			setStateOnFunction(state, userFunction);
-		}
-		else {
-			throw new IllegalStateException("Trying to restore state of a non-checkpointed function");
-		}
+
+		Map<Serializable, StateHandle<Serializable>> snapshots = (Map<Serializable, StateHandle<Serializable>>) state;
+
+		StreamOperatorState<?, Serializable> operatorState = (StreamOperatorState<?, Serializable>) runtimeContext
+				.getOperatorState();
+
+		operatorState.restoreState(snapshots);
+
 	}
 
-	public Serializable getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception {
-		if (userFunction instanceof Checkpointed) {
-			return ((Checkpointed<?>) userFunction).snapshotState(checkpointId, timestamp);
-		}
-		else {
-			return null;
-		}
+	public Serializable getStateSnapshotFromFunction(long checkpointId, long timestamp)
+			throws Exception {
+		
+		StreamOperatorState<?,?> operatorState = (StreamOperatorState<?,?>) runtimeContext.getOperatorState();
+		
+		return (Serializable) operatorState.snapshotState(checkpointId, timestamp); 
 	}
 
-	public void confirmCheckpointCompleted(long checkpointId, long timestamp) throws Exception {
+	public void confirmCheckpointCompleted(long checkpointId, long timestamp,
+			StateHandle<Serializable> checkpointedState) throws Exception {
 		if (userFunction instanceof CheckpointCommitter) {
 			try {
-				((CheckpointCommitter) userFunction).commitCheckpoint(checkpointId);
-			}
-			catch (Exception e) {
+				((CheckpointCommitter) userFunction).commitCheckpoint(checkpointId, checkpointedState);
+			} catch (Exception e) {
 				throw new Exception("Error while confirming checkpoint " + checkpointId + " to the stream function", e);
 			}
 		}
@@ -94,13 +101,4 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 	public F getUserFunction() {
 		return userFunction;
 	}
-
-	private static <T extends Serializable> void setStateOnFunction(Serializable state, Function function) {
-		@SuppressWarnings("unchecked")
-		T typedState = (T) state;
-		@SuppressWarnings("unchecked")
-		Checkpointed<T> typedFunction = (Checkpointed<T>) function;
-
-		typedFunction.restoreState(typedState);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
index e171af8..343f87d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
@@ -19,6 +19,8 @@ package org.apache.flink.streaming.api.operators;
 
 import java.io.Serializable;
 
+import org.apache.flink.runtime.state.StateHandle;
+
 /**
  * Interface for Stream operators that can have state. This interface is used for checkpointing
  * and restoring that state.
@@ -31,5 +33,5 @@ public interface StatefulStreamOperator<OUT> extends StreamOperator<OUT> {
 
 	Serializable getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception;
 
-	void confirmCheckpointCompleted(long checkpointId, long timestamp) throws Exception;
+	void confirmCheckpointCompleted(long checkpointId, long timestamp, StateHandle<Serializable> checkpointedState) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 05b15be..aebff5c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -17,11 +17,11 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-
 import java.io.Serializable;
 
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
+
 /**
  * Basic interface for stream operators. Implementers would implement one of
  * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} or
@@ -37,7 +37,7 @@ public interface StreamOperator<OUT> extends Serializable {
 	/**
 	 * Initializes the {@link StreamOperator} for input and output handling.
 	 */
-	public void setup(Output<OUT> output, RuntimeContext runtimeContext);
+	public void setup(Output<OUT> output, StreamingRuntimeContext runtimeContext);
 
 	/**
 	 * This method is called before any elements are processed.

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java
new file mode 100644
index 0000000..14d1504
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/BasicCheckpointer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.state.StateCheckpointer;
+
+public class BasicCheckpointer implements StateCheckpointer<Serializable, Serializable> {
+
+	@Override
+	public Serializable snapshotState(Serializable state, long checkpointId, long checkpointTimestamp) {
+		return state;
+	}
+
+	@Override
+	public Serializable restoreState(Serializable stateSnapshot) {
+		return stateSnapshot;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
new file mode 100644
index 0000000..4ac01a5
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.runtime.state.PartitionedStateStore;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+
+public class EagerStateStore<S, C extends Serializable> implements PartitionedStateStore<S, C> {
+
+	protected StateCheckpointer<S, C> checkpointer;
+	protected StateHandleProvider<C> provider;
+
+	Map<Serializable, S> fetchedState;
+
+	public EagerStateStore(StateCheckpointer<S, C> checkpointer, StateHandleProvider<C> provider) {
+		this.checkpointer = checkpointer;
+		this.provider = provider;
+
+		fetchedState = new HashMap<Serializable, S>();
+	}
+
+	@Override
+	public S getStateForKey(Serializable key) throws Exception {
+		return fetchedState.get(key);
+	}
+
+	@Override
+	public void setStateForKey(Serializable key, S state) {
+		fetchedState.put(key, state);
+	}
+
+	@Override
+	public Map<Serializable, S> getPartitionedState() throws Exception {
+		return fetchedState;
+	}
+
+	@Override
+	public Map<Serializable, StateHandle<C>> snapshotStates(long checkpointId,
+			long checkpointTimestamp) {
+
+		Map<Serializable, StateHandle<C>> handles = new HashMap<Serializable, StateHandle<C>>();
+
+		for (Entry<Serializable, S> stateEntry : fetchedState.entrySet()) {
+			handles.put(stateEntry.getKey(), provider.createStateHandle(checkpointer.snapshotState(
+					stateEntry.getValue(), checkpointId, checkpointTimestamp)));
+		}
+		return handles;
+	}
+
+	@Override
+	public void restoreStates(Map<Serializable, StateHandle<C>> snapshots) throws Exception {
+		for (Entry<Serializable, StateHandle<C>> snapshotEntry : snapshots.entrySet()) {
+			fetchedState.put(snapshotEntry.getKey(),
+					checkpointer.restoreState(snapshotEntry.getValue().getState()));
+		}
+	}
+
+	@Override
+	public boolean containsKey(Serializable key) {
+		return fetchedState.containsKey(key);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/LazyStateStore.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/LazyStateStore.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/LazyStateStore.java
new file mode 100644
index 0000000..9872a0c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/LazyStateStore.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.runtime.state.PartitionedStateStore;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+
+/**
+ * Implementation of the {@link PartitionedStateStore} interface for lazy
+ * retrieval and snapshotting of the partitioned operator states. Lazy state
+ * access considerably speeds up recovery and makes resource access smoother by
+ * avoiding request congestion in the persistent storage layer.
+ * 
+ * <p>
+ * The logic implemented here can also be used later to push unused state to the
+ * persistent layer and also avoids re-snapshotting the unmodified states.
+ * </p>
+ * 
+ * @param <S>
+ *            Type of the operator states.
+ * @param <C>
+ *            Type of the state checkpoints.
+ */
+public class LazyStateStore<S, C extends Serializable> implements PartitionedStateStore<S, C> {
+
+	protected StateCheckpointer<S, C> checkpointer;
+	protected StateHandleProvider<C> provider;
+
+	Map<Serializable, StateHandle<C>> unfetchedState;
+	Map<Serializable, S> fetchedState;
+
+	public LazyStateStore(StateCheckpointer<S, C> checkpointer, StateHandleProvider<C> provider) {
+		this.checkpointer = checkpointer;
+		this.provider = provider;
+
+		unfetchedState = new HashMap<Serializable, StateHandle<C>>();
+		fetchedState = new HashMap<Serializable, S>();
+	}
+
+	@Override
+	public S getStateForKey(Serializable key) throws Exception {
+		S state = fetchedState.get(key);
+		if (state != null) {
+			return state;
+		} else {
+			StateHandle<C> handle = unfetchedState.get(key);
+			if (handle != null) {
+				state = checkpointer.restoreState(handle.getState());
+				fetchedState.put(key, state);
+				unfetchedState.remove(key);
+				return state;
+			} else {
+				return null;
+			}
+		}
+	}
+
+	@Override
+	public void setStateForKey(Serializable key, S state) {
+		fetchedState.put(key, state);
+		unfetchedState.remove(key);
+	}
+
+	@Override
+	public Map<Serializable, S> getPartitionedState() throws Exception {
+		for (Entry<Serializable, StateHandle<C>> handleEntry : unfetchedState.entrySet()) {
+			fetchedState.put(handleEntry.getKey(),
+					checkpointer.restoreState(handleEntry.getValue().getState()));
+		}
+		unfetchedState.clear();
+		return fetchedState;
+	}
+
+	@Override
+	public Map<Serializable, StateHandle<C>> snapshotStates(long checkpointId,
+			long checkpointTimestamp) {
+		for (Entry<Serializable, S> stateEntry : fetchedState.entrySet()) {
+			unfetchedState.put(stateEntry.getKey(), provider.createStateHandle(checkpointer
+					.snapshotState(stateEntry.getValue(), checkpointId, checkpointTimestamp)));
+		}
+		return unfetchedState;
+	}
+
+	@Override
+	public void restoreStates(Map<Serializable, StateHandle<C>> snapshots) {
+		unfetchedState.putAll(snapshots);
+	}
+
+	@Override
+	public boolean containsKey(Serializable key) {
+		return fetchedState.containsKey(key) || unfetchedState.containsKey(key);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
new file mode 100644
index 0000000..26b2a88
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.runtime.state.PartitionedStateStore;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+
+/**
+ * Implementation of the {@link OperatorState} interface for partitioned user
+ * states. It provides methods for checkpointing and restoring partitioned
+ * operator states upon failure.
+ * 
+ * @param <IN>
+ *            Input type of the underlying {@link OneInputStreamOperator}
+ * @param <S>
+ *            Type of the underlying {@link OperatorState}.
+ * @param <C>
+ *            Type of the state snapshot.
+ */
+public class PartitionedStreamOperatorState<IN, S, C extends Serializable> extends
+		StreamOperatorState<S, C> {
+
+	// KeySelector for getting the state partition key for each input
+	private KeySelector<IN, Serializable> keySelector;
+
+	private PartitionedStateStore<S, C> stateStore;
+	
+	private S defaultState;
+
+	// The currently processed input, used to extract the appropriate key
+	private IN currentInput;
+
+	public PartitionedStreamOperatorState(StateCheckpointer<S, C> checkpointer,
+			StateHandleProvider<C> provider, KeySelector<IN, Serializable> keySelector) {
+		super(checkpointer, provider);
+		this.keySelector = keySelector;
+		this.stateStore = new EagerStateStore<S, C>(checkpointer, provider);
+	}
+	
+	@SuppressWarnings("unchecked")
+	public PartitionedStreamOperatorState(StateHandleProvider<C> provider,
+			KeySelector<IN, Serializable> keySelector) {
+		this((StateCheckpointer<S, C>) new BasicCheckpointer(), provider, keySelector);
+	}
+
+	@Override
+	public S getState() {
+		if (currentInput == null) {
+			return null;
+		} else {
+			try {
+				Serializable key = keySelector.getKey(currentInput);
+				if(stateStore.containsKey(key)){
+					return stateStore.getStateForKey(key);
+				}else{
+					return defaultState;
+				}
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
+
+	@Override
+	public void updateState(S state) {
+		if (currentInput == null) {
+			throw new RuntimeException("Need a valid input for updating a state.");
+		} else {
+			try {
+				stateStore.setStateForKey(keySelector.getKey(currentInput), state);
+			} catch (Exception e) {
+				throw new RuntimeException(e);
+			}
+		}
+	}
+	
+	@Override
+	public void setDefaultState(S defaultState){
+		this.defaultState = defaultState;
+	}
+
+	public void setCurrentInput(IN input) {
+		currentInput = input;
+	}
+
+	@Override
+	public Map<Serializable, StateHandle<C>> snapshotState(long checkpointId,
+			long checkpointTimestamp) throws Exception{
+		return stateStore.snapshotStates(checkpointId, checkpointTimestamp);
+	}
+
+	@Override
+	public void restoreState(Map<Serializable, StateHandle<C>> snapshots) throws Exception {
+		stateStore.restoreStates(snapshots);
+	}
+
+	@Override
+	public Map<Serializable, S> getPartitionedState() throws Exception {
+		return stateStore.getPartitionedState();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
new file mode 100644
index 0000000..90a3726
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
+import org.apache.flink.shaded.com.google.common.collect.ImmutableMap;
+
+/**
+ * Implementation of the {@link OperatorState} interface for non-partitioned
+ * user states. It provides methods for checkpointing and restoring operator
+ * states upon failure using the provided {@link StateCheckpointer} and
+ * {@link StateHandleProvider}.
+ * 
+ * @param <S>
+ *            Type of the underlying {@link OperatorState}.
+ * @param <C>
+ *            Type of the state snapshot.
+ */
+public class StreamOperatorState<S, C extends Serializable> implements OperatorState<S> {
+
+	protected static final Serializable DEFAULTKEY = -1;
+
+	private S state;
+	private StateCheckpointer<S, C> checkpointer;
+	private StateHandleProvider<C> provider;
+
+	public StreamOperatorState(StateCheckpointer<S, C> checkpointer, StateHandleProvider<C> provider) {
+		this.checkpointer = checkpointer;
+		this.provider = provider;
+	}
+	
+	@SuppressWarnings("unchecked")
+	public StreamOperatorState(StateHandleProvider<C> provider) {
+		this((StateCheckpointer<S, C>) new BasicCheckpointer(), provider);
+	}
+
+	@Override
+	public S getState() {
+		return state;
+	}
+
+	@Override
+	public void updateState(S state) {
+		this.state = state;
+	}
+	
+	public void setDefaultState(S defaultState){
+		updateState(defaultState);
+	}
+
+	public StateCheckpointer<S, C> getCheckpointer() {
+		return checkpointer;
+	}
+	
+	public void setCheckpointer(StateCheckpointer<S, C> checkpointer) {
+		this.checkpointer = checkpointer;
+	}
+
+	protected StateHandleProvider<C> getStateHandleProvider() {
+		return provider;
+	}
+
+	public Map<Serializable, StateHandle<C>> snapshotState(long checkpointId,
+			long checkpointTimestamp) throws Exception {
+		return ImmutableMap.of(DEFAULTKEY, provider.createStateHandle(checkpointer.snapshotState(
+				getState(), checkpointId, checkpointTimestamp)));
+
+	}
+
+	public void restoreState(Map<Serializable, StateHandle<C>> snapshots) throws Exception {
+		updateState(checkpointer.restoreState(snapshots.get(DEFAULTKEY).getState()));
+	}
+
+	public Map<Serializable, S> getPartitionedState() throws Exception {
+		return ImmutableMap.of(DEFAULTKEY, state);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
index 3905558..247fe25 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BlockingQueueBroker.java
@@ -17,9 +17,9 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.BlockingQueue;
 
-import org.apache.flink.runtime.iterative.concurrent.Broker;
+import org.apache.flink.runtime.iterative.concurrent.Broker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index 941ddd2..c212346 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -17,14 +17,14 @@
 
 package org.apache.flink.streaming.runtime.io;
 
+import java.io.IOException;
+
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
 
-import java.io.IOException;
-
 public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
 
 	private long timeout;

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 2360aa8..6750b52 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import java.io.IOException;
+
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -29,8 +31,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
 
 	private static final Logger LOG = LoggerFactory.getLogger(OneInputStreamTask.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index e69f533..4952cdf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeUnit;
 import org.apache.flink.streaming.api.collector.StreamOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index db95dcc..e7f4d9c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.functors.NotNullPredicate;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.event.task.TaskEvent;
@@ -41,6 +42,8 @@ import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StatefulStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.state.PartitionedStreamOperatorState;
+import org.apache.flink.streaming.api.state.StreamOperatorState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -102,10 +105,19 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		return getEnvironment().getTaskName();
 	}
 
+	@SuppressWarnings({ "rawtypes", "unchecked" })
 	public StreamingRuntimeContext createRuntimeContext(StreamConfig conf) {
 		Environment env = getEnvironment();
-		return new StreamingRuntimeContext(conf.getStreamOperator(userClassLoader).getClass()
-				.getSimpleName(), env, getUserCodeClassLoader(), getExecutionConfig());
+		String operatorName = conf.getStreamOperator(userClassLoader).getClass().getSimpleName();
+		
+		KeySelector<?,Serializable> statePartitioner = conf.getStatePartitioner(userClassLoader);
+		
+		StreamOperatorState state = statePartitioner == null ? new StreamOperatorState(
+				getStateHandleProvider()) : new PartitionedStreamOperatorState(
+				getStateHandleProvider(), statePartitioner);
+		
+		return new StreamingRuntimeContext(operatorName, env, getUserCodeClassLoader(),
+				getExecutionConfig(), state);
 	}
 	
 	private StateHandleProvider<Serializable> getStateHandleProvider() {
@@ -129,7 +141,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 			switch (backend) {
 				case JOBMANAGER:
 					LOG.info("State backend for state checkpoints is set to jobmanager.");
-					return LocalStateHandle.createProvider();
+					return new LocalStateHandle.LocalStateHandleProvider<Serializable>();
 				case FILESYSTEM:
 					String checkpointDir = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND_FS_DIR, null);
 					if (checkpointDir != null) {
@@ -294,13 +306,13 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		// we do nothing here so far. this should call commit on the source function, for example
 		synchronized (checkpointLock) {
 			if (streamOperator instanceof StatefulStreamOperator) {
-				((StatefulStreamOperator) streamOperator).confirmCheckpointCompleted(checkpointId, timestamp);
+				((StatefulStreamOperator) streamOperator).confirmCheckpointCompleted(checkpointId, timestamp, null);
 			}
 
 			if (hasChainedOperators) {
 				for (OneInputStreamOperator<?, ?> chainedOperator : outputHandler.getChainedOperators()) {
 					if (chainedOperator instanceof StatefulStreamOperator) {
-						((StatefulStreamOperator) chainedOperator).confirmCheckpointCompleted(checkpointId, timestamp);
+						((StatefulStreamOperator) chainedOperator).confirmCheckpointCompleted(checkpointId, timestamp, null);
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 6112e03..5fd158c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -18,13 +18,18 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import java.io.Serializable;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.functions.util.RuntimeUDFContext;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.streaming.api.state.StreamOperatorState;
 
 /**
  * Implementation of the {@link RuntimeContext}, created by runtime stream UDF
@@ -33,13 +38,16 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 public class StreamingRuntimeContext extends RuntimeUDFContext {
 
 	private final Environment env;
+	@SuppressWarnings("rawtypes")
+	private StreamOperatorState state;
 
 
 	public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
-			ExecutionConfig executionConfig) {
+			ExecutionConfig executionConfig, StreamOperatorState<?, ?> state) {
 		super(name, env.getNumberOfSubtasks(), env.getIndexInSubtaskGroup(), userCodeClassLoader,
 				executionConfig, env.getDistributedCacheEntries());
 		this.env = env;
+		this.state = state;
 	}
 
 	/**
@@ -60,5 +68,25 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	public Configuration getTaskStubParameters() {
 		return new TaskConfig(env.getTaskConfiguration()).getStubParameters();
 	}
+	
+	@SuppressWarnings("unchecked")
+	@Override
+	public <S, C extends Serializable> OperatorState<S> getOperatorState(S defaultState,
+			StateCheckpointer<S, C> checkpointer) {
+		state.setCheckpointer(checkpointer);
+		return (OperatorState<S>) state;
+	}
+	
+	@SuppressWarnings("unchecked")
+	@Override
+	public <S extends Serializable> OperatorState<S> getOperatorState(S defaultState) {
+		state.setDefaultState(defaultState);
+		return (OperatorState<S>) state;
+	}
+	
+	@SuppressWarnings("unchecked")
+	public <S extends Serializable> OperatorState<S> getOperatorState() {
+		return (OperatorState<S>) state;
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 67119f7..2052877 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -30,7 +30,6 @@ import org.apache.flink.streaming.runtime.io.CoRecordReader;
 import org.apache.flink.streaming.runtime.io.InputGateFactory;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
index 87c9757..333bcdd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -17,10 +17,10 @@
 
 package org.apache.flink.streaming.util.serialization;
 
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-
 import java.io.Serializable;
 
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
 public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/a7e24580/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulFunctionTest.java
new file mode 100644
index 0000000..f4c1a89
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulFunctionTest.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.state;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.Histogram;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.cache.DistributedCache;
+import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.shaded.com.google.common.collect.ImmutableMap;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Test;
+
+public class StatefulFunctionTest {
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void simpleStateTest() throws Exception {
+
+		StatefulMapper mapper = new StatefulMapper();
+		MockContext context = new MockContext(false, mapper);
+		mapper.setRuntimeContext(context);
+		mapper.open(null);
+
+		assertEquals(Arrays.asList("1", "2", "3", "4", "5"),
+				applyOnSequence(mapper, 1, 5, context.state));
+		assertEquals((Integer) 5, context.state.getState());
+
+		byte[] serializedState = InstantiationUtil.serializeObject(context.state
+				.snapshotState(1, 1));
+
+		StatefulMapper restoredMapper = new StatefulMapper();
+		MockContext restoredContext = new MockContext(false, restoredMapper);
+		restoredMapper.setRuntimeContext(context);
+		restoredMapper.open(null);
+
+		assertEquals(null, restoredContext.state.getState());
+
+		Map<Serializable, StateHandle<Integer>> deserializedState = (Map<Serializable, StateHandle<Integer>>) InstantiationUtil
+				.deserializeObject(serializedState, Thread.currentThread().getContextClassLoader());
+
+		restoredContext.state.restoreState(deserializedState);
+
+		assertEquals((Integer) 5, restoredContext.state.getState());
+
+	}
+
+	@SuppressWarnings("unchecked")
+	@Test
+	public void partitionedStateTest() throws Exception {
+		StatefulMapper mapper = new StatefulMapper();
+		MockContext context = new MockContext(true, mapper);
+		mapper.setRuntimeContext(context);
+		mapper.open(null);
+
+		assertEquals(Arrays.asList("1", "2", "3", "4", "5"),
+				applyOnSequence(mapper, 1, 5, context.state));
+		assertEquals(ImmutableMap.of(0, 2, 1, 3), context.state.getPartitionedState());
+
+		byte[] serializedState = InstantiationUtil.serializeObject(context.state
+				.snapshotState(1, 1));
+
+		StatefulMapper restoredMapper = new StatefulMapper();
+		MockContext restoredContext = new MockContext(true, restoredMapper);
+		restoredMapper.setRuntimeContext(context);
+		restoredMapper.open(null);
+
+		assertEquals(null, restoredContext.state.getState());
+
+		Map<Serializable, StateHandle<Integer>> deserializedState = (Map<Serializable, StateHandle<Integer>>) InstantiationUtil
+				.deserializeObject(serializedState, Thread.currentThread().getContextClassLoader());
+
+		restoredContext.state.restoreState(deserializedState);
+
+		assertEquals(ImmutableMap.of(0, 2, 1, 3), restoredContext.state.getPartitionedState());
+
+	}
+
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	private <T> List<T> applyOnSequence(MapFunction<Integer, T> mapper, int from, int to,
+			StreamOperatorState state) throws Exception {
+		List<T> output = new ArrayList<T>();
+		for (int i = from; i <= to; i++) {
+			if (state instanceof PartitionedStreamOperatorState) {
+				((PartitionedStreamOperatorState) state).setCurrentInput(i);
+			}
+			output.add(mapper.map(i));
+		}
+		return output;
+	}
+
+	public static class ModKey implements KeySelector<Integer, Serializable> {
+
+		private static final long serialVersionUID = 4193026742083046736L;
+
+		int base;
+
+		public ModKey(int base) {
+			this.base = base;
+		}
+
+		@Override
+		public Integer getKey(Integer value) throws Exception {
+			return value % base;
+		}
+
+	}
+
+	public static class StatefulMapper extends RichMapFunction<Integer, String> {
+
+		private static final long serialVersionUID = -9007873655253339356L;
+		OperatorState<Integer> opState;
+
+		@Override
+		public String map(Integer value) throws Exception {
+			opState.updateState(opState.getState() + 1);
+			return value.toString();
+		}
+
+		@Override
+		public void open(Configuration conf) {
+			opState = getRuntimeContext().getOperatorState(0);
+		}
+	}
+
+	public static class MockContext implements RuntimeContext {
+
+		StreamOperatorState<Integer, Integer> state;
+
+		public MockContext(boolean isPartitionedState, StatefulMapper mapper) {
+			if (isPartitionedState) {
+				this.state = new PartitionedStreamOperatorState<Integer, Integer, Integer>(
+						new LocalStateHandleProvider<Integer>(), new ModKey(2));
+			} else {
+				this.state = new StreamOperatorState<Integer, Integer>(
+						new LocalStateHandleProvider<Integer>());
+			}
+		}
+		
+		public String getTaskName() {return null;}
+		public int getNumberOfParallelSubtasks() {return 0;}
+		public int getIndexOfThisSubtask() {return 0;}
+		public ExecutionConfig getExecutionConfig() {return null;}
+		public ClassLoader getUserCodeClassLoader() {return null;}
+		public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> accumulator) {}
+		public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {return null;}
+		public HashMap<String, Accumulator<?, ?>> getAllAccumulators() {return null;}	
+		public IntCounter getIntCounter(String name) {return null;}	
+		public LongCounter getLongCounter(String name) {return null;}
+		public DoubleCounter getDoubleCounter(String name) {return null;}
+		public Histogram getHistogram(String name) {return null;}		
+		public <RT> List<RT> getBroadcastVariable(String name) {return null;}
+		public <T, C> C getBroadcastVariableWithInitializer(String name,
+				BroadcastVariableInitializer<T, C> initializer) {return null;}
+		public DistributedCache getDistributedCache() {return null;}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public <S, C extends Serializable> OperatorState<S> getOperatorState(S defaultState,
+				StateCheckpointer<S, C> checkpointer) {
+			state.setDefaultState((Integer) defaultState);
+			return (OperatorState<S>) state;
+		}
+
+		@SuppressWarnings("unchecked")
+		@Override
+		public <S extends Serializable> OperatorState<S> getOperatorState(S defaultState) {
+			state.setDefaultState((Integer) defaultState);
+			return (OperatorState<S>) state;
+		}
+
+	}
+
+}


Mime
View raw message