flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject flink git commit: [FLINK-2707] [streaming] Set StateCheckpointer before default state value
Date Fri, 18 Sep 2015 17:15:46 GMT
Repository: flink
Updated Branches:
  refs/heads/master b234b0b16 -> 3e233a389


[FLINK-2707] [streaming] Set StateCheckpointer before default state value


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e233a38
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e233a38
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e233a38

Branch: refs/heads/master
Commit: 3e233a3894a2701cc16c6cb3bc8778fc84482e20
Parents: b234b0b
Author: Gyula Fora <gyfora@apache.org>
Authored: Fri Sep 18 17:54:30 2015 +0200
Committer: Gyula Fora <gyfora@apache.org>
Committed: Fri Sep 18 17:54:53 2015 +0200

----------------------------------------------------------------------
 .../runtime/tasks/StreamingRuntimeContext.java       |  2 +-
 .../streaming/api/state/StatefulOperatorTest.java    | 15 ++++++++++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3e233a38/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 2ca2862..b82888e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -104,8 +104,8 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 			throw new RuntimeException("Cannot set default state to null.");
 		}
 		StreamOperatorState<S, C> state = (StreamOperatorState<S, C>) getState(name, partitioned);
-		state.setDefaultState(defaultState);
 		state.setCheckpointer(checkpointer);
+		state.setDefaultState(defaultState);
 
 		return (OperatorState<S>) state;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3e233a38/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
index 48bb1f3..76d9e16 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/state/StatefulOperatorTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.state.OperatorState;
+import org.apache.flink.api.common.state.StateCheckpointer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
@@ -278,7 +279,19 @@ public class StatefulOperatorTest extends StreamingMultipleProgramsTestBase {
 
 		@Override
 		public void open(Configuration conf) throws IOException {
-			groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true);
+			groupCounter = getRuntimeContext().getOperatorState("groupCounter", 0, true,
+					new StateCheckpointer<Integer, String>() {
+
+						@Override
+						public String snapshotState(Integer state, long checkpointId, long checkpointTimestamp) {
+							return state.toString();
+						}
+
+						@Override
+						public Integer restoreState(String stateSnapshot) {
+							return Integer.parseInt(stateSnapshot);
+						}
+					});
 		}
 
 		@SuppressWarnings("unchecked")


Mime
View raw message