flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [16/50] [abbrv] flink git commit: [FLINK-5892] Restore state on operator level
Date Wed, 03 May 2017 12:10:23 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 24169f2..0d2e903 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -33,6 +34,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
@@ -168,7 +170,14 @@ public class CheckpointCoordinatorTest {
 			final ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
 			final ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
 			ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
-			ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2, new JobVertexID(), 1, 1, ExecutionState.FINISHED);
+			JobVertexID jobVertexID2 = new JobVertexID();
+			ExecutionVertex triggerVertex2 = mockExecutionVertex(
+				triggerAttemptID2,
+				jobVertexID2,
+				Lists.newArrayList(OperatorID.fromJobVertexID(jobVertexID2)),
+				1,
+				1,
+				ExecutionState.FINISHED);
 
 			// create some mock Execution vertices that need to ack the checkpoint
 			final ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
@@ -317,7 +326,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(jid, checkpoint.getJobId());
 			assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
 			assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
-			assertEquals(0, checkpoint.getTaskStates().size());
+			assertEquals(0, checkpoint.getOperatorStates().size());
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.isFullyAcknowledged());
 
@@ -426,7 +435,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(jid, checkpoint1.getJobId());
 			assertEquals(2, checkpoint1.getNumberOfNonAcknowledgedTasks());
 			assertEquals(0, checkpoint1.getNumberOfAcknowledgedTasks());
-			assertEquals(0, checkpoint1.getTaskStates().size());
+			assertEquals(0, checkpoint1.getOperatorStates().size());
 			assertFalse(checkpoint1.isDiscarded());
 			assertFalse(checkpoint1.isFullyAcknowledged());
 
@@ -436,7 +445,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(jid, checkpoint2.getJobId());
 			assertEquals(2, checkpoint2.getNumberOfNonAcknowledgedTasks());
 			assertEquals(0, checkpoint2.getNumberOfAcknowledgedTasks());
-			assertEquals(0, checkpoint2.getTaskStates().size());
+			assertEquals(0, checkpoint2.getOperatorStates().size());
 			assertFalse(checkpoint2.isDiscarded());
 			assertFalse(checkpoint2.isFullyAcknowledged());
 
@@ -471,7 +480,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(jid, checkpointNew.getJobId());
 			assertEquals(2, checkpointNew.getNumberOfNonAcknowledgedTasks());
 			assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
-			assertEquals(0, checkpointNew.getTaskStates().size());
+			assertEquals(0, checkpointNew.getOperatorStates().size());
 			assertFalse(checkpointNew.isDiscarded());
 			assertFalse(checkpointNew.isFullyAcknowledged());
 			assertNotEquals(checkpoint1.getCheckpointId(), checkpointNew.getCheckpointId());
@@ -539,10 +548,20 @@ public class CheckpointCoordinatorTest {
 			assertEquals(jid, checkpoint.getJobId());
 			assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
 			assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
-			assertEquals(0, checkpoint.getTaskStates().size());
+			assertEquals(0, checkpoint.getOperatorStates().size());
 			assertFalse(checkpoint.isDiscarded());
 			assertFalse(checkpoint.isFullyAcknowledged());
 
+			OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+			OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+
+			Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates();
+
+			operatorStates.put(opID1, new SpyInjectingOperatorState(
+				opID1, vertex1.getTotalNumberOfParallelSubtasks(), vertex1.getMaxParallelism()));
+			operatorStates.put(opID2, new SpyInjectingOperatorState(
+				opID2, vertex2.getTotalNumberOfParallelSubtasks(), vertex2.getMaxParallelism()));
+
 			// check that the vertices received the trigger checkpoint message
 			{
 				verify(vertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId), eq(timestamp), any(CheckpointOptions.class));
@@ -550,9 +569,9 @@ public class CheckpointCoordinatorTest {
 			}
 
 			// acknowledge from one of the tasks
-			SubtaskState subtaskState2 = mock(SubtaskState.class);
-			AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), subtaskState2);
+			AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), mock(SubtaskState.class));
 			coord.receiveAcknowledgeMessage(acknowledgeCheckpoint1);
+			OperatorSubtaskState subtaskState2 = operatorStates.get(opID2).getState(vertex2.getParallelSubtaskIndex());
 			assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
 			assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
 			assertFalse(checkpoint.isDiscarded());
@@ -566,8 +585,8 @@ public class CheckpointCoordinatorTest {
 			verify(subtaskState2, never()).registerSharedStates(any(SharedStateRegistry.class));
 
 			// acknowledge the other task.
-			SubtaskState subtaskState1 = mock(SubtaskState.class);
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), subtaskState1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), mock(SubtaskState.class)));
+			OperatorSubtaskState subtaskState1 = operatorStates.get(opID1).getState(vertex1.getParallelSubtaskIndex());
 
 			// the checkpoint is internally converted to a successful checkpoint and the
 			// pending checkpoint object is disposed
@@ -596,7 +615,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(jid, success.getJobId());
 			assertEquals(timestamp, success.getTimestamp());
 			assertEquals(checkpoint.getCheckpointId(), success.getCheckpointID());
-			assertEquals(2, success.getTaskStates().size());
+			assertEquals(2, success.getOperatorStates().size());
 
 			// ---------------
 			// trigger another checkpoint and see that this one replaces the other checkpoint
@@ -606,7 +625,9 @@ public class CheckpointCoordinatorTest {
 
 			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
+			subtaskState1 = operatorStates.get(opID1).getState(vertex1.getParallelSubtaskIndex());
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
+			subtaskState2 = operatorStates.get(opID2).getState(vertex2.getParallelSubtaskIndex());
 
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -616,7 +637,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(jid, successNew.getJobId());
 			assertEquals(timestampNew, successNew.getTimestamp());
 			assertEquals(checkpointIdNew, successNew.getCheckpointID());
-			assertTrue(successNew.getTaskStates().isEmpty());
+			assertTrue(successNew.getOperatorStates().isEmpty());
 
 			// validate that the subtask states in old savepoint have unregister their shared states
 			{
@@ -756,13 +777,13 @@ public class CheckpointCoordinatorTest {
 			assertEquals(checkpointId1, sc1.getCheckpointID());
 			assertEquals(timestamp1, sc1.getTimestamp());
 			assertEquals(jid, sc1.getJobId());
-			assertTrue(sc1.getTaskStates().isEmpty());
+			assertTrue(sc1.getOperatorStates().isEmpty());
 
 			CompletedCheckpoint sc2 = scs.get(1);
 			assertEquals(checkpointId2, sc2.getCheckpointID());
 			assertEquals(timestamp2, sc2.getTimestamp());
 			assertEquals(jid, sc2.getJobId());
-			assertTrue(sc2.getTaskStates().isEmpty());
+			assertTrue(sc2.getOperatorStates().isEmpty());
 
 			coord.shutdown(JobStatus.FINISHED);
 		}
@@ -830,10 +851,22 @@ public class CheckpointCoordinatorTest {
 			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
 			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId1), eq(timestamp1), any(CheckpointOptions.class));
 
-			// acknowledge one of the three tasks
-			SubtaskState subtaskState1_2 = mock(SubtaskState.class);
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1, new CheckpointMetrics(), subtaskState1_2));
+			OperatorID opID1 = OperatorID.fromJobVertexID(ackVertex1.getJobvertexId());
+			OperatorID opID2 = OperatorID.fromJobVertexID(ackVertex2.getJobvertexId());
+			OperatorID opID3 = OperatorID.fromJobVertexID(ackVertex3.getJobvertexId());
+
+			Map<OperatorID, OperatorState> operatorStates1 = pending1.getOperatorStates();
+
+			operatorStates1.put(opID1, new SpyInjectingOperatorState(
+				opID1, ackVertex1.getTotalNumberOfParallelSubtasks(), ackVertex1.getMaxParallelism()));
+			operatorStates1.put(opID2, new SpyInjectingOperatorState(
+				opID2, ackVertex2.getTotalNumberOfParallelSubtasks(), ackVertex2.getMaxParallelism()));
+			operatorStates1.put(opID3, new SpyInjectingOperatorState(
+				opID3, ackVertex3.getTotalNumberOfParallelSubtasks(), ackVertex3.getMaxParallelism()));
 
+			// acknowledge one of the three tasks
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1, new CheckpointMetrics(), mock(SubtaskState.class)));
+			OperatorSubtaskState subtaskState1_2 = operatorStates1.get(opID2).getState(ackVertex2.getParallelSubtaskIndex());
 			// start the second checkpoint
 			// trigger the first checkpoint. this should succeed
 			assertTrue(coord.triggerCheckpoint(timestamp2, false));
@@ -850,6 +883,15 @@ public class CheckpointCoordinatorTest {
 			}
 			long checkpointId2 = pending2.getCheckpointId();
 
+			Map<OperatorID, OperatorState> operatorStates2 = pending2.getOperatorStates();
+
+			operatorStates2.put(opID1, new SpyInjectingOperatorState(
+				opID1, ackVertex1.getTotalNumberOfParallelSubtasks(), ackVertex1.getMaxParallelism()));
+			operatorStates2.put(opID2, new SpyInjectingOperatorState(
+				opID2, ackVertex2.getTotalNumberOfParallelSubtasks(), ackVertex2.getMaxParallelism()));
+			operatorStates2.put(opID3, new SpyInjectingOperatorState(
+				opID3, ackVertex3.getTotalNumberOfParallelSubtasks(), ackVertex3.getMaxParallelism()));
+
 			// trigger messages should have been sent
 			verify(triggerVertex1.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
 			verify(triggerVertex2.getCurrentExecutionAttempt(), times(1)).triggerCheckpoint(eq(checkpointId2), eq(timestamp2), any(CheckpointOptions.class));
@@ -857,17 +899,17 @@ public class CheckpointCoordinatorTest {
 			// we acknowledge one more task from the first checkpoint and the second
 			// checkpoint completely. The second checkpoint should then subsume the first checkpoint
 
-			SubtaskState subtaskState2_3 = mock(SubtaskState.class);
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2, new CheckpointMetrics(), subtaskState2_3));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2, new CheckpointMetrics(), mock(SubtaskState.class)));
+			OperatorSubtaskState subtaskState2_3 = operatorStates2.get(opID3).getState(ackVertex3.getParallelSubtaskIndex());
 
-			SubtaskState subtaskState2_1 = mock(SubtaskState.class);
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2, new CheckpointMetrics(), subtaskState2_1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2, new CheckpointMetrics(), mock(SubtaskState.class)));
+			OperatorSubtaskState subtaskState2_1 = operatorStates2.get(opID1).getState(ackVertex1.getParallelSubtaskIndex());
 
-			SubtaskState subtaskState1_1 = mock(SubtaskState.class);
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1, new CheckpointMetrics(), subtaskState1_1));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1, new CheckpointMetrics(), mock(SubtaskState.class)));
+			OperatorSubtaskState subtaskState1_1 = operatorStates1.get(opID1).getState(ackVertex1.getParallelSubtaskIndex());
 
-			SubtaskState subtaskState2_2 = mock(SubtaskState.class);
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2, new CheckpointMetrics(), subtaskState2_2));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2, new CheckpointMetrics(), mock(SubtaskState.class)));
+			OperatorSubtaskState subtaskState2_2 = operatorStates2.get(opID2).getState(ackVertex2.getParallelSubtaskIndex());
 
 			// now, the second checkpoint should be confirmed, and the first discarded
 			// actually both pending checkpoints are discarded, and the second has been transformed
@@ -896,7 +938,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(checkpointId2, success.getCheckpointID());
 			assertEquals(timestamp2, success.getTimestamp());
 			assertEquals(jid, success.getJobId());
-			assertEquals(3, success.getTaskStates().size());
+			assertEquals(3, success.getOperatorStates().size());
 
 			// the first confirm message should be out
 			verify(commitVertex.getCurrentExecutionAttempt(), times(1)).notifyCheckpointComplete(eq(checkpointId2), eq(timestamp2));
@@ -970,8 +1012,15 @@ public class CheckpointCoordinatorTest {
 			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next();
 			assertFalse(checkpoint.isDiscarded());
 
-			SubtaskState subtaskState = mock(SubtaskState.class);
-			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId(), new CheckpointMetrics(), subtaskState));
+			OperatorID opID1 = OperatorID.fromJobVertexID(ackVertex1.getJobvertexId());
+
+			Map<OperatorID, OperatorState> operatorStates = checkpoint.getOperatorStates();
+
+			operatorStates.put(opID1, new SpyInjectingOperatorState(
+				opID1, ackVertex1.getTotalNumberOfParallelSubtasks(), ackVertex1.getMaxParallelism()));
+
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId(), new CheckpointMetrics(), mock(SubtaskState.class)));
+			OperatorSubtaskState subtaskState = operatorStates.get(opID1).getState(ackVertex1.getParallelSubtaskIndex());
 
 			// wait until the checkpoint must have expired.
 			// we check every 250 msecs conservatively for 5 seconds
@@ -1106,13 +1155,25 @@ public class CheckpointCoordinatorTest {
 
 		long checkpointId = pendingCheckpoint.getCheckpointId();
 
-		SubtaskState triggerSubtaskState = mock(SubtaskState.class);
+		OperatorID opIDtrigger = OperatorID.fromJobVertexID(triggerVertex.getJobvertexId());
+		OperatorID opID1 = OperatorID.fromJobVertexID(ackVertex1.getJobvertexId());
+		OperatorID opID2 = OperatorID.fromJobVertexID(ackVertex2.getJobvertexId());
+
+		Map<OperatorID, OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
+
+		operatorStates.put(opIDtrigger, new SpyInjectingOperatorState(
+			opIDtrigger, triggerVertex.getTotalNumberOfParallelSubtasks(), triggerVertex.getMaxParallelism()));
+		operatorStates.put(opID1, new SpyInjectingOperatorState(
+			opID1, ackVertex1.getTotalNumberOfParallelSubtasks(), ackVertex1.getMaxParallelism()));
+		operatorStates.put(opID2, new SpyInjectingOperatorState(
+			opID2, ackVertex2.getTotalNumberOfParallelSubtasks(), ackVertex2.getMaxParallelism()));
 
 		// acknowledge the first trigger vertex
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), triggerSubtaskState));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), mock(SubtaskState.class)));
+		OperatorSubtaskState storedTriggerSubtaskState = operatorStates.get(opIDtrigger).getState(triggerVertex.getParallelSubtaskIndex());
 
-		// verify that the subtask state has registered its shared states at the registry
-		verify(triggerSubtaskState, never()).discardState();
+		// verify that the subtask state has not been discarded
+		verify(storedTriggerSubtaskState, never()).discardState();
 
 		SubtaskState unknownSubtaskState = mock(SubtaskState.class);
 
@@ -1131,20 +1192,20 @@ public class CheckpointCoordinatorTest {
 		verify(differentJobSubtaskState, never()).discardState();
 
 		// duplicate acknowledge message for the trigger vertex
-		reset(triggerSubtaskState);
+		SubtaskState triggerSubtaskState = mock(SubtaskState.class);
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, triggerAttemptId, checkpointId, new CheckpointMetrics(), triggerSubtaskState));
 
 		// duplicate acknowledge messages for a known vertex should not trigger discarding the state
 		verify(triggerSubtaskState, never()).discardState();
 
 		// let the checkpoint fail at the first ack vertex
-		reset(triggerSubtaskState);
+		reset(storedTriggerSubtaskState);
 		coord.receiveDeclineMessage(new DeclineCheckpoint(jobId, ackAttemptId1, checkpointId));
 
 		assertTrue(pendingCheckpoint.isDiscarded());
 
 		// check that we've cleaned up the already acknowledged state
-		verify(triggerSubtaskState, times(1)).discardState();
+		verify(storedTriggerSubtaskState, times(1)).discardState();
 
 		SubtaskState ackSubtaskState = mock(SubtaskState.class);
 
@@ -1411,15 +1472,25 @@ public class CheckpointCoordinatorTest {
 		assertEquals(jid, pending.getJobId());
 		assertEquals(2, pending.getNumberOfNonAcknowledgedTasks());
 		assertEquals(0, pending.getNumberOfAcknowledgedTasks());
-		assertEquals(0, pending.getTaskStates().size());
+		assertEquals(0, pending.getOperatorStates().size());
 		assertFalse(pending.isDiscarded());
 		assertFalse(pending.isFullyAcknowledged());
 		assertFalse(pending.canBeSubsumed());
 
+		OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
+		OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
+
+		Map<OperatorID, OperatorState> operatorStates = pending.getOperatorStates();
+
+		operatorStates.put(opID1, new SpyInjectingOperatorState(
+			opID1, vertex1.getTotalNumberOfParallelSubtasks(), vertex1.getMaxParallelism()));
+		operatorStates.put(opID2, new SpyInjectingOperatorState(
+			opID2, vertex2.getTotalNumberOfParallelSubtasks(), vertex1.getMaxParallelism()));
+
 		// acknowledge from one of the tasks
-		SubtaskState subtaskState2 = mock(SubtaskState.class);
-		AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), subtaskState2);
+		AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(jid, attemptID2, checkpointId, new CheckpointMetrics(), mock(SubtaskState.class));
 		coord.receiveAcknowledgeMessage(acknowledgeCheckpoint2);
+		OperatorSubtaskState subtaskState2 = operatorStates.get(opID2).getState(vertex2.getParallelSubtaskIndex());
 		assertEquals(1, pending.getNumberOfAcknowledgedTasks());
 		assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
 		assertFalse(pending.isDiscarded());
@@ -1433,8 +1504,8 @@ public class CheckpointCoordinatorTest {
 		assertFalse(savepointFuture.isDone());
 
 		// acknowledge the other task.
-		SubtaskState subtaskState1 = mock(SubtaskState.class);
-		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), subtaskState1));
+		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId, new CheckpointMetrics(), mock(SubtaskState.class)));
+		OperatorSubtaskState subtaskState1 = operatorStates.get(opID1).getState(vertex1.getParallelSubtaskIndex());
 
 		// the checkpoint is internally converted to a successful checkpoint and the
 		// pending checkpoint object is disposed
@@ -1461,7 +1532,7 @@ public class CheckpointCoordinatorTest {
 		assertEquals(jid, success.getJobId());
 		assertEquals(timestamp, success.getTimestamp());
 		assertEquals(pending.getCheckpointId(), success.getCheckpointID());
-		assertEquals(2, success.getTaskStates().size());
+		assertEquals(2, success.getOperatorStates().size());
 
 		// ---------------
 		// trigger another checkpoint and see that this one replaces the other checkpoint
@@ -1474,6 +1545,9 @@ public class CheckpointCoordinatorTest {
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
 		coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));
 
+		subtaskState1 = operatorStates.get(opID1).getState(vertex1.getParallelSubtaskIndex());
+		subtaskState2 = operatorStates.get(opID2).getState(vertex2.getParallelSubtaskIndex());
+
 		assertEquals(0, coord.getNumberOfPendingCheckpoints());
 		assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
@@ -1481,7 +1555,7 @@ public class CheckpointCoordinatorTest {
 		assertEquals(jid, successNew.getJobId());
 		assertEquals(timestampNew, successNew.getTimestamp());
 		assertEquals(checkpointIdNew, successNew.getCheckpointID());
-		assertTrue(successNew.getTaskStates().isEmpty());
+		assertTrue(successNew.getOperatorStates().isEmpty());
 		assertTrue(savepointFuture.isDone());
 
 		// validate that the first savepoint does not discard its private states.
@@ -1969,6 +2043,18 @@ public class CheckpointCoordinatorTest {
 		List<KeyGroupRange> keyGroupPartitions1 = StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, parallelism1);
 		List<KeyGroupRange> keyGroupPartitions2 = StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
 
+		PendingCheckpoint pending = coord.getPendingCheckpoints().get(checkpointId);
+
+		OperatorID opID1 = OperatorID.fromJobVertexID(jobVertexID1);
+		OperatorID opID2 = OperatorID.fromJobVertexID(jobVertexID2);
+
+		Map<OperatorID, OperatorState> operatorStates = pending.getOperatorStates();
+
+		operatorStates.put(opID1, new SpyInjectingOperatorState(
+			opID1, jobVertex1.getParallelism(), jobVertex1.getMaxParallelism()));
+		operatorStates.put(opID2, new SpyInjectingOperatorState(
+			opID2, jobVertex2.getParallelism(), jobVertex2.getMaxParallelism()));
+
 		for (int index = 0; index < jobVertex1.getParallelism(); index++) {
 			SubtaskState subtaskState = mockSubtaskState(jobVertexID1, index, keyGroupPartitions1.get(index));
 
@@ -2004,8 +2090,8 @@ public class CheckpointCoordinatorTest {
 
 		// All shared states should be unregistered once the store is shut down
 		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
-			for (TaskState taskState : completedCheckpoint.getTaskStates().values()) {
-				for (SubtaskState subtaskState : taskState.getStates()) {
+			for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
+				for (OperatorSubtaskState subtaskState : taskState.getStates()) {
 					verify(subtaskState, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
 				}
 			}
@@ -2021,8 +2107,8 @@ public class CheckpointCoordinatorTest {
 
 		// validate that all shared states are registered again after the recovery.
 		for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
-			for (TaskState taskState : completedCheckpoint.getTaskStates().values()) {
-				for (SubtaskState subtaskState : taskState.getStates()) {
+			for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
+				for (OperatorSubtaskState subtaskState : taskState.getStates()) {
 					verify(subtaskState, times(2)).registerSharedStates(any(SharedStateRegistry.class));
 				}
 			}
@@ -2432,7 +2518,11 @@ public class CheckpointCoordinatorTest {
 
 			actualOpStatesBackend.add(opStateBackend);
 			actualOpStatesRaw.add(opStateRaw);
-			assertNull(operatorState);
+			// the 'non partition state' is not null because it is recombined.
+			assertNotNull(operatorState);
+			for (int index = 0; index < operatorState.getLength(); index++) {
+				assertNull(operatorState.get(index));
+			}
 			compareKeyedState(Collections.singletonList(originalKeyedStateBackend), keyedStateBackend);
 			compareKeyedState(Collections.singletonList(originalKeyedStateRaw), keyGroupStateRaw);
 		}
@@ -2682,7 +2772,21 @@ public class CheckpointCoordinatorTest {
 	}
 
 	static ExecutionJobVertex mockExecutionJobVertex(
+			JobVertexID jobVertexID,
+			int parallelism,
+			int maxParallelism) {
+
+		return mockExecutionJobVertex(
+			jobVertexID,
+			Collections.singletonList(OperatorID.fromJobVertexID(jobVertexID)),
+			parallelism,
+			maxParallelism
+		);
+	}
+
+	static ExecutionJobVertex mockExecutionJobVertex(
 		JobVertexID jobVertexID,
+		List<OperatorID> jobVertexIDs,
 		int parallelism,
 		int maxParallelism) {
 		final ExecutionJobVertex executionJobVertex = mock(ExecutionJobVertex.class);
@@ -2693,6 +2797,7 @@ public class CheckpointCoordinatorTest {
 			executionVertices[i] = mockExecutionVertex(
 				new ExecutionAttemptID(),
 				jobVertexID,
+				jobVertexIDs,
 				parallelism,
 				maxParallelism,
 				ExecutionState.RUNNING);
@@ -2705,14 +2810,18 @@ public class CheckpointCoordinatorTest {
 		when(executionJobVertex.getParallelism()).thenReturn(parallelism);
 		when(executionJobVertex.getMaxParallelism()).thenReturn(maxParallelism);
 		when(executionJobVertex.isMaxParallelismConfigured()).thenReturn(true);
+		when(executionJobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
+		when(executionJobVertex.getUserDefinedOperatorIDs()).thenReturn(Arrays.asList(new OperatorID[jobVertexIDs.size()]));
 
 		return executionJobVertex;
 	}
 
 	static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
+		JobVertexID jobVertexID = new JobVertexID();
 		return mockExecutionVertex(
 			attemptID,
-			new JobVertexID(),
+			jobVertexID,
+			Arrays.asList(OperatorID.fromJobVertexID(jobVertexID)),
 			1,
 			1,
 			ExecutionState.RUNNING);
@@ -2721,6 +2830,7 @@ public class CheckpointCoordinatorTest {
 	private static ExecutionVertex mockExecutionVertex(
 		ExecutionAttemptID attemptID,
 		JobVertexID jobVertexID,
+		List<OperatorID> jobVertexIDs,
 		int parallelism,
 		int maxParallelism,
 		ExecutionState state,
@@ -2743,6 +2853,11 @@ public class CheckpointCoordinatorTest {
 		when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism);
 		when(vertex.getMaxParallelism()).thenReturn(maxParallelism);
 
+		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+		when(jobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
+		
+		when(vertex.getJobVertex()).thenReturn(jobVertex);
+
 		return vertex;
 	}
 
@@ -3135,8 +3250,16 @@ public class CheckpointCoordinatorTest {
 			null,
 			Executors.directExecutor());
 
-		store.addCheckpoint(
-			new CompletedCheckpoint(new JobID(), 0, 0, 0, Collections.<JobVertexID, TaskState>emptyMap()));
+		store.addCheckpoint(new CompletedCheckpoint(
+			new JobID(),
+			0,
+			0,
+			0,
+			Collections.<OperatorID, OperatorState>emptyMap(),
+			Collections.<MasterState>emptyList(),
+			CheckpointProperties.forStandardCheckpoint(),
+			null,
+			null));
 
 		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
 		coord.setCheckpointStatsTracker(tracker);
@@ -3146,4 +3269,17 @@ public class CheckpointCoordinatorTest {
 		verify(tracker, times(1))
 			.reportRestoredCheckpoint(any(RestoredCheckpointStats.class));
 	}
+
+	private static final class SpyInjectingOperatorState extends OperatorState {
+
+		private static final long serialVersionUID = -4004437428483663815L;
+
+		public SpyInjectingOperatorState(OperatorID taskID, int parallelism, int maxParallelism) {
+			super(taskID, parallelism, maxParallelism);
+		}
+
+		public void putState(int subtaskIndex, OperatorSubtaskState subtaskState) {
+			super.putState(subtaskIndex, spy(subtaskState));
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 2fc1de5..7d24568 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
@@ -209,6 +210,8 @@ public class CheckpointStateRestoreTest {
 		JobVertexID jobVertexId1 = new JobVertexID();
 		JobVertexID jobVertexId2 = new JobVertexID();
 
+		OperatorID operatorId1 = OperatorID.fromJobVertexID(jobVertexId1);
+
 		// 1st JobVertex
 		ExecutionVertex vertex11 = mockExecutionVertex(mockExecution(), jobVertexId1, 0, 3);
 		ExecutionVertex vertex12 = mockExecutionVertex(mockExecution(), jobVertexId1, 1, 3);
@@ -239,20 +242,30 @@ public class CheckpointStateRestoreTest {
 			null,
 			Executors.directExecutor());
 
-		ChainedStateHandle<StreamStateHandle> serializedState = CheckpointCoordinatorTest
-				.generateChainedStateHandle(new SerializableObject());
+		StreamStateHandle serializedState = CheckpointCoordinatorTest
+				.generateChainedStateHandle(new SerializableObject())
+				.get(0);
 
 		// --- (2) Checkpoint misses state for a jobVertex (should work) ---
-		Map<JobVertexID, TaskState> checkpointTaskStates = new HashMap<>();
+		Map<OperatorID, OperatorState> checkpointTaskStates = new HashMap<>();
 		{
-			TaskState taskState = new TaskState(jobVertexId1, 3, 3, 1);
-			taskState.putState(0, new SubtaskState(serializedState, null, null, null, null));
-			taskState.putState(1, new SubtaskState(serializedState, null, null, null, null));
-			taskState.putState(2, new SubtaskState(serializedState, null, null, null, null));
+			OperatorState taskState = new OperatorState(operatorId1, 3, 3);
+			taskState.putState(0, new OperatorSubtaskState(serializedState, null, null, null, null));
+			taskState.putState(1, new OperatorSubtaskState(serializedState, null, null, null, null));
+			taskState.putState(2, new OperatorSubtaskState(serializedState, null, null, null, null));
 
-			checkpointTaskStates.put(jobVertexId1, taskState);
+			checkpointTaskStates.put(operatorId1, taskState);
 		}
-		CompletedCheckpoint checkpoint = new CompletedCheckpoint(new JobID(), 0, 1, 2, new HashMap<>(checkpointTaskStates));
+		CompletedCheckpoint checkpoint = new CompletedCheckpoint(
+			new JobID(),
+			0,
+			1,
+			2,
+			new HashMap<>(checkpointTaskStates),
+			Collections.<MasterState>emptyList(),
+			CheckpointProperties.forStandardCheckpoint(),
+			null,
+			null);
 
 		coord.getCheckpointStore().addCheckpoint(checkpoint);
 
@@ -261,16 +274,26 @@ public class CheckpointStateRestoreTest {
 
 		// --- (3) JobVertex missing for task state that is part of the checkpoint ---
 		JobVertexID newJobVertexID = new JobVertexID();
+		OperatorID newOperatorID = OperatorID.fromJobVertexID(newJobVertexID);
 
 		// There is no task for this
 		{
-			TaskState taskState = new TaskState(jobVertexId1, 1, 1, 1);
-			taskState.putState(0, new SubtaskState(serializedState, null, null, null, null));
+			OperatorState taskState = new OperatorState(newOperatorID, 1, 1);
+			taskState.putState(0, new OperatorSubtaskState(serializedState, null, null, null, null));
 
-			checkpointTaskStates.put(newJobVertexID, taskState);
+			checkpointTaskStates.put(newOperatorID, taskState);
 		}
 
-		checkpoint = new CompletedCheckpoint(new JobID(), 1, 2, 3, new HashMap<>(checkpointTaskStates));
+		checkpoint = new CompletedCheckpoint(
+			new JobID(),
+			1,
+			2,
+			3,
+			new HashMap<>(checkpointTaskStates),
+			Collections.<MasterState>emptyList(),
+			CheckpointProperties.forStandardCheckpoint(),
+			null,
+			null);
 
 		coord.getCheckpointStore().addCheckpoint(checkpoint);
 
@@ -314,6 +337,12 @@ public class CheckpointStateRestoreTest {
 		when(vertex.getMaxParallelism()).thenReturn(vertices.length);
 		when(vertex.getJobVertexId()).thenReturn(id);
 		when(vertex.getTaskVertices()).thenReturn(vertices);
+		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorID.fromJobVertexID(id)));
+		when(vertex.getUserDefinedOperatorIDs()).thenReturn(Collections.<OperatorID>singletonList(null));
+
+		for (ExecutionVertex v : vertices) {
+			when(v.getJobVertex()).thenReturn(vertex);
+		}
 		return vertex;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index fc6e516..94bd12f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -20,8 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -39,6 +38,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
@@ -105,7 +105,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 		assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints());
 
 		for (int i = 1; i < expected.length; i++) {
-			Collection<TaskState> taskStates = expected[i - 1].getTaskStates().values();
+			Collection<OperatorState> taskStates = expected[i - 1].getOperatorStates().values();
 
 			checkpoints.addCheckpoint(expected[i]);
 
@@ -114,8 +114,8 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			assertTrue(expected[i - 1].isDiscarded());
 			assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints());
 
-			for (TaskState taskState : taskStates) {
-				for (SubtaskState subtaskState : taskState.getStates()) {
+			for (OperatorState operatorState : taskStates) {
+				for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
 					verify(subtaskState, times(1)).unregisterSharedStates(any(SharedStateRegistry.class));
 				}
 			}
@@ -198,51 +198,43 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 	// ---------------------------------------------------------------------------------------------
 
 	protected TestCompletedCheckpoint createCheckpoint(int id) throws IOException {
-		return createCheckpoint(id, 4);
-	}
-
-	protected TestCompletedCheckpoint createCheckpoint(int id, int numberOfStates)
-			throws IOException {
-		return createCheckpoint(id, numberOfStates, CheckpointProperties.forStandardCheckpoint());
-	}
+		int numberOfStates = 4;
+		CheckpointProperties props = CheckpointProperties.forStandardCheckpoint();
 
-	protected TestCompletedCheckpoint createCheckpoint(int id, int numberOfStates, CheckpointProperties props)
-			throws IOException {
+		OperatorID operatorID = new OperatorID();
 
-		JobVertexID jvid = new JobVertexID();
-
-		Map<JobVertexID, TaskState> taskGroupStates = new HashMap<>();
-		TaskState taskState = new TaskState(jvid, numberOfStates, numberOfStates, 1);
-		taskGroupStates.put(jvid, taskState);
+		Map<OperatorID, OperatorState> operatorGroupState = new HashMap<>();
+		OperatorState operatorState = new OperatorState(operatorID, numberOfStates, numberOfStates);
+		operatorGroupState.put(operatorID, operatorState);
 
 		for (int i = 0; i < numberOfStates; i++) {
-			SubtaskState subtaskState = CheckpointCoordinatorTest.mockSubtaskState(jvid, i, new KeyGroupRange(i, i));
+			OperatorSubtaskState subtaskState = mock(OperatorSubtaskState.class);
 
-			taskState.putState(i, subtaskState);
+			operatorState.putState(i, subtaskState);
 		}
 
-		return new TestCompletedCheckpoint(new JobID(), id, 0, taskGroupStates, props);
+		return new TestCompletedCheckpoint(new JobID(), id, 0, operatorGroupState, props);
 	}
 
-	protected void resetCheckpoint(Collection<TaskState> taskStates) {
-		for (TaskState taskState : taskStates) {
-			for (SubtaskState subtaskState : taskState.getStates()) {
+	protected void resetCheckpoint(Collection<OperatorState> operatorStates) {
+		for (OperatorState operatorState : operatorStates) {
+			for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
 				Mockito.reset(subtaskState);
 			}
 		}
 	}
 
-	protected void verifyCheckpointRegistered(Collection<TaskState> taskStates, SharedStateRegistry registry) {
-		for (TaskState taskState : taskStates) {
-			for (SubtaskState subtaskState : taskState.getStates()) {
+	protected void verifyCheckpointRegistered(Collection<OperatorState> operatorStates, SharedStateRegistry registry) {
+		for (OperatorState operatorState : operatorStates) {
+			for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
 				verify(subtaskState, times(1)).registerSharedStates(eq(registry));
 			}
 		}
 	}
 
-	protected void verifyCheckpointDiscarded(Collection<TaskState> taskStates) {
-		for (TaskState taskState : taskStates) {
-			for (SubtaskState subtaskState : taskState.getStates()) {
+	protected void verifyCheckpointDiscarded(Collection<OperatorState> operatorStates) {
+		for (OperatorState operatorState : operatorStates) {
+			for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
 				verify(subtaskState, times(1)).discardState();
 			}
 		}
@@ -270,10 +262,9 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			JobID jobId,
 			long checkpointId,
 			long timestamp,
-			Map<JobVertexID, TaskState> taskGroupStates,
+			Map<OperatorID, OperatorState> operatorGroupState,
 			CheckpointProperties props) {
-
-			super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates, null, props);
+			super(jobId, checkpointId, timestamp, Long.MAX_VALUE, operatorGroupState, null, props, null, null);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 652cc76..589ff46 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.SharedStateHandle;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.junit.Rule;
@@ -37,7 +37,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -55,9 +54,9 @@ public class CompletedCheckpointTest {
 		File file = tmpFolder.newFile();
 		assertEquals(true, file.exists());
 
-		TaskState state = mock(TaskState.class);
-		Map<JobVertexID, TaskState> taskStates = new HashMap<>();
-		taskStates.put(new JobVertexID(), state);
+		OperatorState state = mock(OperatorState.class);
+		Map<OperatorID, OperatorState> taskStates = new HashMap<>();
+		taskStates.put(new OperatorID(), state);
 
 		// Verify discard call is forwarded to state
 		CompletedCheckpoint checkpoint = new CompletedCheckpoint(
@@ -78,18 +77,20 @@ public class CompletedCheckpointTest {
 	 */
 	@Test
 	public void testCleanUpOnSubsume() throws Exception {
-		TaskState state = mock(TaskState.class);
-		Map<JobVertexID, TaskState> taskStates = new HashMap<>();
-		taskStates.put(new JobVertexID(), state);
+		OperatorState state = mock(OperatorState.class);
+		Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
+		operatorStates.put(new OperatorID(), state);
 
 		boolean discardSubsumed = true;
 		CheckpointProperties props = new CheckpointProperties(false, false, discardSubsumed, true, true, true, true);
 		
 		CompletedCheckpoint checkpoint = new CompletedCheckpoint(
 				new JobID(), 0, 0, 1,
-				taskStates,
+				operatorStates,
 				Collections.<MasterState>emptyList(),
-				props);
+				props,
+				null,
+				null);
 
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 		checkpoint.registerSharedStates(sharedStateRegistry);
@@ -114,9 +115,9 @@ public class CompletedCheckpointTest {
 				JobStatus.FINISHED, JobStatus.CANCELED, JobStatus.FAILED, JobStatus.SUSPENDED
 		};
 
-		TaskState state = mock(TaskState.class);
-		Map<JobVertexID, TaskState> taskStates = new HashMap<>();
-		taskStates.put(new JobVertexID(), state);
+		OperatorState state = mock(OperatorState.class);
+		Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
+		operatorStates.put(new OperatorID(), state);
 
 		for (JobStatus status : terminalStates) {
 			Mockito.reset(state);
@@ -125,7 +126,7 @@ public class CompletedCheckpointTest {
 			CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
 			CompletedCheckpoint checkpoint = new CompletedCheckpoint(
 					new JobID(), 0, 0, 1,
-					new HashMap<>(taskStates),
+					new HashMap<>(operatorStates),
 					Collections.<MasterState>emptyList(),
 					props,
 					new FileStateHandle(new Path(file.toURI()), file.length()),
@@ -143,9 +144,11 @@ public class CompletedCheckpointTest {
 			props = new CheckpointProperties(false, false, true, true, true, true, true);
 			checkpoint = new CompletedCheckpoint(
 					new JobID(), 0, 0, 1,
-					new HashMap<>(taskStates),
+					new HashMap<>(operatorStates),
 					Collections.<MasterState>emptyList(),
-					props);
+					props,
+					null,
+					null);
 
 			checkpoint.discardOnShutdown(status, sharedStateRegistry);
 			verify(state, times(1)).discardState();
@@ -158,18 +161,20 @@ public class CompletedCheckpointTest {
 	 */
 	@Test
 	public void testCompletedCheckpointStatsCallbacks() throws Exception {
-		TaskState state = mock(TaskState.class);
-		Map<JobVertexID, TaskState> taskStates = new HashMap<>();
-		taskStates.put(new JobVertexID(), state);
+		OperatorState state = mock(OperatorState.class);
+		Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
+		operatorStates.put(new OperatorID(), state);
 
 		CompletedCheckpoint completed = new CompletedCheckpoint(
 			new JobID(),
 			0,
 			0,
 			1,
-			new HashMap<>(taskStates),
+			new HashMap<>(operatorStates),
 			Collections.<MasterState>emptyList(),
-			CheckpointProperties.forStandardCheckpoint());
+			CheckpointProperties.forStandardCheckpoint(),
+			null,
+			null);
 
 		CompletedCheckpointStats.DiscardCallback callback = mock(CompletedCheckpointStats.DiscardCallback.class);
 		completed.setDiscardCallback(callback);

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 2dd1803..6df01a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -22,15 +22,15 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.SharedStateHandle;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
-import org.mockito.Mock;
 import org.mockito.Mockito;
 
 import java.io.File;
@@ -50,9 +50,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.when;
@@ -63,9 +61,13 @@ public class PendingCheckpointTest {
 	private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID();
 
 	static {
+		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+		when(jobVertex.getOperatorIDs()).thenReturn(Collections.singletonList(new OperatorID()));
+		
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		when(vertex.getMaxParallelism()).thenReturn(128);
 		when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(1);
+		when(vertex.getJobVertex()).thenReturn(jobVertex);
 		ACK_TASKS.put(ATTEMPT_ID, vertex);
 	}
 
@@ -193,7 +195,7 @@ public class PendingCheckpointTest {
 		CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
 		QueueExecutor executor = new QueueExecutor();
 
-		TaskState state = mock(TaskState.class);
+		OperatorState state = mock(OperatorState.class);
 		doNothing().when(state).registerSharedStates(any(SharedStateRegistry.class));
 		doNothing().when(state).unregisterSharedStates(any(SharedStateRegistry.class));
 
@@ -311,7 +313,7 @@ public class PendingCheckpointTest {
 	public void testNullSubtaskStateLeadsToStatelessTask() throws Exception {
 		PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
 		pending.acknowledgeTask(ATTEMPT_ID, null, mock(CheckpointMetrics.class));
-		Assert.assertTrue(pending.getTaskStates().isEmpty());
+		Assert.assertTrue(pending.getOperatorStates().isEmpty());
 	}
 
 	/**
@@ -324,7 +326,7 @@ public class PendingCheckpointTest {
 	public void testNonNullSubtaskStateLeadsToStatefulTask() throws Exception {
 		PendingCheckpoint pending = createPendingCheckpoint(CheckpointProperties.forStandardCheckpoint(), null);
 		pending.acknowledgeTask(ATTEMPT_ID, mock(SubtaskState.class), mock(CheckpointMetrics.class));
-		Assert.assertFalse(pending.getTaskStates().isEmpty());
+		Assert.assertFalse(pending.getOperatorStates().isEmpty());
 	}
 
 	@Test
@@ -367,12 +369,12 @@ public class PendingCheckpointTest {
 	}
 
 	@SuppressWarnings("unchecked")
-	static void setTaskState(PendingCheckpoint pending, TaskState state) throws NoSuchFieldException, IllegalAccessException {
-		Field field = PendingCheckpoint.class.getDeclaredField("taskStates");
+	static void setTaskState(PendingCheckpoint pending, OperatorState state) throws NoSuchFieldException, IllegalAccessException {
+		Field field = PendingCheckpoint.class.getDeclaredField("operatorStates");
 		field.setAccessible(true);
-		Map<JobVertexID, TaskState> taskStates = (Map<JobVertexID, TaskState>) field.get(pending);
+		Map<OperatorID, OperatorState> taskStates = (Map<OperatorID, OperatorState>) field.get(pending);
 
-		taskStates.put(new JobVertexID(), state);
+		taskStates.put(new OperatorID(), state);
 	}
 
 	private static final class QueueExecutor implements Executor {

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index 64aeeba..be94762 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -54,16 +54,16 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 	public void testShutdownDiscardsCheckpoints() throws Exception {
 		AbstractCompletedCheckpointStore store = createCompletedCheckpoints(1);
 		TestCompletedCheckpoint checkpoint = createCheckpoint(0);
-		Collection<TaskState> taskStates = checkpoint.getTaskStates().values();
+		Collection<OperatorState> operatorStates = checkpoint.getOperatorStates().values();
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
-		verifyCheckpointRegistered(taskStates, store.sharedStateRegistry);
+		verifyCheckpointRegistered(operatorStates, store.sharedStateRegistry);
 
 		store.shutdown(JobStatus.FINISHED);
 		assertEquals(0, store.getNumberOfRetainedCheckpoints());
 		assertTrue(checkpoint.isDiscarded());
-		verifyCheckpointDiscarded(taskStates);
+		verifyCheckpointDiscarded(operatorStates);
 	}
 
 	/**
@@ -74,7 +74,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 	public void testSuspendDiscardsCheckpoints() throws Exception {
 		AbstractCompletedCheckpointStore store = createCompletedCheckpoints(1);
 		TestCompletedCheckpoint checkpoint = createCheckpoint(0);
-		Collection<TaskState> taskStates = checkpoint.getTaskStates().values();
+		Collection<OperatorState> taskStates = checkpoint.getOperatorStates().values();
 
 		store.addCheckpoint(checkpoint);
 		assertEquals(1, store.getNumberOfRetainedCheckpoints());
@@ -99,7 +99,7 @@ public class StandaloneCompletedCheckpointStoreTest extends CompletedCheckpointS
 		for (long i = 0; i <= numCheckpointsToRetain; ++i) {
 			CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
 			doReturn(i).when(checkpointToAdd).getCheckpointID();
-			doReturn(Collections.emptyMap()).when(checkpointToAdd).getTaskStates();
+			doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
 			doThrow(new IOException()).when(checkpointToAdd).discardOnSubsume(any(SharedStateRegistry.class));
 			
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 73fcf78..73e0ed9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -90,17 +90,17 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		checkpoints.addCheckpoint(expected[1]);
 		checkpoints.addCheckpoint(expected[2]);
 
-		verifyCheckpointRegistered(expected[0].getTaskStates().values(), checkpoints.sharedStateRegistry);
-		verifyCheckpointRegistered(expected[1].getTaskStates().values(), checkpoints.sharedStateRegistry);
-		verifyCheckpointRegistered(expected[2].getTaskStates().values(), checkpoints.sharedStateRegistry);
+		verifyCheckpointRegistered(expected[0].getOperatorStates().values(), checkpoints.sharedStateRegistry);
+		verifyCheckpointRegistered(expected[1].getOperatorStates().values(), checkpoints.sharedStateRegistry);
+		verifyCheckpointRegistered(expected[2].getOperatorStates().values(), checkpoints.sharedStateRegistry);
 
 		// All three should be in ZK
 		assertEquals(3, ZooKeeper.getClient().getChildren().forPath(CheckpointsPath).size());
 		assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
 
-		resetCheckpoint(expected[0].getTaskStates().values());
-		resetCheckpoint(expected[1].getTaskStates().values());
-		resetCheckpoint(expected[2].getTaskStates().values());
+		resetCheckpoint(expected[0].getOperatorStates().values());
+		resetCheckpoint(expected[1].getOperatorStates().values());
+		resetCheckpoint(expected[2].getOperatorStates().values());
 
 		// Recover TODO!!! clear registry!
 		checkpoints.recover();
@@ -121,7 +121,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint
 		assertEquals(expectedCheckpoints, actualCheckpoints);
 
 		for (CompletedCheckpoint actualCheckpoint : actualCheckpoints) {
-			verifyCheckpointRegistered(actualCheckpoint.getTaskStates().values(), checkpoints.sharedStateRegistry);
+			verifyCheckpointRegistered(actualCheckpoint.getOperatorStates().values(), checkpoints.sharedStateRegistry);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 66ef232..8fc0f02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -227,7 +227,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
 		for (long i = 0; i <= numCheckpointsToRetain; ++i) {
 			CompletedCheckpoint checkpointToAdd = mock(CompletedCheckpoint.class);
 			doReturn(i).when(checkpointToAdd).getCheckpointID();
-			doReturn(Collections.emptyMap()).when(checkpointToAdd).getTaskStates();
+			doReturn(Collections.emptyMap()).when(checkpointToAdd).getOperatorStates();
 			
 			try {
 				zooKeeperCompletedCheckpointStore.addCheckpoint(checkpointToAdd);

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
index 7d9874e..ba77dbc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
@@ -49,6 +52,94 @@ import static org.junit.Assert.assertEquals;
 public class CheckpointTestUtils {
 
 	/**
+	 * Creates a random collection of OperatorState objects containing various types of state handles.
+	 */
+	public static Collection<OperatorState> createOperatorStates(int numTaskStates, int numSubtasksPerTask) {
+		return createOperatorStates(new Random(), numTaskStates, numSubtasksPerTask);
+	}
+
+	/**
+	 * Creates a random collection of OperatorState objects containing various types of state handles.
+	 */
+	public static Collection<OperatorState> createOperatorStates(
+			Random random,
+			int numTaskStates,
+			int numSubtasksPerTask) {
+
+		List<OperatorState> taskStates = new ArrayList<>(numTaskStates);
+
+		for (int stateIdx = 0; stateIdx < numTaskStates; ++stateIdx) {
+
+			OperatorState taskState = new OperatorState(new OperatorID(), numSubtasksPerTask, 128);
+
+			boolean hasNonPartitionableState = random.nextBoolean();
+			boolean hasOperatorStateBackend = random.nextBoolean();
+			boolean hasOperatorStateStream = random.nextBoolean();
+
+			boolean hasKeyedBackend = random.nextInt(4) != 0;
+			boolean hasKeyedStream = random.nextInt(4) != 0;
+
+			for (int subtaskIdx = 0; subtaskIdx < numSubtasksPerTask; subtaskIdx++) {
+
+				StreamStateHandle nonPartitionableState = null;
+				StreamStateHandle operatorStateBackend =
+					new TestByteStreamStateHandleDeepCompare("b", ("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET));
+				StreamStateHandle operatorStateStream =
+					new TestByteStreamStateHandleDeepCompare("b", ("Beautiful").getBytes(ConfigConstants.DEFAULT_CHARSET));
+
+				OperatorStateHandle operatorStateHandleBackend = null;
+				OperatorStateHandle operatorStateHandleStream = null;
+				
+				Map<String, StateMetaInfo> offsetsMap = new HashMap<>();
+				offsetsMap.put("A", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10, 20}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+				offsetsMap.put("B", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+				offsetsMap.put("C", new OperatorStateHandle.StateMetaInfo(new long[]{60, 70, 80}, OperatorStateHandle.Mode.BROADCAST));
+
+				if (hasNonPartitionableState) {
+					nonPartitionableState =
+						new TestByteStreamStateHandleDeepCompare("a", ("Hi").getBytes(ConfigConstants.DEFAULT_CHARSET));
+				}
+
+				if (hasOperatorStateBackend) {
+					operatorStateHandleBackend = new OperatorStateHandle(offsetsMap, operatorStateBackend);
+				}
+
+				if (hasOperatorStateStream) {
+					operatorStateHandleStream = new OperatorStateHandle(offsetsMap, operatorStateStream);
+				}
+
+				KeyGroupsStateHandle keyedStateBackend = null;
+				KeyGroupsStateHandle keyedStateStream = null;
+
+				if (hasKeyedBackend) {
+					keyedStateBackend = new KeyGroupsStateHandle(
+							new KeyGroupRangeOffsets(1, 1, new long[]{42}),
+							new TestByteStreamStateHandleDeepCompare("c", "Hello"
+									.getBytes(ConfigConstants.DEFAULT_CHARSET)));
+				}
+
+				if (hasKeyedStream) {
+					keyedStateStream = new KeyGroupsStateHandle(
+							new KeyGroupRangeOffsets(1, 1, new long[]{23}),
+							new TestByteStreamStateHandleDeepCompare("d", "World"
+									.getBytes(ConfigConstants.DEFAULT_CHARSET)));
+				}
+
+				taskState.putState(subtaskIdx, new OperatorSubtaskState(
+						nonPartitionableState,
+						operatorStateHandleBackend,
+						operatorStateHandleStream,
+						keyedStateStream,
+						keyedStateBackend));
+			}
+
+			taskStates.add(taskState);
+		}
+
+		return taskStates;
+	}
+
+	/**
 	 * Creates a random collection of TaskState objects containing various types of state handles.
 	 */
 	public static Collection<TaskState> createTaskStates(int numTaskStates, int numSubtasksPerTask) {
@@ -88,7 +179,7 @@ public class CheckpointTestUtils {
 
 					StreamStateHandle nonPartitionableState =
 							new TestByteStreamStateHandleDeepCompare("a-" + chainIdx, ("Hi-" + chainIdx).getBytes(
-									ConfigConstants.DEFAULT_CHARSET));
+								ConfigConstants.DEFAULT_CHARSET));
 					StreamStateHandle operatorStateBackend =
 							new TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET));
 					StreamStateHandle operatorStateStream =
@@ -122,14 +213,14 @@ public class CheckpointTestUtils {
 					keyedStateBackend = new KeyGroupsStateHandle(
 							new KeyGroupRangeOffsets(1, 1, new long[]{42}),
 							new TestByteStreamStateHandleDeepCompare("c", "Hello"
-									.getBytes(ConfigConstants.DEFAULT_CHARSET)));
+								.getBytes(ConfigConstants.DEFAULT_CHARSET)));
 				}
 
 				if (hasKeyedStream) {
 					keyedStateStream = new KeyGroupsStateHandle(
 							new KeyGroupRangeOffsets(1, 1, new long[]{23}),
 							new TestByteStreamStateHandleDeepCompare("d", "World"
-									.getBytes(ConfigConstants.DEFAULT_CHARSET)));
+								.getBytes(ConfigConstants.DEFAULT_CHARSET)));
 				}
 
 				taskState.putState(subtaskIdx, new SubtaskState(

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index 20b1e57..331621d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -20,14 +20,17 @@ package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -53,29 +56,30 @@ public class SavepointLoaderTest {
 
 		int parallelism = 128128;
 		long checkpointId = Integer.MAX_VALUE + 123123L;
-		JobVertexID vertexId = new JobVertexID();
+		JobVertexID jobVertexID = new JobVertexID();
+		OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
 
-		TaskState state = mock(TaskState.class);
+		OperatorState state = mock(OperatorState.class);
 		when(state.getParallelism()).thenReturn(parallelism);
-		when(state.getJobVertexID()).thenReturn(vertexId);
+		when(state.getOperatorID()).thenReturn(operatorID);
 		when(state.getMaxParallelism()).thenReturn(parallelism);
-		when(state.getChainLength()).thenReturn(1);
 
-		Map<JobVertexID, TaskState> taskStates = new HashMap<>();
-		taskStates.put(vertexId, state);
+		Map<OperatorID, OperatorState> taskStates = new HashMap<>();
+		taskStates.put(operatorID, state);
 
 		JobID jobId = new JobID();
 
 		// Store savepoint
-		SavepointV2 savepoint = new SavepointV2(checkpointId, taskStates.values());
+		SavepointV2 savepoint = new SavepointV2(checkpointId, taskStates.values(), Collections.<MasterState>emptyList());
 		String path = SavepointStore.storeSavepoint(tmp.getAbsolutePath(), savepoint);
 
 		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
 		when(vertex.getParallelism()).thenReturn(parallelism);
 		when(vertex.getMaxParallelism()).thenReturn(parallelism);
+		when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(operatorID));
 
 		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
-		tasks.put(vertexId, vertex);
+		tasks.put(jobVertexID, vertex);
 
 		ClassLoader ucl = Thread.currentThread().getContextClassLoader();
 
@@ -97,7 +101,7 @@ public class SavepointLoaderTest {
 		}
 
 		// 3) Load and validate: missing vertex
-		assertNotNull(tasks.remove(vertexId));
+		assertNotNull(tasks.remove(jobVertexID));
 
 		try {
 			SavepointLoader.loadAndValidateSavepoint(jobId, tasks, path, ucl, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
index cf79282..391102c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.junit.Rule;
@@ -69,7 +70,10 @@ public class SavepointStoreTest {
 
 		// Store
 		String savepointDirectory = SavepointStore.createSavepointDirectory(root, new JobID());
-		SavepointV2 stored = new SavepointV2(1929292, CheckpointTestUtils.createTaskStates(4, 24));
+		SavepointV2 stored = new SavepointV2(
+			1929292,
+			CheckpointTestUtils.createOperatorStates(4, 24),
+			Collections.<MasterState>emptyList());
 		String path = SavepointStore.storeSavepoint(savepointDirectory, stored);
 
 		list = rootFile.listFiles();
@@ -80,7 +84,7 @@ public class SavepointStoreTest {
 		Savepoint loaded = SavepointStore.loadSavepoint(path, Thread.currentThread().getContextClassLoader());
 
 		assertEquals(stored.getCheckpointId(), loaded.getCheckpointId());
-		assertEquals(stored.getTaskStates(), loaded.getTaskStates());
+		assertEquals(stored.getOperatorStates(), loaded.getOperatorStates());
 		assertEquals(stored.getMasterStates(), loaded.getMasterStates());
 
 		loaded.dispose();
@@ -147,7 +151,10 @@ public class SavepointStoreTest {
 
 		// Savepoint v0
 		String savepointDirectory2 = SavepointStore.createSavepointDirectory(root, new JobID());
-		SavepointV2 savepoint = new SavepointV2(checkpointId, CheckpointTestUtils.createTaskStates(4, 32));
+		SavepointV2 savepoint = new SavepointV2(
+			checkpointId,
+			CheckpointTestUtils.createOperatorStates(4, 32),
+			Collections.<MasterState>emptyList());
 		String pathSavepoint = SavepointStore.storeSavepoint(savepointDirectory2, savepoint);
 
 		list = rootFile.listFiles();
@@ -205,7 +212,10 @@ public class SavepointStoreTest {
 		FileSystem fs = FileSystem.get(new Path(root).toUri());
 
 		// Store
-		SavepointV2 savepoint = new SavepointV2(1929292, CheckpointTestUtils.createTaskStates(4, 24));
+		SavepointV2 savepoint = new SavepointV2(
+			1929292,
+			CheckpointTestUtils.createOperatorStates(4, 24),
+			Collections.<MasterState>emptyList());
 
 		FileStateHandle store1 = SavepointStore.storeExternalizedCheckpointToHandle(root, savepoint);
 		fs.exists(store1.getFilePath());
@@ -266,6 +276,11 @@ public class SavepointStoreTest {
 		}
 
 		@Override
+		public Collection<OperatorState> getOperatorStates() {
+			return null;
+		}
+
+		@Override
 		public void dispose() {
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
index deb14dd..154d761 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
 
 import org.junit.Test;
 
@@ -48,7 +48,7 @@ public class SavepointV2SerializerTest {
 
 		for (int i = 0; i < 100; ++i) {
 			final long checkpointId = rnd.nextLong() & 0x7fffffffffffffffL;
-			final Collection<TaskState> taskStates = Collections.emptyList();
+			final Collection<OperatorState> taskStates = Collections.emptyList();
 			final Collection<MasterState> masterStates = Collections.emptyList();
 
 			testCheckpointSerialization(checkpointId, taskStates, masterStates);
@@ -63,13 +63,13 @@ public class SavepointV2SerializerTest {
 		for (int i = 0; i < 100; ++i) {
 			final long checkpointId = rnd.nextLong() & 0x7fffffffffffffffL;
 
-			final Collection<TaskState> taskStates = Collections.emptyList();
+			final Collection<OperatorState> operatorStates = Collections.emptyList();
 
 			final int numMasterStates = rnd.nextInt(maxNumMasterStates) + 1;
 			final Collection<MasterState> masterStates = 
 					CheckpointTestUtils.createRandomMasterStates(rnd, numMasterStates);
 
-			testCheckpointSerialization(checkpointId, taskStates, masterStates);
+			testCheckpointSerialization(checkpointId, operatorStates, masterStates);
 		}
 	}
 
@@ -84,8 +84,8 @@ public class SavepointV2SerializerTest {
 
 			final int numTasks = rnd.nextInt(maxTaskStates) + 1;
 			final int numSubtasks = rnd.nextInt(maxNumSubtasks) + 1;
-			final Collection<TaskState> taskStates = 
-					CheckpointTestUtils.createTaskStates(rnd, numTasks, numSubtasks);
+			final Collection<OperatorState> taskStates = 
+					CheckpointTestUtils.createOperatorStates(rnd, numTasks, numSubtasks);
 
 			final Collection<MasterState> masterStates = Collections.emptyList();
 
@@ -106,8 +106,8 @@ public class SavepointV2SerializerTest {
 
 			final int numTasks = rnd.nextInt(maxTaskStates) + 1;
 			final int numSubtasks = rnd.nextInt(maxNumSubtasks) + 1;
-			final Collection<TaskState> taskStates =
-					CheckpointTestUtils.createTaskStates(rnd, numTasks, numSubtasks);
+			final Collection<OperatorState> taskStates =
+					CheckpointTestUtils.createOperatorStates(rnd, numTasks, numSubtasks);
 
 			final int numMasterStates = rnd.nextInt(maxNumMasterStates) + 1;
 			final Collection<MasterState> masterStates =
@@ -119,7 +119,7 @@ public class SavepointV2SerializerTest {
 
 	private void testCheckpointSerialization(
 			long checkpointId,
-			Collection<TaskState> taskStates,
+			Collection<OperatorState> operatorStates,
 			Collection<MasterState> masterStates) throws IOException {
 
 		SavepointV2Serializer serializer = SavepointV2Serializer.INSTANCE;
@@ -127,7 +127,7 @@ public class SavepointV2SerializerTest {
 		ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
 		DataOutputStream out = new DataOutputViewStreamWrapper(baos);
 
-		serializer.serialize(new SavepointV2(checkpointId, taskStates, masterStates), out);
+		serializer.serialize(new SavepointV2(checkpointId, operatorStates, masterStates), out);
 		out.close();
 
 		byte[] bytes = baos.toByteArray();
@@ -136,7 +136,7 @@ public class SavepointV2SerializerTest {
 		SavepointV2 deserialized = serializer.deserialize(in, getClass().getClassLoader());
 
 		assertEquals(checkpointId, deserialized.getCheckpointId());
-		assertEquals(taskStates, deserialized.getTaskStates());
+		assertEquals(operatorStates, deserialized.getOperatorStates());
 
 		assertEquals(masterStates.size(), deserialized.getMasterStates().size());
 		for (Iterator<MasterState> a = masterStates.iterator(), b = deserialized.getMasterStates().iterator();

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Test.java
index 428a62a..6b6a6d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Test.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Test.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.runtime.checkpoint.MasterState;
-import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.checkpoint.OperatorState;
 
 import org.junit.Test;
 
@@ -36,7 +36,7 @@ public class SavepointV2Test {
 	 * Simple test of savepoint methods.
 	 */
 	@Test
-	public void testSavepointV1() throws Exception {
+	public void testSavepointV2() throws Exception {
 		final Random rnd = new Random();
 
 		final long checkpointId = rnd.nextInt(Integer.MAX_VALUE) + 1;
@@ -44,8 +44,8 @@ public class SavepointV2Test {
 		final int numSubtaskStates = 16;
 		final int numMasterStates = 7;
 
-		Collection<TaskState> taskStates =
-				CheckpointTestUtils.createTaskStates(rnd, numTaskStates, numSubtaskStates);
+		Collection<OperatorState> taskStates =
+				CheckpointTestUtils.createOperatorStates(rnd, numTaskStates, numSubtaskStates);
 
 		Collection<MasterState> masterStates =
 				CheckpointTestUtils.createRandomMasterStates(rnd, numMasterStates);
@@ -54,15 +54,15 @@ public class SavepointV2Test {
 
 		assertEquals(2, checkpoint.getVersion());
 		assertEquals(checkpointId, checkpoint.getCheckpointId());
-		assertEquals(taskStates, checkpoint.getTaskStates());
+		assertEquals(taskStates, checkpoint.getOperatorStates());
 		assertEquals(masterStates, checkpoint.getMasterStates());
 
-		assertFalse(checkpoint.getTaskStates().isEmpty());
+		assertFalse(checkpoint.getOperatorStates().isEmpty());
 		assertFalse(checkpoint.getMasterStates().isEmpty());
 
 		checkpoint.dispose();
 
-		assertTrue(checkpoint.getTaskStates().isEmpty());
+		assertTrue(checkpoint.getOperatorStates().isEmpty());
 		assertTrue(checkpoint.getMasterStates().isEmpty());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
index 89db3a1..b5a67fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/LegacyJobVertexIdTest.java
@@ -25,11 +25,13 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.util.SerializedValue;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
@@ -46,7 +48,7 @@ public class LegacyJobVertexIdTest {
 		JobVertexID legacyId1 = new JobVertexID();
 		JobVertexID legacyId2 = new JobVertexID();
 
-		JobVertex jobVertex = new JobVertex("test", defaultId, Arrays.asList(legacyId1, legacyId2));
+		JobVertex jobVertex = new JobVertex("test", defaultId, Arrays.asList(legacyId1, legacyId2), new ArrayList<OperatorID>(), new ArrayList<OperatorID>());
 		jobVertex.setInvokableClass(AbstractInvokable.class);
 
 		ExecutionGraph executionGraph = new ExecutionGraph(

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
index a932c18..11a03cc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
@@ -76,7 +76,7 @@ public class RecoverableCompletedCheckpointStore extends AbstractCompletedCheckp
 			suspended.clear();
 
 			for (CompletedCheckpoint checkpoint : checkpoints) {
-				sharedStateRegistry.unregisterAll(checkpoint.getTaskStates().values());
+				sharedStateRegistry.unregisterAll(checkpoint.getOperatorStates().values());
 				suspended.add(checkpoint);
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
index 2fbfe1c..f468c93 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java
@@ -153,19 +153,6 @@ public class StreamGraphHasherV1 implements StreamGraphHasher {
 
 			return true;
 		} else {
-			// Check that this node is not part of a chain. This is currently
-			// not supported, because the runtime takes the snapshots by the
-			// operator ID of the first vertex in a chain. It's OK if the node
-			// has chained outputs.
-			for (StreamEdge inEdge : node.getInEdges()) {
-				if (isChainable(inEdge, isChainingEnabled)) {
-					throw new UnsupportedOperationException("Cannot assign user-specified hash "
-							+ "to intermediate node in chain. This will be supported in future "
-							+ "versions of Flink. As a work around start new chain at task "
-							+ node.getOperatorName() + ".");
-				}
-			}
-
 			Hasher hasher = hashFunction.newHasher();
 			byte[] hash = generateUserSpecifiedHash(node, hasher);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
index 3772e58..7c2416e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java
@@ -169,19 +169,6 @@ public class StreamGraphHasherV2 implements StreamGraphHasher {
 
 			return true;
 		} else {
-			// Check that this node is not part of a chain. This is currently
-			// not supported, because the runtime takes the snapshots by the
-			// operator ID of the first vertex in a chain. It's OK if the node
-			// has chained outputs.
-			for (StreamEdge inEdge : node.getInEdges()) {
-				if (isChainable(inEdge, isChainingEnabled)) {
-					throw new UnsupportedOperationException("Cannot assign user-specified hash "
-							+ "to intermediate node in chain. This will be supported in future "
-							+ "versions of Flink. As a work around start new chain at task "
-							+ node.getOperatorName() + ".");
-				}
-			}
-
 			Hasher hasher = hashFunction.newHasher();
 			byte[] hash = generateUserSpecifiedHash(node, hasher);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f7980a7e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java
index c1750a1..8a8c8b0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphUserHashHasher.java
@@ -38,15 +38,6 @@ public class StreamGraphUserHashHasher implements StreamGraphHasher {
 			String userHash = streamNode.getUserHash();
 
 			if (null != userHash) {
-				for (StreamEdge inEdge : streamNode.getInEdges()) {
-					if (StreamingJobGraphGenerator.isChainable(inEdge, streamGraph)) {
-						throw new UnsupportedOperationException("Cannot assign user-specified hash "
-								+ "to intermediate node in chain. This will be supported in future "
-								+ "versions of Flink. As a work around start new chain at task "
-								+ streamNode.getOperatorName() + ".");
-					}
-				}
-
 				hashResult.put(streamNode.getId(), StringUtils.hexStringToByte(userHash));
 			}
 		}


Mime
View raw message