flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/2] flink git commit: [FLINK-3085] [runtime] Initialize state backends as part of "invoke()"
Date Fri, 27 Nov 2015 12:33:21 GMT
Repository: flink
Updated Branches:
  refs/heads/master d359a974a -> e69d14521


[FLINK-3085] [runtime] Initialize state backends as part of "invoke()"


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e4a4effe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e4a4effe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e4a4effe

Branch: refs/heads/master
Commit: e4a4effe179a80516b84e2b590ab4d7c65efdeb1
Parents: d359a97
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Nov 27 12:40:02 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Nov 27 12:40:02 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/streaming/runtime/tasks/StreamTask.java    | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e4a4effe/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index c310439..0a17ace 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -171,9 +171,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			configuration = new StreamConfig(getTaskConfiguration());
 			accumulatorMap = accumulatorRegistry.getUserMap();
 
-			stateBackend = createStateBackend();
-			stateBackend.initializeForJob(getEnvironment());
-
 			headOperator = configuration.getStreamOperator(userClassLoader);
 			operatorChain = new OperatorChain<>(this, headOperator, accumulatorRegistry.getReadWriteReporter());
 
@@ -207,7 +204,11 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		
 		boolean disposed = false;
 		try {
-			// first order of business is to ive operators back their state
+			// first order of business is to initialize the state backend and to
+			// give operators back their state
+			stateBackend = createStateBackend();
+			stateBackend.initializeForJob(getEnvironment());
+			
 			restoreStateLazy();
 			
 			// we need to make sure that any triggers scheduled in open() cannot be


Mime
View raw message