flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From srich...@apache.org
Subject [1/6] flink git commit: [FLINK-7719] [checkpoints] Send checkpoint id to task as part of deployment descriptor when resuming
Date Mon, 22 Jan 2018 13:08:45 GMT
Repository: flink
Updated Branches:
  refs/heads/master 2bdd16e05 -> 517b3f872


[FLINK-7719] [checkpoints] Send checkpoint id to task as part of deployment descriptor when resuming


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/402a2e30
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/402a2e30
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/402a2e30

Branch: refs/heads/master
Commit: 402a2e30c750e1bcb753643ed66c6df0dd861112
Parents: 2bdd16e
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Mon Aug 21 14:31:38 2017 +0200
Committer: Stefan Richter <s.richter@data-artisans.com>
Committed: Mon Jan 22 14:06:17 2018 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |  2 +-
 .../checkpoint/JobManagerTaskRestore.java       | 51 +++++++++++++
 .../checkpoint/StateAssignmentOperation.java    | 15 ++--
 .../deployment/TaskDeploymentDescriptor.java    | 36 ++++-----
 .../flink/runtime/executiongraph/Execution.java | 18 ++---
 .../runtime/executiongraph/ExecutionVertex.java | 10 ++-
 .../runtime/taskexecutor/TaskExecutor.java      |  2 +-
 .../apache/flink/runtime/taskmanager/Task.java  | 78 ++++++--------------
 .../flink/runtime/taskmanager/TaskManager.scala |  2 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 12 ++-
 .../checkpoint/CheckpointStateRestoreTest.java  | 11 +--
 .../TaskDeploymentDescriptorTest.java           |  9 ++-
 .../ExecutionVertexLocalityTest.java            |  4 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  2 +-
 .../streaming/runtime/tasks/StreamTask.java     |  1 -
 .../tasks/InterruptSensitiveRestoreTest.java    |  6 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  3 +-
 17 files changed, 150 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 95ca5d7..81ba10c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1057,7 +1057,7 @@ public class CheckpointCoordinator {
 			final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();
 
 			StateAssignmentOperation stateAssignmentOperation =
-					new StateAssignmentOperation(tasks, operatorStates, allowNonRestoredState);
+					new StateAssignmentOperation(latest.getCheckpointID(), tasks, operatorStates, allowNonRestoredState);
 
 			stateAssignmentOperation.assignStates();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobManagerTaskRestore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobManagerTaskRestore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobManagerTaskRestore.java
new file mode 100644
index 0000000..9f027e8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/JobManagerTaskRestore.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import java.io.Serializable;
+
+public class JobManagerTaskRestore implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final long restoreCheckpointId;
+
+	private final TaskStateSnapshot taskStateSnapshot;
+
+	public JobManagerTaskRestore(long restoreCheckpointId, TaskStateSnapshot taskStateSnapshot) {
+		this.restoreCheckpointId = restoreCheckpointId;
+		this.taskStateSnapshot = taskStateSnapshot;
+	}
+
+	public long getRestoreCheckpointId() {
+		return restoreCheckpointId;
+	}
+
+	public TaskStateSnapshot getTaskStateSnapshot() {
+		return taskStateSnapshot;
+	}
+
+	@Override
+	public String toString() {
+		return "TaskRestore{" +
+			"restoreCheckpointId=" + restoreCheckpointId +
+			", taskStateSnapshot=" + taskStateSnapshot +
+			'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index d80311c..e108bad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -55,13 +55,17 @@ public class StateAssignmentOperation {
 
 	private final Map<JobVertexID, ExecutionJobVertex> tasks;
 	private final Map<OperatorID, OperatorState> operatorStates;
+
+	private final long restoreCheckpointId;
 	private final boolean allowNonRestoredState;
 
 	public StateAssignmentOperation(
-			Map<JobVertexID, ExecutionJobVertex> tasks,
-			Map<OperatorID, OperatorState> operatorStates,
-			boolean allowNonRestoredState) {
+		long restoreCheckpointId,
+		Map<JobVertexID, ExecutionJobVertex> tasks,
+		Map<OperatorID, OperatorState> operatorStates,
+		boolean allowNonRestoredState) {
 
+		this.restoreCheckpointId = restoreCheckpointId;
 		this.tasks = Preconditions.checkNotNull(tasks);
 		this.operatorStates = Preconditions.checkNotNull(operatorStates);
 		this.allowNonRestoredState = allowNonRestoredState;
@@ -214,7 +218,8 @@ public class StateAssignmentOperation {
 			}
 
 			if (!statelessTask) {
-				currentExecutionAttempt.setInitialState(taskState);
+				JobManagerTaskRestore taskRestore = new JobManagerTaskRestore(restoreCheckpointId, taskState);
+				currentExecutionAttempt.setInitialState(taskRestore);
 			}
 		}
 	}
@@ -230,7 +235,7 @@ public class StateAssignmentOperation {
 			!subRawOperatorState.containsKey(instanceID) &&
 			!subManagedKeyedState.containsKey(instanceID) &&
 			!subRawKeyedState.containsKey(instanceID)) {
-			
+
 			return new OperatorSubtaskState();
 		}
 		if (!subManagedKeyedState.containsKey(instanceID)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 0c7e308..4f5b231 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.deployment;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.blob.PermanentBlobService;
-import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
@@ -142,21 +142,22 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	/** Slot number to run the sub task in on the target machine. */
 	private final int targetSlotNumber;
 
-	/** State handles for the sub task. */
-	private final TaskStateSnapshot taskStateHandles;
+	/** Information to restore the task. This can be null if there is no state to restore. */
+	@Nullable
+	private final JobManagerTaskRestore taskRestore;
 
 	public TaskDeploymentDescriptor(
-			JobID jobId,
-			MaybeOffloaded<JobInformation> serializedJobInformation,
-			MaybeOffloaded<TaskInformation> serializedTaskInformation,
-			ExecutionAttemptID executionAttemptId,
-			AllocationID allocationId,
-			int subtaskIndex,
-			int attemptNumber,
-			int targetSlotNumber,
-			TaskStateSnapshot taskStateHandles,
-			Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
-			Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors) {
+		JobID jobId,
+		MaybeOffloaded<JobInformation> serializedJobInformation,
+		MaybeOffloaded<TaskInformation> serializedTaskInformation,
+		ExecutionAttemptID executionAttemptId,
+		AllocationID allocationId,
+		int subtaskIndex,
+		int attemptNumber,
+		int targetSlotNumber,
+		@Nullable JobManagerTaskRestore taskRestore,
+		Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
+		Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors) {
 
 		this.jobId = Preconditions.checkNotNull(jobId);
 
@@ -175,7 +176,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		Preconditions.checkArgument(0 <= targetSlotNumber, "The target slot number must be positive.");
 		this.targetSlotNumber = targetSlotNumber;
 
-		this.taskStateHandles = taskStateHandles;
+		this.taskRestore = taskRestore;
 
 		this.producedPartitions = Preconditions.checkNotNull(resultPartitionDeploymentDescriptors);
 		this.inputGates = Preconditions.checkNotNull(inputGateDeploymentDescriptors);
@@ -263,8 +264,9 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		return inputGates;
 	}
 
-	public TaskStateSnapshot getTaskStateHandles() {
-		return taskStateHandles;
+	@Nullable
+	public JobManagerTaskRestore getTaskRestore() {
+		return taskRestore;
 	}
 
 	public AllocationID getAllocationId() {

http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 367d02c..272eed2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -150,8 +150,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	private volatile Throwable failureCause;          // once assigned, never changes
 
-	/** The handle to the state that the task gets on restore */
-	private volatile TaskStateSnapshot taskState;
+	/** Information to restore the task on recovery, such as checkpoint id and task state snapshot */
+	private volatile JobManagerTaskRestore taskRestore;
 
 	// ------------------------ Accumulators & Metrics ------------------------
 
@@ -316,19 +316,19 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		return state.isTerminal();
 	}
 
-	public TaskStateSnapshot getTaskStateSnapshot() {
-		return taskState;
+	public JobManagerTaskRestore getTaskRestore() {
+		return taskRestore;
 	}
 
 	/**
 	 * Sets the initial state for the execution. The serialized state is then shipped via the
 	 * {@link TaskDeploymentDescriptor} to the TaskManagers.
 	 *
-	 * @param checkpointStateHandles all checkpointed operator state
+	 * @param taskRestore information to restore the state
 	 */
-	public void setInitialState(TaskStateSnapshot checkpointStateHandles) {
+	public void setInitialState(JobManagerTaskRestore taskRestore) {
 		checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED");
-		this.taskState = checkpointStateHandles;
+		this.taskRestore = taskRestore;
 	}
 
 	/**
@@ -530,7 +530,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
 				attemptId,
 				slot,
-				taskState,
+				taskRestore,
 				attemptNumber);
 
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index ef46086..88923fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
-import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -55,6 +55,8 @@ import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -466,7 +468,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	 */
 	public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnState() {
 		TaskManagerLocation priorLocation;
-		if (currentExecution.getTaskStateSnapshot() != null && (priorLocation = getLatestPriorLocation()) != null) {
+		if (currentExecution.getTaskRestore() != null && (priorLocation = getLatestPriorLocation()) != null) {
 			return Collections.singleton(CompletableFuture.completedFuture(priorLocation));
 		}
 		else {
@@ -746,7 +748,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	TaskDeploymentDescriptor createDeploymentDescriptor(
 			ExecutionAttemptID executionId,
 			LogicalSlot targetSlot,
-			TaskStateSnapshot taskStateHandles,
+			@Nullable JobManagerTaskRestore taskRestore,
 			int attemptNumber) throws ExecutionGraphException {
 		
 		// Produced intermediate results
@@ -836,7 +838,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			subTaskIndex,
 			attemptNumber,
 			targetSlot.getPhysicalSlotNumber(),
-			taskStateHandles,
+			taskRestore,
 			producedPartitions,
 			consumedPartitions);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 3c7d1cb..ae024e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -390,7 +390,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
 				tdd.getProducedPartitions(),
 				tdd.getInputGates(),
 				tdd.getTargetSlotNumber(),
-				tdd.getTaskStateHandles(),
+				tdd.getTaskRestore(),
 				memoryManager,
 				ioManager,
 				networkEnvironment,

http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 16437d9..fbbcd51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -172,7 +172,7 @@ public class Task implements Runnable, TaskActions {
 
 	/** Access to task manager configuration and host names*/
 	private final TaskManagerRuntimeInfo taskManagerConfig;
-	
+
 	/** The memory manager to be used by this task */
 	private final MemoryManager memoryManager;
 
@@ -251,12 +251,6 @@ public class Task implements Runnable, TaskActions {
 	/** Serial executor for asynchronous calls (checkpoints, etc), lazily initialized */
 	private volatile ExecutorService asyncCallDispatcher;
 
-	/**
-	 * The handles to the states that the task was initialized with. Will be set
-	 * to null after the initialization, to be memory friendly.
-	 */
-	private volatile TaskStateSnapshot taskStateHandles;
-
 	/** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
 	private long taskCancellationInterval;
 
@@ -283,7 +277,7 @@ public class Task implements Runnable, TaskActions {
 		Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
 		Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,
 		int targetSlotNumber,
-		TaskStateSnapshot taskStateHandles,
+		JobManagerTaskRestore taskRestore,
 		MemoryManager memManager,
 		IOManager ioManager,
 		NetworkEnvironment networkEnvironment,
@@ -325,7 +319,6 @@ public class Task implements Runnable, TaskActions {
 		this.requiredClasspaths = jobInformation.getRequiredClasspathURLs();
 		this.nameOfInvokableClass = taskInformation.getInvokableClassName();
 		this.serializedExecutionConfig = jobInformation.getSerializedExecutionConfig();
-		this.taskStateHandles = taskStateHandles;
 
 		Configuration tmConfig = taskManagerConfig.getConfiguration();
 		this.taskCancellationInterval = tmConfig.getLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL);
@@ -678,7 +671,7 @@ public class Task implements Runnable, TaskActions {
 				this);
 
 			// now load and instantiate the task's invokable code
-			invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env, taskStateHandles);
+			invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass, env);
 
 			// ----------------------------------------------------------------
 			//  actual task core work
@@ -743,7 +736,7 @@ public class Task implements Runnable, TaskActions {
 
 			try {
 				// check if the exception is unrecoverable
-				if (ExceptionUtils.isJvmFatalError(t) || 
+				if (ExceptionUtils.isJvmFatalError(t) ||
 					(t instanceof OutOfMemoryError && taskManagerConfig.shouldExitJvmOnOutOfMemoryError()))
 				{
 					// terminate the JVM immediately
@@ -833,7 +826,7 @@ public class Task implements Runnable, TaskActions {
 				removeCachedFiles(distributedCacheEntries, fileCache);
 
 				// close and de-activate safety net for task thread
-				LOG.info("Ensuring all FileSystem streams are closed for task {}", this); 
+				LOG.info("Ensuring all FileSystem streams are closed for task {}", this);
 				FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
 
 				notifyFinalState();
@@ -943,7 +936,7 @@ public class Task implements Runnable, TaskActions {
 	 * <p>
 	 * This method never blocks.
 	 * </p>
-	 * 
+	 *
 	 * @throws UnsupportedOperationException
 	 *             if the {@link AbstractInvokable} does not implement {@link StoppableTask}
 	 */
@@ -972,7 +965,7 @@ public class Task implements Runnable, TaskActions {
 	 * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing.
 	 * Otherwise it sets the state to CANCELING, and, if the invokable code is running,
 	 * starts an asynchronous thread that aborts that code.
-	 * 
+	 *
 	 * <p>This method never blocks.</p>
 	 */
 	public void cancelExecution() {
@@ -1136,7 +1129,7 @@ public class Task implements Runnable, TaskActions {
 
 	/**
 	 * Calls the invokable to trigger a checkpoint.
-	 * 
+	 *
 	 * @param checkpointID The ID identifying the checkpoint.
 	 * @param checkpointTimestamp The timestamp associated with the checkpoint.
 	 * @param checkpointOptions Options for performing this checkpoint.
@@ -1365,8 +1358,6 @@ public class Task implements Runnable, TaskActions {
 	 * @param classLoader The classloader to load the class through.
 	 * @param className The name of the class to load.
 	 * @param environment The task environment.
-	 * @param initialState The task's initial state. Null, if the task is either stateless, or
-	 *                     initialized with empty state.
 	 *
 	 * @return The instantiated invokable task object.
 	 *
@@ -1374,59 +1365,34 @@ public class Task implements Runnable, TaskActions {
 	 *                   Also throws an exception if the task class misses the necessary constructor.
 	 */
 	private static AbstractInvokable loadAndInstantiateInvokable(
-			ClassLoader classLoader,
-			String className,
-			Environment environment,
-			@Nullable TaskStateSnapshot initialState) throws Throwable {
+		ClassLoader classLoader,
+		String className,
+		Environment environment) throws Throwable {
 
 		final Class<? extends AbstractInvokable> invokableClass;
 		try {
 			invokableClass = Class.forName(className, true, classLoader)
-					.asSubclass(AbstractInvokable.class);
-		}
-		catch (Throwable t) {
+				.asSubclass(AbstractInvokable.class);
+		} catch (Throwable t) {
 			throw new Exception("Could not load the task's invokable class.", t);
 		}
 
-		Constructor<? extends AbstractInvokable> statefulCtor = null;
-		Constructor<? extends AbstractInvokable> statelessCtor = null;
+		Constructor<? extends AbstractInvokable> statelessCtor;
 
-		// try to find and call the constructor that accepts state
 		try {
-			//noinspection JavaReflectionMemberAccess
-			statefulCtor = invokableClass.getConstructor(Environment.class, TaskStateSnapshot.class);
-		}
-		catch (NoSuchMethodException e) {
-			if (initialState == null) {
-				// we allow also the constructor that takes no state, as a convenience for stateless
-				// tasks so that they are not forced to carry the stateful constructor
-				try {
-					statelessCtor = invokableClass.getConstructor(Environment.class);
-				}
-				catch (NoSuchMethodException ee) {
-					throw new FlinkException("Task misses proper constructor", ee);
-				}
-			}
-			else {
-				throw new FlinkException("Task has state to restore, but misses the stateful constructor", e);
-			}
+			statelessCtor = invokableClass.getConstructor(Environment.class);
+		} catch (NoSuchMethodException ee) {
+			throw new FlinkException("Task misses proper constructor", ee);
 		}
 
 		// instantiate the class
 		try {
-			if (statefulCtor != null) {
-				return statefulCtor.newInstance(environment, initialState);
-			}
-			else {
-				//noinspection ConstantConditions  --> cannot happen
-				return statelessCtor.newInstance(environment);
-			}
-		}
-		catch (InvocationTargetException e) {
+			//noinspection ConstantConditions  --> cannot happen
+			return statelessCtor.newInstance(environment);
+		} catch (InvocationTargetException e) {
 			// directly forward exceptions from the eager initialization
 			throw e.getTargetException();
-		}
-		catch (Exception e) {
+		} catch (Exception e) {
 			throw new FlinkException("Could not instantiate the task's invokable class.", e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 9979618..f9cfeea 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -1186,7 +1186,7 @@ class TaskManager(
         tdd.getProducedPartitions,
         tdd.getInputGates,
         tdd.getTargetSlotNumber,
-        tdd.getTaskStateHandles,
+        tdd.getTaskRestore,
         memoryManager,
         ioManager,
         network,

http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 9f9659a..a272ce1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2363,7 +2363,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 			KeyGroupsStateHandle originalKeyedStateBackend = generateKeyGroupState(jobVertexID2, newKeyGroupPartitions2.get(i), false);
 			KeyGroupsStateHandle originalKeyedStateRaw = generateKeyGroupState(jobVertexID2, newKeyGroupPartitions2.get(i), true);
 
-			TaskStateSnapshot taskStateHandles = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateSnapshot();
+			JobManagerTaskRestore taskRestore = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
+			TaskStateSnapshot taskStateHandles = taskRestore.getTaskStateSnapshot();
 
 			final int headOpIndex = operatorIDs.size() - 1;
 			List<Collection<OperatorStateHandle>> allParallelManagedOpStates = new ArrayList<>(operatorIDs.size());
@@ -2563,7 +2564,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			final List<OperatorID> operatorIds = newJobVertex1.getOperatorIDs();
 
-			TaskStateSnapshot stateSnapshot = newJobVertex1.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateSnapshot();
+			JobManagerTaskRestore taskRestore = newJobVertex1.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
+			TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
 
 			OperatorSubtaskState headOpState = stateSnapshot.getSubtaskStateByOperatorID(operatorIds.get(operatorIds.size() - 1));
 			assertTrue(headOpState.getManagedKeyedState().isEmpty());
@@ -2629,7 +2631,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 			final List<OperatorID> operatorIds = newJobVertex2.getOperatorIDs();
 
-			TaskStateSnapshot stateSnapshot = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateSnapshot();
+			JobManagerTaskRestore taskRestore = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
+			TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
 
 			// operator 3
 			{
@@ -3040,7 +3043,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
 
 		for (int i = 0; i < executionJobVertex.getParallelism(); i++) {
 
-			TaskStateSnapshot stateSnapshot = executionJobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateSnapshot();
+			JobManagerTaskRestore taskRestore = executionJobVertex.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskRestore();
+			TaskStateSnapshot stateSnapshot = taskRestore.getTaskStateSnapshot();
 
 			OperatorSubtaskState operatorState = stateSnapshot.getSubtaskStateByOperatorID(OperatorID.fromJobVertexID(jobVertexID));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index ae354bb..7348a38 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -142,11 +142,12 @@ public class CheckpointStateRestoreTest {
 
 			// verify that each stateful vertex got the state
 
-			BaseMatcher<TaskStateSnapshot> matcher = new BaseMatcher<TaskStateSnapshot>() {
+			BaseMatcher<JobManagerTaskRestore> matcher = new BaseMatcher<JobManagerTaskRestore>() {
 				@Override
 				public boolean matches(Object o) {
-					if (o instanceof TaskStateSnapshot) {
-						return Objects.equals(o, subtaskStates);
+					if (o instanceof JobManagerTaskRestore) {
+						JobManagerTaskRestore taskRestore = (JobManagerTaskRestore) o;
+						return Objects.equals(taskRestore.getTaskStateSnapshot(), subtaskStates);
 					}
 					return false;
 				}
@@ -160,8 +161,8 @@ public class CheckpointStateRestoreTest {
 			verify(statefulExec1, times(1)).setInitialState(Mockito.argThat(matcher));
 			verify(statefulExec2, times(1)).setInitialState(Mockito.argThat(matcher));
 			verify(statefulExec3, times(1)).setInitialState(Mockito.argThat(matcher));
-			verify(statelessExec1, times(0)).setInitialState(Mockito.<TaskStateSnapshot>any());
-			verify(statelessExec2, times(0)).setInitialState(Mockito.<TaskStateSnapshot>any());
+			verify(statelessExec1, times(0)).setInitialState(Mockito.<JobManagerTaskRestore>any());
+			verify(statelessExec2, times(0)).setInitialState(Mockito.<JobManagerTaskRestore>any());
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 3247024..e20d34b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -74,6 +75,7 @@ public class TaskDeploymentDescriptorTest {
 				vertexID, taskName, currentNumberOfSubtasks, numberOfKeyGroups, invokableClass.getName(), taskConfiguration));
 			final int targetSlotNumber = 47;
 			final TaskStateSnapshot taskStateHandles = new TaskStateSnapshot();
+			final JobManagerTaskRestore taskRestore = new JobManagerTaskRestore(1L, taskStateHandles);
 
 			final TaskDeploymentDescriptor orig = new TaskDeploymentDescriptor(
 				jobID,
@@ -84,7 +86,7 @@ public class TaskDeploymentDescriptorTest {
 				indexInSubtaskGroup,
 				attemptNumber,
 				targetSlotNumber,
-				taskStateHandles,
+				taskRestore,
 				producedResults,
 				inputGates);
 
@@ -93,7 +95,7 @@ public class TaskDeploymentDescriptorTest {
 			assertFalse(orig.getSerializedJobInformation() == copy.getSerializedJobInformation());
 			assertFalse(orig.getSerializedTaskInformation() == copy.getSerializedTaskInformation());
 			assertFalse(orig.getExecutionAttemptId() == copy.getExecutionAttemptId());
-			assertFalse(orig.getTaskStateHandles() == copy.getTaskStateHandles());
+			assertFalse(orig.getTaskRestore() == copy.getTaskRestore());
 			assertFalse(orig.getProducedPartitions() == copy.getProducedPartitions());
 			assertFalse(orig.getInputGates() == copy.getInputGates());
 
@@ -104,7 +106,8 @@ public class TaskDeploymentDescriptorTest {
 			assertEquals(orig.getSubtaskIndex(), copy.getSubtaskIndex());
 			assertEquals(orig.getAttemptNumber(), copy.getAttemptNumber());
 			assertEquals(orig.getTargetSlotNumber(), copy.getTargetSlotNumber());
-			assertEquals(orig.getTaskStateHandles(), copy.getTaskStateHandles());
+			assertEquals(orig.getTaskRestore().getRestoreCheckpointId(), copy.getTaskRestore().getRestoreCheckpointId());
+			assertEquals(orig.getTaskRestore().getTaskStateSnapshot(), copy.getTaskRestore().getTaskStateSnapshot());
 			assertEquals(orig.getProducedPartitions(), copy.getProducedPartitions());
 			assertEquals(orig.getInputGates(), copy.getInputGates());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 274df59..14752e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -174,7 +174,7 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 
 			// target state
 			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
-			target.getCurrentExecutionAttempt().setInitialState(mock(TaskStateSnapshot.class));
+			target.getCurrentExecutionAttempt().setInitialState(mock(JobManagerTaskRestore.class));
 		}
 
 		// validate that the target vertices have the state's location as the location preference

http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 9a6f405..8408ee0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -255,7 +255,7 @@ public class TaskAsyncCallTest {
 			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 			Collections.<InputGateDeploymentDescriptor>emptyList(),
 			0,
-			new TaskStateSnapshot(),
+			null,
 			mock(MemoryManager.class),
 			mock(IOManager.class),
 			networkEnvironment,

http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index f89b916..a28bf23 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;

http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 0af1471..5ffaa29 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.blob.PermanentBlobCache;
 import org.apache.flink.runtime.blob.TransientBlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -228,6 +229,9 @@ public class InterruptSensitiveRestoreTest {
 		streamConfig.setOperatorID(operatorID);
 		TaskStateSnapshot stateSnapshot = new TaskStateSnapshot();
 		stateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
+
+		JobManagerTaskRestore taskRestore = new JobManagerTaskRestore(1L, stateSnapshot);
+
 		JobInformation jobInformation = new JobInformation(
 			new JobID(),
 			"test job name",
@@ -257,7 +261,7 @@ public class InterruptSensitiveRestoreTest {
 			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 			Collections.<InputGateDeploymentDescriptor>emptyList(),
 			0,
-			stateSnapshot,
+			taskRestore,
 			mock(MemoryManager.class),
 			mock(IOManager.class),
 			networkEnvironment,

http://git-wip-us.apache.org/repos/asf/flink/blob/402a2e30/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index f492d9e..d9a21a9 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
@@ -921,7 +922,7 @@ public class StreamTaskTest extends TestLogger {
 			Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
 			Collections.<InputGateDeploymentDescriptor>emptyList(),
 			0,
-			new TaskStateSnapshot(),
+			new JobManagerTaskRestore(1L, null),
 			mock(MemoryManager.class),
 			mock(IOManager.class),
 			network,


Mime
View raw message