flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [2/6] flink git commit: [Flink-5892] Restore state on operator level
Date Fri, 28 Apr 2017 19:55:34 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 7f24cd3..aa0f08d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
@@ -140,7 +141,9 @@ public class StreamingJobGraphGenerator {
 			legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
 		}
 
-		setChaining(hashes, legacyHashes);
+		Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
+
+		setChaining(hashes, legacyHashes, chainedOperatorHashes);
 
 		setPhysicalEdges();
 
@@ -190,9 +193,9 @@ public class StreamingJobGraphGenerator {
 	 *
 	 * <p>This will recursively create all {@link JobVertex} instances.
 	 */
-	private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
+	private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
 		for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
-			createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0);
+			createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
 		}
 	}
 
@@ -201,7 +204,8 @@ public class StreamingJobGraphGenerator {
 			Integer currentNodeId,
 			Map<Integer, byte[]> hashes,
 			List<Map<Integer, byte[]>> legacyHashes,
-			int chainIndex) {
+			int chainIndex,
+			Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
 
 		if (!builtVertices.contains(startNodeId)) {
 
@@ -220,20 +224,27 @@ public class StreamingJobGraphGenerator {
 
 			for (StreamEdge chainable : chainableOutputs) {
 				transitiveOutEdges.addAll(
-						createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1));
+						createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
 			}
 
 			for (StreamEdge nonChainable : nonChainableOutputs) {
 				transitiveOutEdges.add(nonChainable);
-				createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0);
+				createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
+			}
+
+			List<Tuple2<byte[], byte[]>> operatorHashes = chainedOperatorHashes.get(startNodeId);
+			if (operatorHashes == null) {
+				operatorHashes = new ArrayList<>();
+				chainedOperatorHashes.put(startNodeId, operatorHashes);
 			}
+			operatorHashes.add(new Tuple2<>(hashes.get(currentNodeId), legacyHashes.get(1).get(currentNodeId)));
 
 			chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
 			chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
 			chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));
 
 			StreamConfig config = currentNodeId.equals(startNodeId)
-					? createJobVertex(startNodeId, hashes, legacyHashes)
+					? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
 					: new StreamConfig(new Configuration());
 
 			setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);
@@ -308,7 +319,8 @@ public class StreamingJobGraphGenerator {
 	private StreamConfig createJobVertex(
 			Integer streamNodeId,
 			Map<Integer, byte[]> hashes,
-			List<Map<Integer, byte[]>> legacyHashes) {
+			List<Map<Integer, byte[]>> legacyHashes,
+			Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
 
 		JobVertex jobVertex;
 		StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
@@ -330,18 +342,32 @@ public class StreamingJobGraphGenerator {
 			}
 		}
 
+		List<Tuple2<byte[], byte[]>> chainedOperators = chainedOperatorHashes.get(streamNodeId);
+		List<OperatorID> chainedOperatorVertexIds = new ArrayList<>();
+		List<OperatorID> userDefinedChainedOperatorVertexIds = new ArrayList<>();
+		if (chainedOperators != null) {
+			for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
+				chainedOperatorVertexIds.add(new OperatorID(chainedOperator.f0));
+				userDefinedChainedOperatorVertexIds.add(chainedOperator.f1 != null ? new OperatorID(chainedOperator.f1) : null);
+			}
+		}
+
 		if (streamNode.getInputFormat() != null) {
 			jobVertex = new InputFormatVertex(
 					chainedNames.get(streamNodeId),
 					jobVertexId,
-					legacyJobVertexIds);
+					legacyJobVertexIds,
+					chainedOperatorVertexIds,
+					userDefinedChainedOperatorVertexIds);
 			TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
 			taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(streamNode.getInputFormat()));
 		} else {
 			jobVertex = new JobVertex(
 					chainedNames.get(streamNodeId),
 					jobVertexId,
-					legacyJobVertexIds);
+					legacyJobVertexIds,
+					chainedOperatorVertexIds,
+					userDefinedChainedOperatorVertexIds);
 		}
 
 		jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
index 05aa694..a897674 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/StreamingJobGraphGeneratorNodeHashTest.java
@@ -47,7 +47,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Tests the {@link StreamNode} hash assignment during translation from {@link StreamGraph} to
@@ -392,10 +391,10 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that a manual hash for an intermediate chain node throws an Exception.
+	 * Tests that a manual hash for an intermediate chain node is accepted.
 	 */
-	@Test(expected = UnsupportedOperationException.class)
-	public void testManualHashAssignmentForIntermediateNodeInChainThrowsException() throws Exception {
+	@Test
+	public void testManualHashAssignmentForIntermediateNodeInChain() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
 		env.setParallelism(4);
 
@@ -409,9 +408,6 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 
 	/**
 	 * Tests that a manual hash at the beginning of a chain is accepted.
-	 *
-	 * <p>This should work, because the ID is used at the beginning of a chain. This is currently
-	 * not allowed for intermediate nodes (see {@link #testManualHashAssignmentForIntermediateNodeInChainThrowsException()}).
 	 */
 	@Test
 	public void testManualHashAssignmentForStartNodeInInChain() throws Exception {
@@ -446,7 +442,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 	}
 
 	@Test
-	public void testUserProvidedHashingOnChainNotSupported() {
+	public void testUserProvidedHashingOnChainSupported() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
 
 		env.addSource(new NoOpSourceFunction(), "src").setUidHash("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
@@ -455,11 +451,7 @@ public class StreamingJobGraphGeneratorNodeHashTest extends TestLogger {
 				.keyBy(new NoOpKeySelector())
 				.reduce(new NoOpReduceFunction()).name("reduce").setUidHash("dddddddddddddddddddddddddddddddd");
 
-		try {
-			env.getStreamGraph().getJobGraph();
-			fail();
-		} catch (UnsupportedOperationException ignored) {
-		}
+		env.getStreamGraph().getJobGraph();
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index e0de7d2..72a1b63 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -32,28 +32,32 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
@@ -93,7 +97,9 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -102,7 +108,6 @@ import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSav
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -310,30 +315,43 @@ public class SavepointITCase extends TestLogger {
 				};
 			}};
 
+			ExecutionGraph graph = (ExecutionGraph) ((JobManagerMessages.JobFound) Await.result(jobManager.ask(new JobManagerMessages.RequestJob(jobId), deadline.timeLeft()), deadline.timeLeft())).executionGraph();
+
 			// - Verification START -------------------------------------------
 
 			String errMsg = "Error during gathering of TaskDeploymentDescriptors";
-			assertNull(errMsg, error[0]);
+			if (error[0] != null) {
+				throw new RuntimeException(error[0]);
+			}
+
+			Map<OperatorID, Tuple2<Integer, ExecutionJobVertex>> operatorToJobVertexMapping = new HashMap<>();
+			for (ExecutionJobVertex task : graph.getVerticesTopologically()) {
+				List<OperatorID> operatorIDs = task.getOperatorIDs();
+				for (int x = 0; x < operatorIDs.size(); x++) {
+					operatorToJobVertexMapping.put(operatorIDs.get(x), new Tuple2<>(x, task));
+				}
+			}
 
 			// Verify that all tasks, which are part of the savepoint
 			// have a matching task deployment descriptor.
-			for (TaskState taskState : savepoint.getTaskStates()) {
-				Collection<TaskDeploymentDescriptor> taskTdds = tdds.get(taskState.getJobVertexID());
+			for (OperatorState operatorState : savepoint.getOperatorStates()) {
+				Tuple2<Integer, ExecutionJobVertex> chainIndexAndJobVertex = operatorToJobVertexMapping.get(operatorState.getOperatorID());
+				Collection<TaskDeploymentDescriptor> taskTdds = tdds.get(chainIndexAndJobVertex.f1.getJobVertexId());
 
 				errMsg = "Missing task for savepoint state for operator "
-					+ taskState.getJobVertexID() + ".";
+					+ operatorState.getOperatorID() + ".";
 				assertTrue(errMsg, taskTdds.size() > 0);
 
-				assertEquals(taskState.getNumberCollectedStates(), taskTdds.size());
+				assertEquals(operatorState.getNumberCollectedStates(), taskTdds.size());
 
 				for (TaskDeploymentDescriptor tdd : taskTdds) {
-					SubtaskState subtaskState = taskState.getState(tdd.getSubtaskIndex());
+					OperatorSubtaskState subtaskState = operatorState.getState(tdd.getSubtaskIndex());
 
 					assertNotNull(subtaskState);
 
 					errMsg = "Initial operator state mismatch.";
 					assertEquals(errMsg, subtaskState.getLegacyOperatorState(),
-						tdd.getTaskStateHandles().getLegacyOperatorState());
+						tdd.getTaskStateHandles().getLegacyOperatorState().get(chainIndexAndJobVertex.f0));
 				}
 			}
 
@@ -360,15 +378,13 @@ public class SavepointITCase extends TestLogger {
 			// The checkpoint files
 			List<File> checkpointFiles = new ArrayList<>();
 
-			for (TaskState stateForTaskGroup : savepoint.getTaskStates()) {
-				for (SubtaskState subtaskState : stateForTaskGroup.getStates()) {
-					ChainedStateHandle<StreamStateHandle> streamTaskState = subtaskState.getLegacyOperatorState();
+			for (OperatorState stateForTaskGroup : savepoint.getOperatorStates()) {
+				for (OperatorSubtaskState subtaskState : stateForTaskGroup.getStates()) {
+					StreamStateHandle streamTaskState = subtaskState.getLegacyOperatorState();
 
-					for (int i = 0; i < streamTaskState.getLength(); i++) {
-						if (streamTaskState.get(i) != null) {
-							FileStateHandle fileStateHandle = (FileStateHandle) streamTaskState.get(i);
-							checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
-						}
+					if (streamTaskState != null) {
+						FileStateHandle fileStateHandle = (FileStateHandle) streamTaskState;
+						checkpointFiles.add(new File(fileStateHandle.getFilePath().toUri()));
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
new file mode 100644
index 0000000..2eecf49
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.jobmanager.JobManager;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManager;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
+import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Option;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.File;
+import java.net.URL;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Abstract class to verify that it is possible to migrate a 1.2 savepoint to 1.3 and that the topology can be modified
+ * from that point on.
+ * 
+ * The verification is done in 2 Steps:
+ * Step 1: Migrate the job to 1.3 by submitting the same job used for the 1.2 savepoint, and create a new savepoint.
+ * Step 2: Modify the job topology, and restore from the savepoint created in step 1.
+ */
+public abstract class AbstractOperatorRestoreTestBase {
+
+	@Rule
+	public final TemporaryFolder tmpFolder = new TemporaryFolder();
+
+	private static ActorSystem actorSystem = null;
+	private static ActorGateway jobManager = null;
+	private static ActorGateway archiver = null;
+	private static ActorGateway taskManager = null;
+
+	private static final FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+
+	@BeforeClass
+	public static void setupCluster() throws Exception {
+		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+
+		actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+		Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+			new Configuration(),
+			actorSystem,
+			TestingUtils.defaultExecutor(),
+			TestingUtils.defaultExecutor(),
+			Option.apply("jm"),
+			Option.apply("arch"),
+			TestingJobManager.class,
+			TestingMemoryArchivist.class);
+
+		jobManager = new AkkaActorGateway(master._1(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+		archiver = new AkkaActorGateway(master._2(), HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+		Configuration tmConfig = new Configuration();
+		tmConfig.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
+
+		ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+			tmConfig,
+			ResourceID.generate(),
+			actorSystem,
+			"localhost",
+			Option.apply("tm"),
+			Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path(), HighAvailabilityServices.DEFAULT_LEADER_ID)),
+			true,
+			TestingTaskManager.class);
+
+		taskManager = new AkkaActorGateway(taskManagerRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+		// Wait until connected
+		Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+		Await.ready(taskManager.ask(msg, timeout), timeout);
+	}
+
+	@AfterClass
+	public static void tearDownCluster() {
+		if (actorSystem != null) {
+			actorSystem.shutdown();
+		}
+
+		if (archiver != null) {
+			archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+		}
+
+		if (jobManager != null) {
+			jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+		}
+
+		if (taskManager != null) {
+			taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+		}
+	}
+
+	@Test
+	public void testMigrationAndRestore() throws Throwable {
+		// submit 1.2 job and create a migrated 1.3 savepoint
+		String savepointPath = migrateJob();
+		// restore from migrated 1.3 savepoint
+		restoreJob(savepointPath);
+	}
+
+	private String migrateJob() throws Throwable {
+		URL savepointResource = AbstractOperatorRestoreTestBase.class.getClassLoader().getResource("operatorstate/" + getMigrationSavepointName());
+		if (savepointResource == null) {
+			throw new IllegalArgumentException("Savepoint file does not exist.");
+		}
+		JobGraph jobToMigrate = createJobGraph(ExecutionMode.MIGRATE);
+		jobToMigrate.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointResource.getFile()));
+
+		Object msg;
+		Object result;
+
+		// Submit job graph
+		msg = new JobManagerMessages.SubmitJob(jobToMigrate, ListeningBehaviour.DETACHED);
+		result = Await.result(jobManager.ask(msg, timeout), timeout);
+
+		if (result instanceof JobManagerMessages.JobResultFailure) {
+			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
+			throw new Exception(failure.cause());
+		}
+		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+
+		// Wait for all tasks to be running
+		msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobToMigrate.getJobID());
+		Await.result(jobManager.ask(msg, timeout), timeout);
+
+		// Trigger savepoint
+		File targetDirectory = tmpFolder.newFolder();
+		msg = new JobManagerMessages.CancelJobWithSavepoint(jobToMigrate.getJobID(), targetDirectory.getAbsolutePath());
+		Future<Object> future = jobManager.ask(msg, timeout);
+		result = Await.result(future, timeout);
+
+		if (result instanceof JobManagerMessages.CancellationFailure) {
+			JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
+			throw new Exception(failure.cause());
+		}
+
+		String savepointPath = ((JobManagerMessages.CancellationSuccess) result).savepointPath();
+
+		// Wait until canceled
+		msg = new TestingJobManagerMessages.NotifyWhenJobStatus(jobToMigrate.getJobID(), JobStatus.CANCELED);
+		Await.ready(jobManager.ask(msg, timeout), timeout);
+
+		return savepointPath;
+	}
+
+	private void restoreJob(String savepointPath) throws Exception {
+		JobGraph jobToRestore = createJobGraph(ExecutionMode.RESTORE);
+		jobToRestore.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));
+
+		Object msg;
+		Object result;
+
+		// Submit job graph
+		msg = new JobManagerMessages.SubmitJob(jobToRestore, ListeningBehaviour.DETACHED);
+		result = Await.result(jobManager.ask(msg, timeout), timeout);
+
+		if (result instanceof JobManagerMessages.JobResultFailure) {
+			JobManagerMessages.JobResultFailure failure = (JobManagerMessages.JobResultFailure) result;
+			throw new Exception(failure.cause());
+		}
+		Assert.assertSame(JobManagerMessages.JobSubmitSuccess.class, result.getClass());
+
+		msg = new JobManagerMessages.RequestJobStatus(jobToRestore.getJobID());
+		JobStatus status = ((JobManagerMessages.CurrentJobStatus) Await.result(jobManager.ask(msg, timeout), timeout)).status();
+		while (!status.isTerminalState()) {
+			status = ((JobManagerMessages.CurrentJobStatus) Await.result(jobManager.ask(msg, timeout), timeout)).status();
+		}
+
+		Assert.assertEquals(JobStatus.FINISHED, status);
+	}
+
+	private JobGraph createJobGraph(ExecutionMode mode) {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+		env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
+		env.setRestartStrategy(RestartStrategies.noRestart());
+		env.setStateBackend(new MemoryStateBackend());
+
+		switch (mode) {
+			case MIGRATE:
+				createMigrationJob(env);
+				break;
+			case RESTORE:
+				createRestoredJob(env);
+				break;
+		}
+
+		return StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
+	}
+
+	/**
+	 * Recreates the job used to create the 1.2 savepoint.
+	 *
+	 * @param env StreamExecutionEnvironment to use
+	 */
+	protected abstract void createMigrationJob(StreamExecutionEnvironment env);
+
+	/**
+	 * Creates a modified version of the job used to create the 1.2 savepoint.
+	 *
+	 * @param env StreamExecutionEnvironment to use
+	 */
+	protected abstract void createRestoredJob(StreamExecutionEnvironment env);
+
+	/**
+	 * Returns the name of the savepoint directory to use, relative to "resources/operatorstate".
+	 *
+	 * @return savepoint directory to use
+	 */
+	protected abstract String getMigrationSavepointName();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
new file mode 100644
index 0000000..f333aca
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/ExecutionMode.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore;
+
+/**
+ * Enum to control function behavior for the different test stages.
+ * 
+ * {@link ExecutionMode#GENERATE} should be used when creating the 1.2 savepoint.
+ * {@link ExecutionMode#MIGRATE} should be used when migrating the 1.2 savepoint to 1.3.
+ * {@link ExecutionMode#RESTORE} should be used when restoring from the migrated 1.3 savepoint.
+ */
+public enum ExecutionMode {
+	GENERATE,
+	MIGRATE,
+	RESTORE
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
new file mode 100644
index 0000000..28cd15a
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedComplexChainTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.keyed;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+public class KeyedComplexChainTest extends AbstractOperatorRestoreTestBase {
+
+	@Override
+	public void createMigrationJob(StreamExecutionEnvironment env) {
+		/**
+		 * Source -> keyBy -> C(Window -> StatefulMap1 -> StatefulMap2)
+		 */
+		SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = KeyedJob.createIntegerTupleSource(env, ExecutionMode.MIGRATE);
+
+		SingleOutputStreamOperator<Integer> window = KeyedJob.createWindowFunction(ExecutionMode.MIGRATE, source);
+
+		SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.MIGRATE, window);
+
+		SingleOutputStreamOperator<Integer> second = KeyedJob.createSecondStatefulMap(ExecutionMode.MIGRATE, first);
+	}
+
+	@Override
+	protected void createRestoredJob(StreamExecutionEnvironment env) {
+		/**
+		 * Source -> keyBy -> C(Window -> StatefulMap2) -> StatefulMap1
+		 */
+		SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = KeyedJob.createIntegerTupleSource(env, ExecutionMode.RESTORE);
+
+		SingleOutputStreamOperator<Integer> window = KeyedJob.createWindowFunction(ExecutionMode.RESTORE, source);
+
+		SingleOutputStreamOperator<Integer> second = KeyedJob.createSecondStatefulMap(ExecutionMode.RESTORE, window);
+
+		SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.RESTORE, second);
+		first.startNewChain();
+	}
+
+	@Override
+	protected final String getMigrationSavepointName() {
+		return "complexKeyed";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
new file mode 100644
index 0000000..6add7b2
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.keyed;
+
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Savepoint generator to create the job used by the {@link KeyedComplexChainTest}.
+ *
+ * The job should be cancelled manually through the REST API using the cancel-with-savepoint operation.
+ */
+public class KeyedJob {
+
+	public static void main(String[] args) throws Exception {
+		ParameterTool pt = ParameterTool.fromArgs(args);
+
+		String savepointsPath = pt.getRequired("savepoint-path");
+
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointsPath);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
+		env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
+		env.setRestartStrategy(RestartStrategies.noRestart());
+
+		env.setStateBackend(new MemoryStateBackend());
+
+		/**
+		 * Source -> keyBy -> C(Window -> StatefulMap1 -> StatefulMap2)
+		 */
+
+		SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = createIntegerTupleSource(env, ExecutionMode.GENERATE);
+
+		SingleOutputStreamOperator<Integer> window = createWindowFunction(ExecutionMode.GENERATE, source);
+
+		SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.GENERATE, window);
+
+		SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.GENERATE, first);
+
+		env.execute("job");
+	}
+
+	public static SingleOutputStreamOperator<Tuple2<Integer, Integer>> createIntegerTupleSource(StreamExecutionEnvironment env, ExecutionMode mode) {
+		return env.addSource(new IntegerTupleSource(mode));
+	}
+
+	public static SingleOutputStreamOperator<Integer> createWindowFunction(ExecutionMode mode, DataStream<Tuple2<Integer, Integer>> input) {
+		return input
+			.keyBy(0)
+			.countWindow(1)
+			.apply(new StatefulWindowFunction(mode))
+			.setParallelism(4)
+			.uid("window");
+	}
+
+	public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
+		SingleOutputStreamOperator<Integer> map = input
+			.map(new StatefulStringStoringMap(mode, "first"))
+			.setParallelism(4);
+
+		// TODO: re-enable this when generating the actual 1.2 savepoint
+		//if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
+		map.uid("first");
+		//}
+
+		return map;
+	}
+
+	public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
+		SingleOutputStreamOperator<Integer> map = input
+			.map(new StatefulStringStoringMap(mode, "second"))
+			.setParallelism(4);
+
+		// TODO: re-enable this when generating the actual 1.2 savepoint
+		//if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
+		map.uid("second");
+		//}
+
+		return map;
+	}
+
+	private static final class IntegerTupleSource extends RichSourceFunction<Tuple2<Integer, Integer>> {
+
+		private static final long serialVersionUID = 1912878510707871659L;
+		private final ExecutionMode mode;
+
+		private boolean running = true;
+
+		private IntegerTupleSource(ExecutionMode mode) {
+			this.mode = mode;
+		}
+
+		@Override
+		public void run(SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
+			for (int x = 0; x < 10; x++) {
+				ctx.collect(new Tuple2<>(x, x));
+			}
+
+			switch (mode) {
+				case GENERATE:
+				case MIGRATE:
+					synchronized (this) {
+						while (running) {
+							this.wait();
+						}
+					}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			synchronized (this) {
+				running = false;
+				this.notifyAll();
+			}
+		}
+	}
+
+	private static final class StatefulWindowFunction extends RichWindowFunction<Tuple2<Integer, Integer>, Integer, Tuple, GlobalWindow> {
+
+		private static final long serialVersionUID = -7236313076792964055L;
+
+		private final ExecutionMode mode;
+		private transient ListState<Integer> state;
+
+		private boolean applyCalled = false;
+
+		private StatefulWindowFunction(ExecutionMode mode) {
+			this.mode = mode;
+		}
+
+		@Override
+		public void open(Configuration config) {
+			this.state = getRuntimeContext().getListState(new ListStateDescriptor<>("values", Integer.class));
+		}
+
+		@Override
+		public void apply(Tuple key, GlobalWindow window, Iterable<Tuple2<Integer, Integer>> values, Collector<Integer> out) throws Exception {
+			// fail-safe to make sure apply is actually called
+			applyCalled = true;
+			switch (mode) {
+				case GENERATE:
+					for (Tuple2<Integer, Integer> value : values) {
+						state.add(value.f1);
+					}
+					break;
+				case MIGRATE:
+				case RESTORE:
+					Iterator<Tuple2<Integer, Integer>> input = values.iterator();
+					Iterator<Integer> restored = state.get().iterator();
+					while (input.hasNext() && restored.hasNext()) {
+						Tuple2<Integer, Integer> value = input.next();
+						Integer rValue = restored.next();
+						Assert.assertEquals(rValue, value.f1);
+					}
+					Assert.assertEquals(restored.hasNext(), input.hasNext());
+			}
+		}
+
+		@Override
+		public void close() {
+			Assert.assertTrue("Apply was never called.", applyCalled);
+		}
+	}
+
+	private static class StatefulStringStoringMap extends RichMapFunction<Integer, Integer> implements ListCheckpointed<String> {
+
+		private static final long serialVersionUID = 6092985758425330235L;
+		private final ExecutionMode mode;
+		private final String valueToStore;
+
+		private StatefulStringStoringMap(ExecutionMode mode, String valueToStore) {
+			this.mode = mode;
+			this.valueToStore = valueToStore;
+		}
+
+		@Override
+		public Integer map(Integer value) throws Exception {
+			return value;
+		}
+
+		@Override
+		public List<String> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Arrays.asList(valueToStore + getRuntimeContext().getIndexOfThisSubtask());
+		}
+
+		@Override
+		public void restoreState(List<String> state) throws Exception {
+			switch (mode) {
+				case GENERATE:
+					break;
+				case MIGRATE:
+				case RESTORE:
+					Assert.assertEquals("Failed for " + valueToStore + getRuntimeContext().getIndexOfThisSubtask(), 1, state.size());
+					String value = state.get(0);
+					Assert.assertEquals(valueToStore + getRuntimeContext().getIndexOfThisSubtask(), value);
+			}
+		}
+	}
+
+
+	private KeyedJob() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
new file mode 100644
index 0000000..5b51765
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/AbstractNonKeyedOperatorRestoreTestBase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.AbstractOperatorRestoreTestBase;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * All classes extending this class will use the same savepoint and migration job.
+ */
+public abstract class AbstractNonKeyedOperatorRestoreTestBase extends AbstractOperatorRestoreTestBase {
+
+	@Override
+	public void createMigrationJob(StreamExecutionEnvironment env) {
+		/**
+		 * Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
+		 */
+		DataStream<Integer> source = createSource(env, ExecutionMode.MIGRATE);
+
+		SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.MIGRATE, source);
+		first.startNewChain();
+
+		SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.MIGRATE, first);
+		second.startNewChain();
+
+		SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);
+
+		SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.MIGRATE, stateless);
+	}
+
+	@Override
+	protected final String getMigrationSavepointName() {
+		return "nonKeyed";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
new file mode 100644
index 0000000..6838070
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainBreakTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operators is restored if a topology change breaks up a chain.
+ */
+public class ChainBreakTest extends AbstractNonKeyedOperatorRestoreTestBase {
+
+	@Override
+	public void createRestoredJob(StreamExecutionEnvironment env) {
+		/**
+		 * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
+		 * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map) -> StatefulMap3
+		 */
+		DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);
+
+		SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
+		first.startNewChain();
+
+		SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first);
+		second.startNewChain();
+
+		SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);
+
+		SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
+		third.startNewChain();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
new file mode 100644
index 0000000..e405e76
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthDecreaseTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operators is restored if a topology change removes an operator from a chain.
+ */
+public class ChainLengthDecreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {
+
+	@Override
+	public void createRestoredJob(StreamExecutionEnvironment env) {
+		/**
+		 * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
+		 * Modified job: Source -> StatefulMap1 -> CHAIN(Map -> StatefulMap3)
+		 */
+		DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);
+
+		SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
+		first.startNewChain();
+
+		SingleOutputStreamOperator<Integer> stateless = createStatelessMap(first);
+		stateless.startNewChain();
+
+		SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
new file mode 100644
index 0000000..b78aa10
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainLengthIncreaseTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operator is restored if a topology change adds an operator to a chain.
+ */
+public class ChainLengthIncreaseTest extends AbstractNonKeyedOperatorRestoreTestBase {
+
+	@Override
+	public void createRestoredJob(StreamExecutionEnvironment env) {
+		/**
+		 * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
+		 * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3 -> StatefulMap4)
+		 */
+		DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);
+
+		SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
+		first.startNewChain();
+
+		SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first);
+		second.startNewChain();
+
+		SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);
+
+		SingleOutputStreamOperator<Integer> stateless2 = createStatelessMap(stateless);
+
+		SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless2);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
new file mode 100644
index 0000000..7c68b4e
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainOrderTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operators is restored if a topology change causes the ordering of a chain to change.
+ */
+public class ChainOrderTest extends AbstractNonKeyedOperatorRestoreTestBase {
+
+	@Override
+	public void createRestoredJob(StreamExecutionEnvironment env) {
+		/**
+		 * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
+		 * Modified job: Source -> StatefulMap1 -> CHAIN(StatefulMap3 -> Map -> StatefulMap2)
+		 */
+		DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);
+
+		SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
+		first.startNewChain();
+
+		SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, first);
+		third.startNewChain();
+
+		SingleOutputStreamOperator<Integer> stateless = createStatelessMap(third);
+
+		SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, stateless);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
new file mode 100644
index 0000000..3f2fba4
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/ChainUnionTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createFirstStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSecondStatefulMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createSource;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createStatelessMap;
+import static org.apache.flink.test.state.operator.restore.unkeyed.NonKeyedJob.createThirdStatefulMap;
+
+/**
+ * Verifies that the state of all operators is restored if a topology change creates a new chain.
+ */
+public class ChainUnionTest extends AbstractNonKeyedOperatorRestoreTestBase {
+
+	@Override
+	public void createRestoredJob(StreamExecutionEnvironment env) {
+		/**
+		 * Original job: Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
+		 * Modified job: Source -> CHAIN(StatefulMap1 -> StatefulMap2 -> Map -> StatefulMap3)
+		 */
+		DataStream<Integer> source = createSource(env, ExecutionMode.RESTORE);
+
+		SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.RESTORE, source);
+		first.startNewChain();
+
+		SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.RESTORE, first);
+
+		SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);
+
+		SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.RESTORE, stateless);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
new file mode 100644
index 0000000..32067b3
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/unkeyed/NonKeyedJob.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.test.state.operator.restore.unkeyed;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+import org.apache.flink.test.state.operator.restore.ExecutionMode;
+import org.junit.Assert;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Savepoint generator to create the job used by the {@link AbstractNonKeyedOperatorRestoreTestBase}.
+ *
+ * The job should be cancelled manually through the REST API using the cancel-with-savepoint operation.
+ */
+public class NonKeyedJob {
+
+	public static void main(String[] args) throws Exception {
+		ParameterTool pt = ParameterTool.fromArgs(args);
+
+		String savepointsPath = pt.getRequired("savepoint-path");
+
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, savepointsPath);
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
+		env.enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
+		env.setRestartStrategy(RestartStrategies.noRestart());
+
+		env.setStateBackend(new MemoryStateBackend());
+
+		/**
+		 * Source -> StatefulMap1 -> CHAIN(StatefulMap2 -> Map -> StatefulMap3)
+		 */
+		DataStream<Integer> source = createSource(env, ExecutionMode.GENERATE);
+
+		SingleOutputStreamOperator<Integer> first = createFirstStatefulMap(ExecutionMode.GENERATE, source);
+		first.startNewChain();
+
+		SingleOutputStreamOperator<Integer> second = createSecondStatefulMap(ExecutionMode.GENERATE, first);
+		second.startNewChain();
+
+		SingleOutputStreamOperator<Integer> stateless = createStatelessMap(second);
+
+		SingleOutputStreamOperator<Integer> third = createThirdStatefulMap(ExecutionMode.GENERATE, stateless);
+
+		env.execute("job");
+	}
+
+	public static SingleOutputStreamOperator<Integer> createSource(StreamExecutionEnvironment env, ExecutionMode mode) {
+		return env.addSource(new IntegerSource(mode))
+			.setParallelism(4);
+	}
+
+	public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
+		return input
+			.map(new StatefulStringStoringMap(mode, "first"))
+			.setParallelism(4)
+			.uid("first");
+	}
+
+	public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
+		return input
+			.map(new StatefulStringStoringMap(mode, "second"))
+			.setParallelism(4)
+			.uid("second");
+	}
+
+	public static SingleOutputStreamOperator<Integer> createThirdStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
+		SingleOutputStreamOperator<Integer> map = input
+			.map(new StatefulStringStoringMap(mode, "third"))
+			.setParallelism(4);
+
+		// we cannot set the uid on a chained operator in 1.2
+		if (mode == ExecutionMode.MIGRATE || mode == ExecutionMode.RESTORE) {
+			map.uid("third");
+		}
+
+		return map;
+	}
+
+	public static SingleOutputStreamOperator<Integer> createStatelessMap(DataStream<Integer> input) {
+		return input.map(new NoOpMapFunction())
+			.setParallelism(4);
+	}
+
+	private static class StatefulStringStoringMap extends RichMapFunction<Integer, Integer> implements ListCheckpointed<String> {
+
+		private static final long serialVersionUID = 6092985758425330235L;
+		private final ExecutionMode mode;
+		private final String valueToStore;
+
+		private StatefulStringStoringMap(ExecutionMode mode, String valueToStore) {
+			this.mode = mode;
+			this.valueToStore = valueToStore;
+		}
+
+		@Override
+		public Integer map(Integer value) throws Exception {
+			return value;
+		}
+
+		@Override
+		public List<String> snapshotState(long checkpointId, long timestamp) throws Exception {
+			return Arrays.asList(valueToStore + getRuntimeContext().getIndexOfThisSubtask());
+		}
+
+		@Override
+		public void restoreState(List<String> state) throws Exception {
+			switch (mode) {
+				case GENERATE:
+					break;
+				case MIGRATE:
+				case RESTORE:
+					Assert.assertEquals("Failed for " + valueToStore + getRuntimeContext().getIndexOfThisSubtask(), 1, state.size());
+					String value = state.get(0);
+					Assert.assertEquals(valueToStore + getRuntimeContext().getIndexOfThisSubtask(), value);
+			}
+		}
+	}
+
+	private static class NoOpMapFunction implements MapFunction<Integer, Integer> {
+
+		private static final long serialVersionUID = 6584823409744624276L;
+
+		@Override
+		public Integer map(Integer value) throws Exception {
+			return value;
+		}
+	}
+
+	private static final class IntegerSource extends RichParallelSourceFunction<Integer> {
+
+		private static final long serialVersionUID = 1912878510707871659L;
+		private final ExecutionMode mode;
+
+		private volatile boolean running = true;
+
+		private IntegerSource(ExecutionMode mode) {
+			this.mode = mode;
+		}
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+			ctx.collect(1);
+
+			switch (mode) {
+				case GENERATE:
+				case MIGRATE:
+					// keep the job running until cancel-with-savepoint was done
+					synchronized (this) {
+						while (running) {
+							this.wait();
+						}
+					}
+			}
+		}
+
+		@Override
+		public void cancel() {
+			synchronized (this) {
+				running = false;
+				this.notifyAll();
+			}
+		}
+	}
+
+	private NonKeyedJob() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata b/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata
new file mode 100644
index 0000000..9e03876
Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/complexKeyed/_metadata differ

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata b/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata
new file mode 100644
index 0000000..8fcd1ea
Binary files /dev/null and b/flink-tests/src/test/resources/operatorstate/nonKeyed/_metadata differ


Mime
View raw message