flink-commits mailing list archives

From gyf...@apache.org
Subject [08/12] flink git commit: [streaming] Re-enable Checkpointed interface for drawing snapshots
Date Thu, 25 Jun 2015 17:21:41 GMT
[streaming] Re-enable Checkpointed interface for drawing snapshots


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0ae1758a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0ae1758a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0ae1758a

Branch: refs/heads/master
Commit: 0ae1758a98aebf8098570fc1c11096f78747af44
Parents: 474ff4d
Author: Gyula Fora <gyfora@apache.org>
Authored: Mon Jun 22 11:10:37 2015 +0200
Committer: Gyula Fora <gyfora@apache.org>
Committed: Thu Jun 25 16:38:07 2015 +0200

----------------------------------------------------------------------
 docs/apis/streaming_guide.md                    | 48 +++++++++++++++++--
 .../api/common/functions/RuntimeContext.java    |  9 +++-
 .../util/AbstractRuntimeUDFContext.java         |  5 +-
 .../flink/api/common/state/OperatorState.java   | 10 +++-
 .../streaming/connectors/kafka/KafkaITCase.java |  3 --
 .../source/StatefulSequenceSource.java          |  4 +-
 .../operators/AbstractUdfStreamOperator.java    | 49 ++++++++++++++++----
 .../api/operators/StatefulStreamOperator.java   |  5 +-
 .../streaming/api/state/EagerStateStore.java    |  5 ++
 .../state/PartitionedStreamOperatorState.java   | 18 ++++---
 .../api/state/StreamOperatorState.java          | 12 +++--
 .../streaming/api/state/WrapperStateHandle.java | 20 +++++---
 .../streaming/runtime/tasks/StreamTask.java     | 49 ++++++++++++--------
 .../runtime/tasks/StreamingRuntimeContext.java  | 14 ++++--
 .../api/state/StatefulOperatorTest.java         | 35 +++++++++++---
 .../runtime/tasks/SourceStreamTaskTest.java     |  3 +-
 .../StreamCheckpointingITCase.java              | 37 +++++++++------
 .../ProcessFailureStreamingRecoveryITCase.java  | 24 +++++++---
 18 files changed, 256 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/docs/apis/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md
index f252f1e..2713d6e 100644
--- a/docs/apis/streaming_guide.md
+++ b/docs/apis/streaming_guide.md
@@ -1188,13 +1188,15 @@ Rich functions provide, in addition to the user-defined function (`map()`, `redu
 Stateful computation
 ------------
 
-Flink supports the checkpointing and persistence of user defined operator state, so in case of a failure this state can be restored to the latest checkpoint and the processing will continue from there. This gives exactly once processing semantics with respect to the operator states when the sources follow this stateful pattern as well. In practice this usually means that sources keep track of their current offset as their OperatorState. The `PersistentKafkaSource` provides this stateful functionality for reading streams from Kafka. 
+Flink supports the checkpointing and persistence of user-defined operator states, so in case of a failure these states can be restored to the latest checkpoint and processing will continue from there. This gives exactly-once processing semantics with respect to the operator states when the sources follow this stateful pattern as well. In practice this usually means that sources keep track of their current offset as their `OperatorState`. The `PersistentKafkaSource` provides this stateful functionality for reading streams from Kafka.
 
-Flink supports two ways of accessing operator states: partitioned and non-partitioned state access.
+### OperatorState
 
-In case of non-partitioned state access, an operator state is maintained for each parallel instance of a given operator. When `OperatorState.getState()` is called, a separate state is returned in each parallel instance. In practice this means if we keep a counter for the received inputs in a mapper, `getState()` will return number of inputs processed by each parallel mapper.
+Flink supports two types of operator states: partitioned and non-partitioned states.
 
-In case of of partitioned `OperatorState` a separate state is maintained for each received key. This can be used for instance to count received inputs by different keys, or store and update summary statistics of different sub-streams.
+In case of non-partitioned operator state, an operator state is maintained for each parallel instance of a given operator. When `OperatorState.getState()` is called, a separate state is returned in each parallel instance. In practice this means that if we keep a counter for the received inputs in a mapper, `getState()` will return the number of inputs processed by each parallel mapper.
+
+In case of partitioned operator state, a separate state is maintained for each received key. This can be used, for instance, to count received inputs by different keys, or to store and update summary statistics of different sub-streams.
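The difference between the two access modes can be illustrated with a minimal, self-contained sketch (plain Java, not the Flink API; both counter classes are hypothetical stand-ins for `OperatorState`-backed counters):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal stand-ins for non-partitioned vs. partitioned operator state.
public class StateModes {

    // Non-partitioned: one counter per parallel operator instance.
    static class InstanceCounter {
        long count = 0;
        void onInput() { count++; }
    }

    // Partitioned: a separate counter is maintained for each received key.
    static class KeyedCounter {
        final Map<String, Long> perKey = new HashMap<>();
        void onInput(String key) {
            perKey.merge(key, 1L, Long::sum);
        }
    }

    public static void main(String[] args) {
        InstanceCounter instance = new InstanceCounter();
        KeyedCounter keyed = new KeyedCounter();
        for (String key : new String[]{"a", "b", "a"}) {
            instance.onInput();   // counts every record seen by this instance
            keyed.onInput(key);   // counts records separately per key
        }
        System.out.println(instance.count);        // 3
        System.out.println(keyed.perKey.get("a")); // 2
    }
}
```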
 
 Checkpointing of the states needs to be enabled from the `StreamExecutionEnvironment` using `enableCheckpointing(…)`, where additional parameters can be passed to modify the default 5-second checkpoint interval.
 
@@ -1264,8 +1266,46 @@ public static class CounterSource implements RichParallelSourceFunction<Long> {
 
 Some operators might need to know when a checkpoint is fully acknowledged by Flink, in order to communicate that with the outside world. In this case see the `flink.streaming.api.checkpoint.CheckpointCommitter` interface.
 
+### Checkpointed interface
+
+Another way of exposing user-defined operator state to the Flink runtime for checkpointing is to implement the `Checkpointed` interface.
+
+When the user-defined function implements the `Checkpointed` interface, the `snapshotState(…)` and `restoreState(…)` methods will be called to draw and restore function state.
+
+For example, here is the counting reduce function shown earlier for `OperatorState`, implemented with the `Checkpointed` interface instead:
+
+{% highlight java %}
+public class CounterSum implements ReduceFunction<Long>, Checkpointed<Long> {
+    
+    // persistent counter
+    private long counter = 0;
+
+    @Override
+    public Long reduce(Long value1, Long value2) throws Exception {
+        counter++;
+        return value1 + value2;
+    }
+
+    // regularly persists state during normal operation
+    @Override
+    public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+        return new Long(counter);
+    }
+
+    // restores state on recovery from failure
+    @Override
+    public void restoreState(Serializable state) {
+        counter = (Long) state;
+    }
+}
+{% endhighlight %} 
+
+### State checkpoints in iterative jobs
+
 Flink currently only provides processing guarantees for jobs without iterations. Enabling checkpointing on an iterative job causes an exception. In order to force checkpointing on an iterative program the user needs to set a special flag when enabling checkpointing: `env.enableCheckpointing(interval, force = true)`.
 
+Please note that records in flight in the loop edges (and the state changes associated with them) will be lost during a failure.
+
 [Back to top](#top)
 
 Lambda expressions with Java 8

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
index 8046327..4c8e924 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/RuntimeContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.functions;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
@@ -193,9 +194,11 @@ public interface RuntimeContext {
 	 *            The {@link StateCheckpointer} that will be used to draw
 	 *            snapshots from the user state.
 	 * @return The {@link OperatorState} for the underlying operator.
+	 * 
+	 * @throws IOException Thrown if the system cannot access the state.
 	 */
 	<S, C extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState,
-			boolean partitioned, StateCheckpointer<S, C> checkpointer);
+			boolean partitioned, StateCheckpointer<S, C> checkpointer) throws IOException;
 
 	/**
 	 * Returns the {@link OperatorState} with the given name of the underlying
@@ -220,7 +223,9 @@ public interface RuntimeContext {
 	 *            Sets whether partitioning should be applied for the given
 	 *            state. If true a partitioner key must be used.
 	 * @return The {@link OperatorState} for the underlying operator.
+	 * 
+	 * @throws IOException Thrown if the system cannot access the state.
 	 */
 	<S extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState,
-			boolean partitioned);
+			boolean partitioned) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
index 33a7abc..f48eb57 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/functions/util/AbstractRuntimeUDFContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.common.functions.util;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
@@ -175,13 +176,13 @@ public abstract class AbstractRuntimeUDFContext implements RuntimeContext {
 	
 	@Override
 	public <S, C extends Serializable> OperatorState<S> getOperatorState(String name,
-			S defaultState, boolean partitioned, StateCheckpointer<S, C> checkpointer) {
+			S defaultState, boolean partitioned, StateCheckpointer<S, C> checkpointer) throws IOException {
 	throw new UnsupportedOperationException("Operator state is only accessible for streaming operators.");
 	}
 
 	@Override
 	public <S extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState,
-			boolean partitioned) {
+			boolean partitioned) throws IOException {
 	throw new UnsupportedOperationException("Operator state is only accessible for streaming operators.");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
index 926c190..4198a50 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/OperatorState.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.api.common.state;
 
+import java.io.IOException;
+
 import org.apache.flink.api.common.functions.MapFunction;
 
 /**
@@ -45,8 +47,10 @@ public interface OperatorState<T> {
 	 * independent state for each partition.
 	 * 
 	 * @return The operator state corresponding to the current input.
+	 * 
+	 * @throws IOException Thrown if the system cannot access the state.
 	 */
-	T getState();
+	T getState() throws IOException;
 
 	/**
 	 * Updates the operator state accessible by {@link #getState()} to the given
@@ -55,7 +59,9 @@ public interface OperatorState<T> {
 	 * 
 	 * @param state
 	 *            The new state.
+	 *            
+	 * @throws IOException Thrown if the system cannot access the state.
 	 */
-	void updateState(T state);
+	void updateState(T state) throws IOException;
 	
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 562dc47..2af56c1 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.File;
 import java.io.IOException;
-import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -47,7 +46,6 @@ import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
 
 import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.collections.map.LinkedMap;
 import org.apache.curator.test.TestingServer;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -77,7 +75,6 @@ import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
index 3cb8b90..9a2ba4c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/functions/source/StatefulSequenceSource.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.api.functions.source;
 
 
+import java.io.IOException;
+
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.configuration.Configuration;
@@ -73,7 +75,7 @@ public class StatefulSequenceSource extends RichParallelSourceFunction<Long> {
 	}
 	
 	@Override
-	public void open(Configuration conf){
+	public void open(Configuration conf) throws IOException {
 		collected = getRuntimeContext().getOperatorState("collected", 0L, false);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 9faf0c0..c128a7b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -25,10 +25,13 @@ import java.util.Map.Entry;
 
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.PartitionedStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.state.StreamOperatorState;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
 
@@ -72,25 +75,36 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 		FunctionUtils.closeFunction(userFunction);
 	}
 
+	@Override
 	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public void restoreInitialState(Map<String, PartitionedStateHandle> snapshots) throws Exception {
-		// We iterate over the states registered for this operator, initialize and restore it
-		for (Entry<String, PartitionedStateHandle> snapshot : snapshots.entrySet()) {
-			Map<Serializable, StateHandle<Serializable>> handles = snapshot.getValue().getState();
-			StreamOperatorState restoredState = runtimeContext.getState(snapshot.getKey(),
-					!(handles instanceof ImmutableMap));
-			restoredState.restoreState(snapshot.getValue().getState());
+	public void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>> snapshots) throws Exception {
+		// Restore state using the Checkpointed interface
+		if (userFunction instanceof Checkpointed) {
+			((Checkpointed) userFunction).restoreState(snapshots.f0.getState());
+		}
+		
+		if (snapshots.f1 != null) {
+			// We iterate over the states registered for this operator, initialize and restore it
+			for (Entry<String, PartitionedStateHandle> snapshot : snapshots.f1.entrySet()) {
+				Map<Serializable, StateHandle<Serializable>> handles = snapshot.getValue().getState();
+				StreamOperatorState restoredState = runtimeContext.getState(snapshot.getKey(),
+						!(handles instanceof ImmutableMap));
+				restoredState.restoreState(snapshot.getValue().getState());
+			}
 		}
+		
 	}
 
 	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public Map<String, PartitionedStateHandle> getStateSnapshotFromFunction(long checkpointId, long timestamp)
+	public Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>> getStateSnapshotFromFunction(long checkpointId, long timestamp)
 			throws Exception {
 		// Get all the states for the operator
 		Map<String, StreamOperatorState> operatorStates = runtimeContext.getOperatorStates();
+		
+		Map<String, PartitionedStateHandle> operatorStateSnapshots;
 		if (operatorStates.isEmpty()) {
 			// We return null to signal that there is nothing to checkpoint
-			return null;
+			operatorStateSnapshots = null;
 		} else {
 			// Checkpoint the states and store the handles in a map
 			Map<String, PartitionedStateHandle> snapshots = new HashMap<String, PartitionedStateHandle>();
@@ -100,7 +114,22 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 						new PartitionedStateHandle(state.getValue().snapshotState(checkpointId, timestamp)));
 			}
 
-			return snapshots;
+			operatorStateSnapshots = snapshots;
+		}
+		
+		StateHandle<Serializable> checkpointedSnapshot = null;
+
+		if (userFunction instanceof Checkpointed) {
+			StateHandleProvider<Serializable> provider = runtimeContext.getStateHandleProvider();
+			checkpointedSnapshot = provider.createStateHandle(((Checkpointed) userFunction)
+					.snapshotState(checkpointId, timestamp));
+		}
+		
+		if (operatorStateSnapshots != null || checkpointedSnapshot != null) {
+			return new Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>(
+					checkpointedSnapshot, operatorStateSnapshots);
+		} else {
+			return null;
 		}
 
 	}
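The snapshot path above combines the `Checkpointed` snapshot and the operator-state snapshots into one pair, returning null only when both parts are empty. A minimal, self-contained sketch of that decision logic (plain Java; `Pair` is a hypothetical stand-in for Flink's `Tuple2`, and `combine` is not part of the Flink API):

```java
import java.util.Map;

public class SnapshotCombiner {

    // Hypothetical stand-in for Flink's Tuple2: f0 = Checkpointed snapshot,
    // f1 = map of named operator-state snapshots.
    static class Pair<A, B> {
        final A f0; final B f1;
        Pair(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Mirrors the logic in getStateSnapshotFromFunction: return null only
    // when neither the Checkpointed snapshot nor any operator state exists,
    // signaling that there is nothing to checkpoint.
    static Pair<Object, Map<String, Object>> combine(Object checkpointedSnapshot,
                                                     Map<String, Object> operatorStates) {
        Map<String, Object> stateSnapshots =
                (operatorStates == null || operatorStates.isEmpty()) ? null : operatorStates;
        if (stateSnapshots != null || checkpointedSnapshot != null) {
            return new Pair<>(checkpointedSnapshot, stateSnapshots);
        }
        return null; // nothing to checkpoint
    }
}
```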

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
index 0fac2f8..6b5a3e8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators;
 import java.io.Serializable;
 import java.util.Map;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.PartitionedStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
 
@@ -31,9 +32,9 @@ import org.apache.flink.runtime.state.StateHandle;
  */
 public interface StatefulStreamOperator<OUT> extends StreamOperator<OUT> {
 
-	void restoreInitialState(Map<String, PartitionedStateHandle> state) throws Exception;
+	void restoreInitialState(Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>> state) throws Exception;
 
-	Map<String, PartitionedStateHandle> getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception;
+	Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>> getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception;
 
 	void confirmCheckpointCompleted(long checkpointId, String stateName, StateHandle<Serializable> checkpointedState) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
index b374fe4..6d3bad6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/EagerStateStore.java
@@ -87,5 +87,10 @@ public class EagerStateStore<S, C extends Serializable> implements PartitionedSt
 	public void setCheckPointer(StateCheckpointer<S, C> checkpointer) {
 		this.checkpointer = checkpointer;
 	}
+	
+	@Override
+	public String toString() {
+		return fetchedState.toString();
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
index abe32da..808b7c8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.state;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
@@ -68,9 +69,9 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
 	}
 
 	@Override
-	public S getState() {
+	public S getState() throws IOException {
 		if (currentInput == null) {
-			return null;
+			throw new IllegalStateException("Need a valid input for accessing the state.");
 		} else {
 			try {
 				Serializable key = keySelector.getKey(currentInput);
@@ -80,23 +81,23 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
 					return defaultState;
 				}
 			} catch (Exception e) {
-				throw new RuntimeException(e);
+				throw new RuntimeException("User-defined key selector threw an exception.", e);
 			}
 		}
 	}
 
 	@Override
-	public void updateState(S state) {
+	public void updateState(S state) throws IOException {
 		if (state == null) {
 			throw new RuntimeException("Cannot set state to null.");
 		}
 		if (currentInput == null) {
-			throw new RuntimeException("Need a valid input for updating a state.");
+			throw new IllegalStateException("Need a valid input for updating a state.");
 		} else {
 			try {
 				stateStore.setStateForKey(keySelector.getKey(currentInput), state);
 			} catch (Exception e) {
-				throw new RuntimeException(e);
+				throw new RuntimeException("User-defined key selector threw an exception.", e);
 			}
 		}
 	}
@@ -125,5 +126,10 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
 	public Map<Serializable, S> getPartitionedState() throws Exception {
 		return stateStore.getPartitionedState();
 	}
+	
+	@Override
+	public String toString() {
+		return stateStore.toString();
+	}
 
 }
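The per-key behavior of the partitioned state above can be sketched as a minimal, self-contained store (plain Java; `PerKeyStore` is a hypothetical stand-in, not the Flink class, though it mirrors the default-value and validity-checking behavior in this diff):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of a partitioned state store: one value per key, with a
// default returned for keys that have no state yet.
public class PerKeyStore<S> {
    private final Map<Object, S> states = new HashMap<>();
    private final S defaultState;

    public PerKeyStore(S defaultState) {
        this.defaultState = defaultState;
    }

    public S getState(Object key) {
        if (key == null) {
            throw new IllegalStateException("Need a valid input for accessing the state.");
        }
        return states.getOrDefault(key, defaultState);
    }

    public void updateState(Object key, S state) {
        if (state == null) {
            throw new RuntimeException("Cannot set state to null.");
        }
        if (key == null) {
            throw new IllegalStateException("Need a valid input for updating a state.");
        }
        states.put(key, state);
    }
}
```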

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
index 59c624e..1699c27 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/StreamOperatorState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.state;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
@@ -58,19 +59,19 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
 	}
 
 	@Override
-	public S getState() {
+	public S getState() throws IOException {
 		return state;
 	}
 
 	@Override
-	public void updateState(S state) {
+	public void updateState(S state) throws IOException {
 		if (state == null) {
 			throw new RuntimeException("Cannot set state to null.");
 		}
 		this.state = state;
 	}
 	
-	public void setDefaultState(S defaultState) {
+	public void setDefaultState(S defaultState) throws IOException {
 		if (getState() == null) {
 			updateState(defaultState);
 		}
@@ -102,5 +103,10 @@ public class StreamOperatorState<S, C extends Serializable> implements OperatorS
 	public Map<Serializable, S> getPartitionedState() throws Exception {
 		return ImmutableMap.of(DEFAULTKEY, state);
 	}
+	
+	@Override
+	public String toString() {
+		return state.toString();
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/WrapperStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/WrapperStateHandle.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/WrapperStateHandle.java
index 276f9e9..1adef48 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/WrapperStateHandle.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/WrapperStateHandle.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.PartitionedStateHandle;
 import org.apache.flink.runtime.state.StateHandle;
@@ -35,19 +36,24 @@ public class WrapperStateHandle extends LocalStateHandle<Serializable> {
 
 	private static final long serialVersionUID = 1L;
 
-	public WrapperStateHandle(List<Map<String, PartitionedStateHandle>> state) {
+	public WrapperStateHandle(List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>> state) {
 		super((Serializable) state);
 	}
 
 	@Override
 	public void discardState() throws Exception {
 		@SuppressWarnings("unchecked")
-		List<Map<String, PartitionedStateHandle>> chainedStates = (List<Map<String, PartitionedStateHandle>>) getState();
-		for (Map<String, PartitionedStateHandle> stateMap : chainedStates) {
-			if(stateMap != null) {
-				for (PartitionedStateHandle statePartitions : stateMap.values()) {
-					for (StateHandle<Serializable> handle : statePartitions.getState().values()) {
-						handle.discardState();
+		List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>> chainedStates = (List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>>) getState();
+		for (Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>> state : chainedStates) {
+			if (state != null) {
+				if (state.f0 != null) {
+					state.f0.discardState();
+				}
+				if (state.f1 != null) {
+					for (PartitionedStateHandle statePartitions : state.f1.values()) {
+						for (StateHandle<Serializable> handle : statePartitions.getState().values()) {
+							handle.discardState();
+						}
 					}
 				}
 			}
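The null-safe traversal in `discardState` above can be sketched as a small, self-contained walk (plain Java; `Handle` and `Entry` are hypothetical stand-ins for the Flink state-handle classes):

```java
import java.util.List;
import java.util.Map;

// Sketch of the null-safe discard traversal: each chained entry may carry a
// Checkpointed snapshot (f0) and/or a map of partitioned handles (f1), and
// either part, or the whole entry, may be null.
public class DiscardWalk {
    interface Handle { void discard(); }

    static class Entry {
        final Handle f0;                    // Checkpointed snapshot, may be null
        final Map<String, List<Handle>> f1; // partitioned handles, may be null
        Entry(Handle f0, Map<String, List<Handle>> f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Discards every non-null handle; returns how many were discarded.
    static int discardAll(List<Entry> chained) {
        int discarded = 0;
        for (Entry e : chained) {
            if (e == null) continue;         // non-stateful operator in the chain
            if (e.f0 != null) { e.f0.discard(); discarded++; }
            if (e.f1 != null) {
                for (List<Handle> partitions : e.f1.values()) {
                    for (Handle h : partitions) { h.discard(); discarded++; }
                }
            }
        }
        return discarded;
    }
}
```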

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 8f6bc43..7421a33 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -27,6 +27,7 @@ import java.util.Map.Entry;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.functors.NotNullPredicate;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.event.task.TaskEvent;
@@ -200,18 +201,15 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
 		
 		// We retrieve and restore the states for the chained operators.
-		List<Serializable> chainedStates = (List<Serializable>) stateHandle.getState();
+		List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>> chainedStates = (List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>>) stateHandle.getState();
 
 		// We restore all stateful chained operators
 		for (int i = 0; i < chainedStates.size(); i++) {
-			Serializable state = chainedStates.get(i);
+			Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>> state = chainedStates.get(i);
 			// If state is not null we need to restore it
 			if (state != null) {
 				StreamOperator<?> chainedOperator = outputHandler.getChainedOperators().get(i);
-
-				((StatefulStreamOperator<?>) chainedOperator)
-						.restoreInitialState((Map<String, PartitionedStateHandle>) state);
-
+				((StatefulStreamOperator<?>) chainedOperator).restoreInitialState(state);
 			}
 		}
 
@@ -226,7 +224,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 					LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
 					
 					// We wrap the states of the chained operators in a list, marking non-stateful operators with null
-					List<Map<String, PartitionedStateHandle>> chainedStates = new ArrayList<Map<String, PartitionedStateHandle>>();
+					List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>> chainedStates = new ArrayList<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>>();
 					
 					// A wrapper handle is created for the List of statehandles
 					WrapperStateHandle stateHandle;
@@ -273,26 +271,39 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	@Override
 	public void confirmCheckpoint(long checkpointId, SerializedValue<StateHandle<?>> stateHandle) throws Exception {
-		// we do nothing here so far. this should call commit on the source function, for example
 		synchronized (checkpointLock) {
-			
-			List<Map<String, PartitionedStateHandle>> chainedStates = (List<Map<String, PartitionedStateHandle>>) stateHandle.deserializeValue(getUserCodeClassLoader()).getState();
+			if (stateHandle != null) {
+				List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>> chainedStates = (List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>>) stateHandle
+						.deserializeValue(getUserCodeClassLoader()).getState();
 
-			for (int i = 0; i < chainedStates.size(); i++) {
-				Map<String, PartitionedStateHandle> chainedState = chainedStates.get(i);
-				if (chainedState != null) {
+				for (int i = 0; i < chainedStates.size(); i++) {
+					Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>> chainedState = chainedStates
+							.get(i);
 					StreamOperator<?> chainedOperator = outputHandler.getChainedOperators().get(i);
-					if (chainedOperator instanceof StatefulStreamOperator) {
-						for (Entry<String, PartitionedStateHandle> stateEntry : chainedState
-								.entrySet()) {
-							for (StateHandle<Serializable> handle : stateEntry.getValue().getState().values()) {
-								((StatefulStreamOperator) chainedOperator).confirmCheckpointCompleted(
-										checkpointId, stateEntry.getKey(), handle);
+
+					if (chainedState != null) {
+						if (chainedState.f0 != null) {
+							((StatefulStreamOperator) chainedOperator).confirmCheckpointCompleted(
+									checkpointId, null, chainedState.f0);
+						}
+
+						if (chainedState.f1 != null) {
+							if (chainedOperator instanceof StatefulStreamOperator) {
+								for (Entry<String, PartitionedStateHandle> stateEntry : chainedState.f1
+										.entrySet()) {
+									for (StateHandle<Serializable> handle : stateEntry.getValue()
+											.getState().values()) {
+										((StatefulStreamOperator) chainedOperator)
+												.confirmCheckpointCompleted(checkpointId,
+														stateEntry.getKey(), handle);
+									}
+								}
 							}
 						}
 					}
 				}
 			}
+
 		}
 	}
 	

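The reworked `confirmCheckpoint` applies the same two-part convention when acknowledging a completed checkpoint: the function state (`f0`) is confirmed with a `null` state name, while each named partitioned state (`f1`) is confirmed once per state name and per partition handle. A hedged sketch of that dispatch, again with simplified stand-in types rather than Flink's real operator classes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConfirmSketch {

    // Stand-in for Flink's Tuple2.
    static class Tuple2<A, B> {
        final A f0;
        final B f1;
        Tuple2(A f0, B f1) { this.f0 = f0; this.f1 = f1; }
    }

    // Records every state name an operator is asked to confirm (null = function state).
    static class RecordingOperator {
        final List<String> confirmedNames = new ArrayList<>();
        void confirmCheckpointCompleted(long checkpointId, String stateName, Object handle) {
            confirmedNames.add(stateName);
        }
    }

    // Mirrors the confirmCheckpoint loop: f0 is confirmed with a null name,
    // every entry in f1 with its state name, once per partition handle.
    static void confirm(long checkpointId,
            List<Tuple2<Object, Map<String, List<Object>>>> chainedStates,
            List<RecordingOperator> operators) {
        for (int i = 0; i < chainedStates.size(); i++) {
            Tuple2<Object, Map<String, List<Object>>> state = chainedStates.get(i);
            if (state == null) {
                continue; // non-stateful operator
            }
            RecordingOperator op = operators.get(i);
            if (state.f0 != null) {
                op.confirmCheckpointCompleted(checkpointId, null, state.f0);
            }
            if (state.f1 != null) {
                for (Map.Entry<String, List<Object>> entry : state.f1.entrySet()) {
                    for (Object handle : entry.getValue()) {
                        op.confirmCheckpointCompleted(checkpointId, entry.getKey(), handle);
                    }
                }
            }
        }
    }
}
```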
http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index de5ef06..3efd619 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -49,8 +50,9 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	private final Map<String, StreamOperatorState> states;
 	private final List<PartitionedStreamOperatorState> partitionedStates;
 	private final KeySelector<?, ?> statePartitioner;
-	private final StateHandleProvider<?> provider;
+	private final StateHandleProvider<Serializable> provider;
 
+	@SuppressWarnings("unchecked")
 	public StreamingRuntimeContext(String name, Environment env, ClassLoader userCodeClassLoader,
 			ExecutionConfig executionConfig, KeySelector<?, ?> statePartitioner,
 			StateHandleProvider<?> provider) {
@@ -60,7 +62,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 		this.statePartitioner = statePartitioner;
 		this.states = new HashMap<String, StreamOperatorState>();
 		this.partitionedStates = new LinkedList<PartitionedStreamOperatorState>();
-		this.provider = provider;
+		this.provider = (StateHandleProvider<Serializable>) provider;
 	}
 
 	/**
@@ -81,11 +83,15 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	public Configuration getTaskStubParameters() {
 		return new TaskConfig(env.getTaskConfiguration()).getStubParameters();
 	}
+	
+	public StateHandleProvider<Serializable> getStateHandleProvider() {
+		return provider;
+	}
 
 	@SuppressWarnings("unchecked")
 	@Override
 	public <S, C extends Serializable> OperatorState<S> getOperatorState(String name,
-			S defaultState, boolean partitioned, StateCheckpointer<S, C> checkpointer) {
+			S defaultState, boolean partitioned, StateCheckpointer<S, C> checkpointer) throws IOException {
 		if (defaultState == null) {
 			throw new RuntimeException("Cannot set default state to null.");
 		}
@@ -99,7 +105,7 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	@SuppressWarnings("unchecked")
 	@Override
 	public <S extends Serializable> OperatorState<S> getOperatorState(String name, S defaultState,
-			boolean partitioned) {
+			boolean partitioned) throws IOException {
 		if (defaultState == null) {
 			throw new RuntimeException("Cannot set default state to null.");
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 32a638a..442d8ea 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.state;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -31,11 +32,14 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.LocalStateHandle.LocalStateHandleProvider;
 import org.apache.flink.runtime.state.PartitionedStateHandle;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
@@ -65,6 +69,7 @@ public class StatefulOperatorTest {
 		assertEquals((Integer) 5, context.getOperatorState("counter", 0, false).getState());
 		assertEquals(ImmutableMap.of(0, 2, 1, 3), context.getOperatorStates().get("groupCounter").getPartitionedState());
 		assertEquals("12345", context.getOperatorState("concat", "", false).getState());
+		assertEquals((Integer) 5, ((StatefulMapper) map.getUserFunction()).checkpointedCounter);
 
 		byte[] serializedState = InstantiationUtil.serializeObject(map.getStateSnapshotFromFunction(1, 1));
 
@@ -74,6 +79,7 @@ public class StatefulOperatorTest {
 		assertEquals((Integer) 5, restoredContext.getOperatorState("counter", 0, false).getState());
 		assertEquals(ImmutableMap.of(0, 2, 1, 3), context.getOperatorStates().get("groupCounter").getPartitionedState());
 		assertEquals("12345", restoredContext.getOperatorState("concat", "", false).getState());
+		assertEquals((Integer) 5, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
 		out.clear();
 
 		processInputs(restoredMap, Arrays.asList(7, 8));
@@ -83,6 +89,8 @@ public class StatefulOperatorTest {
 		assertEquals(ImmutableMap.of(0, 3, 1, 4), restoredContext.getOperatorStates().get("groupCounter")
 				.getPartitionedState());
 		assertEquals("1234578", restoredContext.getOperatorState("concat", "", false).getState());
+		assertEquals((Integer) 7, ((StatefulMapper) restoredMap.getUserFunction()).checkpointedCounter);
+
 	}
 
 	private void processInputs(StreamMap<Integer, ?> map, List<Integer> input) throws Exception {
@@ -116,8 +124,9 @@ public class StatefulOperatorTest {
 		}, context);
 
 		if (serializedState != null) {
-			op.restoreInitialState((Map<String, PartitionedStateHandle>) InstantiationUtil.deserializeObject(serializedState, Thread
-					.currentThread().getContextClassLoader()));
+			op.restoreInitialState((Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>) InstantiationUtil
+					.deserializeObject(serializedState, Thread.currentThread()
+							.getContextClassLoader()));
 		}
 
 		op.open(null);
@@ -125,18 +134,21 @@ public class StatefulOperatorTest {
 		return op;
 	}
 
-	public static class StatefulMapper extends RichMapFunction<Integer, String> {
-
-		private static final long serialVersionUID = -9007873655253339356L;
+	public static class StatefulMapper extends RichMapFunction<Integer, String> implements
+			Checkpointed<Integer> {
+		private static final long serialVersionUID = -9007873655253339356L;
 		OperatorState<Integer> counter;
 		OperatorState<Integer> groupCounter;
 		OperatorState<String> concat;
+		
+		Integer checkpointedCounter = 0;
 
 		@Override
 		public String map(Integer value) throws Exception {
 			counter.updateState(counter.getState() + 1);
 			groupCounter.updateState(groupCounter.getState() + 1);
 			concat.updateState(concat.getState() + value.toString());
+			checkpointedCounter++;
 			try {
 				counter.updateState(null);
 				fail();
@@ -146,7 +158,7 @@ public class StatefulOperatorTest {
 		}
 
 		@Override
-		public void open(Configuration conf) {
+		public void open(Configuration conf) throws IOException {
 			counter = getRuntimeContext().getOperatorState("counter", 0, false);
 			groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true);
 			concat = getRuntimeContext().getOperatorState("concat", "", false);
@@ -161,6 +173,17 @@ public class StatefulOperatorTest {
 			} catch (RuntimeException e){
 			}
 		}
+
+		@Override
+		public Integer snapshotState(long checkpointId, long checkpointTimestamp)
+				throws Exception {
+			return checkpointedCounter;
+		}
+
+		@Override
+		public void restoreState(Integer state) {
+			this.checkpointedCounter = (Integer) state;
+		}
 	}
 	
 	public static class ModKey implements KeySelector<Integer, Serializable> {

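The contract this commit re-enables is the two-method `Checkpointed` interface: `snapshotState` returns the function's serializable state, and `restoreState` installs it after recovery. A self-contained sketch of the round trip the test above exercises, using a local copy of the interface (the real one is `org.apache.flink.streaming.api.checkpoint.Checkpointed`; `CountingMapper` is a hypothetical stand-in for `StatefulMapper`):

```java
import java.io.Serializable;

public class CheckpointedSketch {

    // Local copy of the two-method contract; the real interface lives at
    // org.apache.flink.streaming.api.checkpoint.Checkpointed.
    interface Checkpointed<T extends Serializable> {
        T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception;
        void restoreState(T state);
    }

    // Counts processed elements; the counter itself is the checkpointed state.
    static class CountingMapper implements Checkpointed<Integer> {
        Integer checkpointedCounter = 0;

        String map(Integer value) {
            checkpointedCounter++;
            return value.toString();
        }

        @Override
        public Integer snapshotState(long checkpointId, long checkpointTimestamp) {
            // Return the state to be stored in the checkpoint.
            return checkpointedCounter;
        }

        @Override
        public void restoreState(Integer state) {
            // Re-install the state taken from the last completed checkpoint.
            this.checkpointedCounter = state;
        }
    }
}
```

This mirrors the test's flow: map five elements, snapshot, restore into a fresh instance, then continue from where the snapshot left off.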
http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 5cd34cc..b4877c6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
@@ -192,7 +193,7 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
 		}
 		
 		@Override
-		public void open(Configuration conf){
+		public void open(Configuration conf) throws IOException {
 			state = getRuntimeContext().getOperatorState("state", 1, false, this);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index 7e046af..e2430d6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
@@ -32,6 +33,7 @@ import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -195,7 +197,7 @@ public class StreamCheckpointingITCase {
 
 		static final long[] counts = new long[PARALLELISM];
 		@Override
-		public void close() {
+		public void close() throws IOException {
 			counts[getRuntimeContext().getIndexOfThisSubtask()] = index.getState();
 		}
 
@@ -205,7 +207,7 @@ public class StreamCheckpointingITCase {
 		}
 
 		@Override
-		public void open(Configuration parameters) {
+		public void open(Configuration parameters) throws IOException {
 			rnd = new Random();
 			stringBuilder = new StringBuilder();
 			step = getRuntimeContext().getNumberOfParallelSubtasks();
@@ -264,12 +266,12 @@ public class StreamCheckpointingITCase {
 		}
 
 		@Override
-		public void open(Configuration conf) {
+		public void open(Configuration conf) throws IOException {
 			count = getRuntimeContext().getOperatorState("count", 0L, false);
 		}
 
 		@Override
-		public void close() {
+		public void close() throws IOException {
 			counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState();
 		}
 		
@@ -334,25 +336,30 @@ public class StreamCheckpointingITCase {
 		}
 	}
 
-	private static class StringRichFilterFunction extends RichFilterFunction<String> {
+	private static class StringRichFilterFunction extends RichFilterFunction<String> implements Checkpointed<Long> {
 
-		OperatorState<Long> count;
+		Long count = 0L;
 		static final long[] counts = new long[PARALLELISM];
 		
 		@Override
 		public boolean filter(String value) {
-			count.updateState(count.getState() + 1);
+			count++;
 			return value.length() < 100;
 		}
-		
+
 		@Override
-		public void open(Configuration conf) {
-			this.count = getRuntimeContext().getOperatorState("count", 0L, false);
+		public void close() {
+			counts[getRuntimeContext().getIndexOfThisSubtask()] = count;
 		}
 
 		@Override
-		public void close() {
-			counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState();
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return count;
+		}
+
+		@Override
+		public void restoreState(Long state) {
+			count = state;
 		}
 	}
 
@@ -362,18 +369,18 @@ public class StreamCheckpointingITCase {
 		static final long[] counts = new long[PARALLELISM];
 		
 		@Override
-		public PrefixCount map(String value) {
+		public PrefixCount map(String value) throws IOException {
 			count.updateState(count.getState() + 1);
 			return new PrefixCount(value.substring(0, 1), value, 1L);
 		}
 		
 		@Override
-		public void open(Configuration conf) {
+		public void open(Configuration conf) throws IOException {
 			this.count = getRuntimeContext().getOperatorState("count", 0L, false);
 		}
 
 		@Override
-		public void close() {
+		public void close() throws IOException {
 			counts[getRuntimeContext().getIndexOfThisSubtask()] = count.getState();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0ae1758a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
index 688a212..d8c925d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureStreamingRecoveryITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.FileStateHandle;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
@@ -152,7 +153,7 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 		}
 		
 		@Override
-		public void open(Configuration conf) {
+		public void open(Configuration conf) throws IOException {
 			collected = getRuntimeContext().getOperatorState("count", 0L, false);
 		}
 
@@ -181,12 +182,12 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 		}
 	}
 
-	private static class CheckpointedSink extends RichSinkFunction<Long> {
+	private static class CheckpointedSink extends RichSinkFunction<Long> implements Checkpointed<Long> {
 
 		private long stepSize;
 		private long congruence;
 		private long toCollect;
-		private OperatorState<Long> collected;
+		private Long collected = 0L;
 		private long end;
 
 		public CheckpointedSink(long end) {
@@ -198,21 +199,30 @@ public class ProcessFailureStreamingRecoveryITCase extends AbstractProcessFailur
 			stepSize = getRuntimeContext().getNumberOfParallelSubtasks();
 			congruence = getRuntimeContext().getIndexOfThisSubtask();
 			toCollect = (end % stepSize > congruence) ? (end / stepSize + 1) : (end / stepSize);
-			collected = getRuntimeContext().getOperatorState("count", 0L, false);
 		}
 
 		@Override
 		public void invoke(Long value) throws Exception {
-			long expected = collected.getState() * stepSize + congruence;
+			long expected = collected * stepSize + congruence;
 
 			Assert.assertTrue("Value did not match expected value. " + expected + " != " + value, value.equals(expected));
 
-			collected.updateState(collected.getState() + 1);
+			collected++;
 
-			if (collected.getState() > toCollect) {
+			if (collected > toCollect) {
 				Assert.fail("Collected <= toCollect: " + collected + " > " + toCollect);
 			}
 
 		}
+
+		@Override
+		public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return collected;
+		}
+
+		@Override
+		public void restoreState(Long state) {
+			collected = state;
+		}
 	}
 }

