flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/3] flink git commit: [FLINK-4196] [runtime] Remove the 'recoveryTimestamp' from checkpoint restores.
Date Thu, 14 Jul 2016 19:12:32 GMT
Repository: flink
Updated Branches:
  refs/heads/master 64aa7c899 -> de6a3d33e


[FLINK-4196] [runtime] Remove the 'recoveryTimestamp' from checkpoint restores.

The 'recoveryTimestamp' was an unsafe wall clock timestamp attached by the master
upon recovery. This this timestamp cannot be relied upon in distributed setups,
it is removed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/de6a3d33
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/de6a3d33
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/de6a3d33

Branch: refs/heads/master
Commit: de6a3d33ecfa689fd0da1ef661bbf6edb68e9d0b
Parents: 2477161
Author: Stephan Ewen <sewen@apache.org>
Authored: Wed Jul 13 17:31:35 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Jul 14 21:11:48 2016 +0200

----------------------------------------------------------------------
 .../streaming/state/RocksDBStateBackend.java    | 13 ++++------
 .../operator/AbstractCEPPatternOperator.java    |  4 ++--
 .../AbstractKeyedCEPPatternOperator.java        |  4 ++--
 .../checkpoint/CheckpointCoordinator.java       |  4 +---
 .../checkpoint/SavepointCoordinator.java        |  4 +---
 .../deployment/TaskDeploymentDescriptor.java    | 13 ++--------
 .../flink/runtime/executiongraph/Execution.java |  7 +-----
 .../runtime/executiongraph/ExecutionVertex.java |  4 +---
 .../runtime/jobgraph/tasks/StatefulTask.java    |  3 +--
 .../runtime/state/AbstractStateBackend.java     |  5 ++--
 .../state/AsynchronousKvStateSnapshot.java      |  3 +--
 .../runtime/state/GenericFoldingState.java      |  9 ++++---
 .../flink/runtime/state/GenericListState.java   |  5 ++--
 .../runtime/state/GenericReducingState.java     |  5 ++--
 .../flink/runtime/state/KvStateSnapshot.java    |  4 +---
 .../apache/flink/runtime/state/StateUtils.java  | 12 ++++------
 .../filesystem/AbstractFsStateSnapshot.java     |  3 +--
 .../state/memory/AbstractMemStateSnapshot.java  |  2 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  6 +----
 .../checkpoint/CheckpointStateRestoreTest.java  | 10 ++++----
 .../checkpoint/SavepointCoordinatorTest.java    |  3 +--
 .../runtime/state/StateBackendTestBase.java     | 25 ++++++++++----------
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  4 +---
 .../source/ContinuousFileReaderOperator.java    |  5 ++--
 .../api/operators/AbstractStreamOperator.java   |  4 ++--
 .../operators/AbstractUdfStreamOperator.java    |  4 ++--
 .../streaming/api/operators/StreamOperator.java |  4 +---
 .../operators/GenericWriteAheadSink.java        |  4 ++--
 ...ractAlignedProcessingTimeWindowOperator.java |  4 ++--
 .../operators/windowing/WindowOperator.java     |  4 ++--
 .../streaming/runtime/tasks/StreamTask.java     | 11 ++++-----
 .../operators/WriteAheadSinkTestBase.java       |  2 +-
 ...AlignedProcessingTimeWindowOperatorTest.java |  4 ++--
 ...AlignedProcessingTimeWindowOperatorTest.java |  4 ++--
 .../tasks/StreamTaskAsyncCheckpointTest.java    |  4 ++--
 .../util/OneInputStreamOperatorTestHarness.java |  4 ++--
 36 files changed, 83 insertions(+), 127 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 4c44249..4778aa0 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -58,6 +58,7 @@ import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.api.common.state.StateBackend;
 
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.streaming.util.HDFSCopyFromLocal;
 import org.apache.flink.streaming.util.HDFSCopyToLocal;
 import org.apache.hadoop.fs.FileSystem;
@@ -162,7 +163,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	 * checkpoints and when disposing the db. Otherwise, the asynchronous snapshot might try
 	 * iterating over a disposed db.
 	 */
-	private Object dbCleanupLock;
+	private final SerializableObject dbCleanupLock = new SerializableObject();
 
 	/**
 	 * Information about the k/v states as we create them. This is used to retrieve the
@@ -289,8 +290,6 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 			throw new RuntimeException("Error cleaning RocksDB data directory.", e);
 		}
 
-		dbCleanupLock = new Object();
-
 		List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>(1);
 		// RocksDB seems to need this...
 		columnFamilyDescriptors.add(new ColumnFamilyDescriptor("default".getBytes()));
@@ -479,7 +478,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
-	public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots, long recoveryTimestamp) throws Exception {
+	public final void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
 		if (keyValueStateSnapshots.size() == 0) {
 			return;
 		}
@@ -670,8 +669,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		public final KvState<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> restoreState(
 				RocksDBStateBackend stateBackend,
 				TypeSerializer<Object> keySerializer,
-				ClassLoader classLoader,
-				long recoveryTimestamp) throws Exception {
+				ClassLoader classLoader) throws Exception {
 			throw new RuntimeException("Should never happen.");
 		}
 
@@ -807,8 +805,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 		public final KvState<Object, Object, ValueState<Object>, ValueStateDescriptor<Object>, RocksDBStateBackend> restoreState(
 				RocksDBStateBackend stateBackend,
 				TypeSerializer<Object> keySerializer,
-				ClassLoader classLoader,
-				long recoveryTimestamp) throws Exception {
+				ClassLoader classLoader) throws Exception {
 			throw new RuntimeException("Should never happen.");
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index 753656f..8150eae 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -118,8 +118,8 @@ abstract public class AbstractCEPPatternOperator<IN, OUT> extends AbstractCEPBas
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
-		super.restoreState(state, recoveryTimestamp);
+	public void restoreState(StreamTaskState state) throws Exception {
+		super.restoreState(state);
 
 		StreamStateHandle stream = (StreamStateHandle)state.getOperatorState();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 83892ca..9ffe9b6 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -186,8 +186,8 @@ abstract public class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> extends Abst
 	}
 
 	@Override
-	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
-		super.restoreState(state, recoveryTimestamp);
+	public void restoreState(StreamTaskState state) throws Exception {
+		super.restoreState(state);
 
 		@SuppressWarnings("unchecked")
 		StateHandle<DataInputView> stateHandle = (StateHandle<DataInputView>) state.getOperatorState();

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index c599e5a..c6b2a77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -801,8 +801,6 @@ public class CheckpointCoordinator {
 				}
 			}
 
-			long recoveryTimestamp = System.currentTimeMillis();
-
 			for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry: latest.getTaskStates().entrySet()) {
 				TaskState taskState = taskGroupStateEntry.getValue();
 				ExecutionJobVertex executionJobVertex = tasks.get(taskGroupStateEntry.getKey());
@@ -833,7 +831,7 @@ public class CheckpointCoordinator {
 						Map<Integer, SerializedValue<StateHandle<?>>> kvStateForTaskMap = taskState.getUnwrappedKvStates(keyGroupPartitions.get(i));
 
 						Execution currentExecutionAttempt = executionJobVertex.getTaskVertices()[i].getCurrentExecutionAttempt();
-						currentExecutionAttempt.setInitialState(state, kvStateForTaskMap, recoveryTimestamp);
+						currentExecutionAttempt.setInitialState(state, kvStateForTaskMap);
 					}
 
 					if (allOrNothingState && counter > 0 && counter < executionJobVertex.getParallelism()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
index 2c348ea..b96a02a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
@@ -188,8 +188,6 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 				throw new IllegalStateException("CheckpointCoordinator is shut down");
 			}
 
-			long recoveryTimestamp = System.currentTimeMillis();
-
 			LOG.info("Rolling back to savepoint '{}'.", savepointPath);
 
 			CompletedCheckpoint checkpoint = savepointStore.getState(savepointPath);
@@ -237,7 +235,7 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 							.getTaskVertices()[i]
 							.getCurrentExecutionAttempt();
 
-						currentExecutionAttempt.setInitialState(state, kvStateForTaskMap, recoveryTimestamp);
+						currentExecutionAttempt.setInitialState(state, kvStateForTaskMap);
 					}
 				} else {
 					String msg = String.format("Failed to rollback to savepoint %s. " +

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index f595681..60fb45c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -93,8 +93,6 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	/** The execution configuration (see {@link ExecutionConfig}) related to the specific job. */
 	private final SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
-	private long recoveryTimestamp;
-		
 	/**
 	 * Constructs a task deployment descriptor.
 	 */
@@ -116,8 +114,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			List<BlobKey> requiredJarFiles,
 			List<URL> requiredClasspaths,
 			int targetSlotNumber,
-			SerializedValue<StateHandle<?>> operatorState,
-			long recoveryTimestamp) {
+			SerializedValue<StateHandle<?>> operatorState) {
 
 		checkArgument(indexInSubtaskGroup >= 0);
 		checkArgument(numberOfSubtasks > indexInSubtaskGroup);
@@ -142,7 +139,6 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		this.requiredClasspaths = checkNotNull(requiredClasspaths);
 		this.targetSlotNumber = targetSlotNumber;
 		this.operatorState = operatorState;
-		this.recoveryTimestamp = recoveryTimestamp;
 	}
 
 	public TaskDeploymentDescriptor(
@@ -182,8 +178,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			requiredJarFiles,
 			requiredClasspaths,
 			targetSlotNumber,
-			null,
-			-1);
+			null);
 	}
 
 	/**
@@ -324,8 +319,4 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	public SerializedValue<StateHandle<?>> getOperatorState() {
 		return operatorState;
 	}
-	
-	public long getRecoveryTimestamp() {
-		return recoveryTimestamp;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 691adaf..1b32100 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -139,8 +139,6 @@ public class Execution implements Serializable {
 	private SerializedValue<StateHandle<?>> operatorState;
 
 	private Map<Integer, SerializedValue<StateHandle<?>>> operatorKvState;
-	
-	private long recoveryTimestamp;
 
 	/** The execution context which is used to execute futures. */
 	@SuppressWarnings("NonSerializableFieldInSerializableClass")
@@ -239,15 +237,13 @@ public class Execution implements Serializable {
 
 	public void setInitialState(
 		SerializedValue<StateHandle<?>> initialState,
-		Map<Integer, SerializedValue<StateHandle<?>>> initialKvState,
-		long recoveryTimestamp) {
+		Map<Integer, SerializedValue<StateHandle<?>>> initialKvState) {
 
 		if (state != ExecutionState.CREATED) {
 			throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
 		}
 		this.operatorState = initialState;
 		this.operatorKvState = initialKvState;
-		this.recoveryTimestamp = recoveryTimestamp;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -376,7 +372,6 @@ public class Execution implements Serializable {
 				slot,
 				operatorState,
 				operatorKvState,
-				recoveryTimestamp,
 				attemptNumber);
 
 			// register this execution at the execution graph, to receive call backs

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a85f32a..e20f466 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -638,7 +638,6 @@ public class ExecutionVertex implements Serializable {
 			SimpleSlot targetSlot,
 			SerializedValue<StateHandle<?>> operatorState,
 			Map<Integer, SerializedValue<StateHandle<?>>> operatorKvState,
-			long recoveryTimestamp,
 			int attemptNumber) {
 
 		// Produced intermediate results
@@ -689,8 +688,7 @@ public class ExecutionVertex implements Serializable {
 			jarFiles,
 			classpaths,
 			targetSlot.getRoot().getSlotNumber(),
-			operatorState,
-			recoveryTimestamp);
+			operatorState);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index aca1bc2..f8bba1a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -31,9 +31,8 @@ public interface StatefulTask<T extends StateHandle<?>> {
 	 * a snapshot of the state from a previous execution.
 	 * 
 	 * @param stateHandle The handle to the state.
-	 * @param recoveryTimestamp Global recovery timestamp.
 	 */
-	void setInitialState(T stateHandle, long recoveryTimestamp) throws Exception;
+	void setInitialState(T stateHandle) throws Exception;
 
 	/**
 	 * This method is either called directly and asynchronously by the checkpoint

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 95ca13f..6ab4999 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -341,7 +341,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 	 * @param keyValueStateSnapshots The Map of snapshots
 	 */
 	@SuppressWarnings("unchecked,rawtypes")
-	public void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots, long recoveryTimestamp) throws Exception {
+	public void injectKeyValueStateSnapshots(HashMap<String, KvStateSnapshot> keyValueStateSnapshots) throws Exception {
 		if (keyValueStateSnapshots != null) {
 			if (keyValueStatesByName == null) {
 				keyValueStatesByName = new HashMap<>();
@@ -350,8 +350,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 			for (Map.Entry<String, KvStateSnapshot> state : keyValueStateSnapshots.entrySet()) {
 				KvState kvState = state.getValue().restoreState(this,
 					keySerializer,
-					userCodeClassLoader,
-					recoveryTimestamp);
+					userCodeClassLoader);
 				keyValueStatesByName.put(state.getKey(), kvState);
 			}
 			keyValueStates = keyValueStatesByName.values().toArray(new KvState[keyValueStatesByName.size()]);

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
index 30a9c5a..877034d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AsynchronousKvStateSnapshot.java
@@ -45,8 +45,7 @@ public abstract class AsynchronousKvStateSnapshot<K, N, S extends State, SD exte
 	public final KvState<K, N, S, SD, Backend> restoreState(
 		Backend stateBackend,
 		TypeSerializer<K> keySerializer,
-		ClassLoader classLoader,
-		long recoveryTimestamp) throws Exception {
+		ClassLoader classLoader) throws Exception {
 		throw new RuntimeException("This should never be called and probably points to a bug.");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
index ef1d796..762cc3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericFoldingState.java
@@ -113,11 +113,10 @@ public class GenericFoldingState<K, N, T, ACC, Backend extends AbstractStateBack
 		@Override
 		@SuppressWarnings("unchecked")
 		public KvState<K, N, FoldingState<T, ACC>, FoldingStateDescriptor<T, ACC>, Backend> restoreState(
-			Backend stateBackend,
-			TypeSerializer<K> keySerializer,
-			ClassLoader classLoader,
-			long recoveryTimestamp) throws Exception {
-			return new GenericFoldingState((ValueState<ACC>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader, recoveryTimestamp), foldFunction);
+				Backend stateBackend,
+				TypeSerializer<K> keySerializer,
+				ClassLoader classLoader) throws Exception {
+			return new GenericFoldingState((ValueState<ACC>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader), foldFunction);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
index fbb0170..9393082 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericListState.java
@@ -120,9 +120,8 @@ public class GenericListState<K, N, T, Backend extends AbstractStateBackend, W e
 		public KvState<K, N, ListState<T>, ListStateDescriptor<T>, Backend> restoreState(
 			Backend stateBackend,
 			TypeSerializer<K> keySerializer,
-			ClassLoader classLoader,
-			long recoveryTimestamp) throws Exception {
-			return new GenericListState((ValueState<T>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader, recoveryTimestamp));
+			ClassLoader classLoader) throws Exception {
+			return new GenericListState((ValueState<T>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
index 102e25e..7407dfa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/GenericReducingState.java
@@ -118,9 +118,8 @@ public class GenericReducingState<K, N, T, Backend extends AbstractStateBackend,
 		public KvState<K, N, ReducingState<T>, ReducingStateDescriptor<T>, Backend> restoreState(
 			Backend stateBackend,
 			TypeSerializer<K> keySerializer,
-			ClassLoader classLoader,
-			long recoveryTimestamp) throws Exception {
-			return new GenericReducingState((ValueState<T>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader, recoveryTimestamp), reduceFunction);
+			ClassLoader classLoader) throws Exception {
+			return new GenericReducingState((ValueState<T>) wrappedSnapshot.restoreState(stateBackend, keySerializer, classLoader), reduceFunction);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
index 245427e..847d53e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
@@ -48,7 +48,6 @@ public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescript
 	 *                     from this snapshot.
 	 * @param keySerializer The serializer for the keys.
 	 * @param classLoader The class loader for user-defined types.
-	 * @param recoveryTimestamp The timestamp of the checkpoint we are recovering from.
 	 *
 	 * @return An instance of the key/value state loaded from this snapshot.
 	 * 
@@ -57,8 +56,7 @@ public interface KvStateSnapshot<K, N, S extends State, SD extends StateDescript
 	KvState<K, N, S, SD, Backend> restoreState(
 		Backend stateBackend,
 		TypeSerializer<K> keySerializer,
-		ClassLoader classLoader,
-		long recoveryTimestamp) throws Exception;
+		ClassLoader classLoader) throws Exception;
 
 	/**
 	 * Discards the state snapshot, removing any resources occupied by it.

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
index 96e0eb5..b130c70 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
@@ -38,24 +38,22 @@ public class StateUtils {
 	 *            The state carrier operator.
 	 * @param state
 	 *            The state handle.
-	 * @param recoveryTimestamp
-	 *            Global recovery timestamp
 	 * @param <T>
 	 *            Type bound for the
 	 */
-	public static <T extends StateHandle<?>> void setOperatorState(StatefulTask<?> op,
-			StateHandle<?> state, long recoveryTimestamp) throws Exception {
+	public static <T extends StateHandle<?>> void setOperatorState(StatefulTask<?> op, StateHandle<?> state)
+			throws Exception {
+
 		@SuppressWarnings("unchecked")
 		StatefulTask<T> typedOp = (StatefulTask<T>) op;
 		@SuppressWarnings("unchecked")
 		T typedHandle = (T) state;
 
-		typedOp.setInitialState(typedHandle, recoveryTimestamp);
+		typedOp.setInitialState(typedHandle);
 	}
 
 	// ------------------------------------------------------------------------
 
 	/** Do not instantiate */
-	private StateUtils() {
-	}
+	private StateUtils() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
index 432a9e6..cd02870 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsStateSnapshot.java
@@ -83,8 +83,7 @@ public abstract class AbstractFsStateSnapshot<K, N, SV, S extends State, SD exte
 	public KvState<K, N, S, SD, FsStateBackend> restoreState(
 		FsStateBackend stateBackend,
 		final TypeSerializer<K> keySerializer,
-		ClassLoader classLoader,
-		long recoveryTimestamp) throws Exception {
+		ClassLoader classLoader) throws Exception {
 
 		// validity checks
 		if (!this.keySerializer.equals(keySerializer)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
index 5d4f0d8..86d4c7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/AbstractMemStateSnapshot.java
@@ -82,7 +82,7 @@ public abstract class AbstractMemStateSnapshot<K, N, SV, S extends State, SD ext
 	public KvState<K, N, S, SD, MemoryStateBackend> restoreState(
 		MemoryStateBackend stateBackend,
 		final TypeSerializer<K> keySerializer,
-		ClassLoader classLoader, long recoveryTimestamp) throws Exception {
+		ClassLoader classLoader) throws Exception {
 
 		// validity checks
 		if (!this.keySerializer.equals(keySerializer)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 58eb90c..25e4b43 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -226,8 +226,6 @@ public class Task implements Runnable {
 	 * initialization, to be memory friendly */
 	private volatile SerializedValue<StateHandle<?>> operatorState;
 
-	private volatile long recoveryTs;
-
 	/** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
 	private long taskCancellationInterval;
 
@@ -259,7 +257,6 @@ public class Task implements Runnable {
 		this.requiredClasspaths = checkNotNull(tdd.getRequiredClasspaths());
 		this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName());
 		this.operatorState = tdd.getOperatorState();
-		this.recoveryTs = tdd.getRecoveryTimestamp();
 		this.serializedExecutionConfig = checkNotNull(tdd.getSerializedExecutionConfig());
 
 		this.taskCancellationInterval = jobConfiguration.getLong(
@@ -538,14 +535,13 @@ public class Task implements Runnable {
 
 			// get our private reference onto the stack (be safe against concurrent changes)
 			SerializedValue<StateHandle<?>> operatorState = this.operatorState;
-			long recoveryTs = this.recoveryTs;
 
 			if (operatorState != null) {
 				if (invokable instanceof StatefulTask) {
 					try {
 						StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader);
 						StatefulTask<?> op = (StatefulTask<?>) invokable;
-						StateUtils.setOperatorState(op, state, recoveryTs);
+						StateUtils.setOperatorState(op, state);
 					}
 					catch (Exception e) {
 						throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 68cd145..2b1b7e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -113,11 +113,11 @@ public class CheckpointStateRestoreTest {
 			coord.restoreLatestCheckpointedState(map, true, false);
 
 			// verify that each stateful vertex got the state
-			verify(statefulExec1, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any(), Mockito.anyLong());
-			verify(statefulExec2, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any(), Mockito.anyLong());
-			verify(statefulExec3, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any(), Mockito.anyLong());
-			verify(statelessExec1, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any(), Mockito.anyLong());
-			verify(statelessExec2, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any(), Mockito.anyLong());
+			verify(statefulExec1, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
+			verify(statefulExec2, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
+			verify(statefulExec3, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
+			verify(statelessExec1, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
+			verify(statelessExec2, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
index 405fd07..384ed42 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
@@ -58,7 +58,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doThrow;
@@ -202,7 +201,7 @@ public class SavepointCoordinatorTest extends TestLogger {
 		// Verify all executions have been reset
 		for (ExecutionVertex vertex : ackVertices) {
 			verify(vertex.getCurrentExecutionAttempt(), times(1)).setInitialState(
-					any(SerializedValue.class), any(Map.class), anyLong());
+					any(SerializedValue.class), any(Map.class));
 		}
 
 		// Verify all promises removed

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index cc36f4a..12cf112 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -53,6 +53,7 @@ import static org.junit.Assert.*;
 /**
  * Generic tests for the partitioned state part of {@link AbstractStateBackend}.
  */
+@SuppressWarnings("serial")
 public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 	protected B backend;
@@ -129,7 +130,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		backend.dispose();
 		backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
 
-		backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100);
+		backend.injectKeyValueStateSnapshots((HashMap) snapshot1);
 
 		for (String key: snapshot1.keySet()) {
 			snapshot1.get(key).discardState();
@@ -145,7 +146,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		backend.dispose();
 		backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
 
-		backend.injectKeyValueStateSnapshots((HashMap) snapshot2, 100);
+		backend.injectKeyValueStateSnapshots((HashMap) snapshot2);
 
 		for (String key: snapshot2.keySet()) {
 			snapshot2.get(key).discardState();
@@ -221,7 +222,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 		backend.dispose();
 		backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
 
-		backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100);
+		backend.injectKeyValueStateSnapshots((HashMap) snapshot1);
 
 		for (String key: snapshot1.keySet()) {
 			snapshot1.get(key).discardState();
@@ -288,7 +289,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 			// restore the first snapshot and validate it
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
-			backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot1);
 
 			for (String key: snapshot1.keySet()) {
 				snapshot1.get(key).discardState();
@@ -305,7 +306,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 			// restore the second snapshot and validate it
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
-			backend.injectKeyValueStateSnapshots((HashMap) snapshot2, 100);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot2);
 
 			for (String key: snapshot2.keySet()) {
 				snapshot2.get(key).discardState();
@@ -383,7 +384,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 			// restore the first snapshot and validate it
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
-			backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot1);
 
 			for (String key: snapshot1.keySet()) {
 				snapshot1.get(key).discardState();
@@ -400,7 +401,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 			// restore the second snapshot and validate it
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
-			backend.injectKeyValueStateSnapshots((HashMap) snapshot2, 100);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot2);
 
 			for (String key: snapshot2.keySet()) {
 				snapshot2.get(key).discardState();
@@ -484,7 +485,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 			// restore the first snapshot and validate it
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
-			backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot1);
 
 			for (String key: snapshot1.keySet()) {
 				snapshot1.get(key).discardState();
@@ -501,7 +502,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 			// restore the second snapshot and validate it
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
-			backend.injectKeyValueStateSnapshots((HashMap) snapshot2, 100);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot2);
 
 			for (String key: snapshot2.keySet()) {
 				snapshot2.get(key).discardState();
@@ -553,7 +554,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 			// restore the first snapshot and validate it
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
-			backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot1);
 
 			for (String key: snapshot1.keySet()) {
 				snapshot1.get(key).discardState();
@@ -612,7 +613,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 			// restore the first snapshot and validate it
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
-			backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot1);
 
 			for (String key: snapshot1.keySet()) {
 				snapshot1.get(key).discardState();
@@ -674,7 +675,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> {
 
 			// restore the first snapshot and validate it
 			backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE);
-			backend.injectKeyValueStateSnapshots((HashMap) snapshot1, 100);
+			backend.injectKeyValueStateSnapshots((HashMap) snapshot1);
 
 			for (String key: snapshot1.keySet()) {
 				snapshot1.get(key).discardState();

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 7b55987..4b90b88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -201,9 +201,7 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
-		public void setInitialState(StateHandle<Serializable> stateHandle, long ts) throws Exception {
-
-		}
+		public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {}
 
 		@Override
 		public boolean triggerCheckpoint(long checkpointId, long timestamp) {

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index b13e7a8..fda5efd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -406,8 +406,8 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 	}
 
 	@Override
-	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
-		super.restoreState(state, recoveryTimestamp);
+	public void restoreState(StreamTaskState state) throws Exception {
+		super.restoreState(state);
 
 		StreamStateHandle stream = (StreamStateHandle) state.getOperatorState();
 
@@ -427,6 +427,7 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A
 		}
 
 		// read the state of the format
+		@SuppressWarnings("unchecked")
 		S formatState = (S) ois.readObject();
 
 		// set the whole reader state for the open() to find.

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7755347..0269a34 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -195,11 +195,11 @@ public abstract class AbstractStreamOperator<OUT>
 	
 	@Override
 	@SuppressWarnings("rawtypes,unchecked")
-	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
+	public void restoreState(StreamTaskState state) throws Exception {
 		// restore the key/value state. the actual restore happens lazily, when the function requests
 		// the state again, because the restore method needs information provided by the user function
 		if (stateBackend != null) {
-			stateBackend.injectKeyValueStateSnapshots((HashMap)state.getKvStates(), recoveryTimestamp);
+			stateBackend.injectKeyValueStateSnapshots((HashMap)state.getKvStates());
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 86b07d6..1ddd934 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -150,8 +150,8 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends
 	}
 
 	@Override
-	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
-		super.restoreState(state, recoveryTimestamp);
+	public void restoreState(StreamTaskState state) throws Exception {
+		super.restoreState(state);
 		
 		StateHandle<Serializable> stateHandle =  state.getFunctionState();
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 4572ef1..3e38165 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -115,13 +115,11 @@ public interface StreamOperator<OUT> extends Serializable {
 	 *
 	 * @param state The state of operator that was snapshotted as part of checkpoint
 	 *              from which the execution is restored.
-	 * 
-	 * @param recoveryTimestamp Global recovery timestamp
 	 *
 	 * @throws Exception Exceptions during state restore should be forwarded, so that the system can
 	 *                   properly react to failed state restore and fail the execution attempt.
 	 */
-	void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception;
+	void restoreState(StreamTaskState state) throws Exception;
 
 	/**
 	 * Called when the checkpoint with the given ID is completed and acknowledged on the JobManager.

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 5545717..b268c7a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -109,8 +109,8 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I
 	}
 
 	@Override
-	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
-		super.restoreState(state, recoveryTimestamp);
+	public void restoreState(StreamTaskState state) throws Exception {
+		super.restoreState(state);
 		this.state = (ExactlyOnceState) state.getFunctionState();
 		out = null;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 32c4e67..fdc8117 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -261,8 +261,8 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	}
 
 	@Override
-	public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
-		super.restoreState(taskState, recoveryTimestamp);
+	public void restoreState(StreamTaskState taskState) throws Exception {
+		super.restoreState(taskState);
 
 		@SuppressWarnings("unchecked")
 		StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index b6ca564..bb05d2b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -846,8 +846,8 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 	}
 
 	@Override
-	public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
-		super.restoreState(taskState, recoveryTimestamp);
+	public void restoreState(StreamTaskState taskState) throws Exception {
+		super.restoreState(taskState);
 
 		final ClassLoader userClassloader = getUserCodeClassloader();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index a5de312..6ad94b4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -156,8 +156,6 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	/** Flag to mark this task as canceled */
 	private volatile boolean canceled;
 
-	private long recoveryTimestamp;
-
 	private long lastCheckpointSize = 0;
 
 	// ------------------------------------------------------------------------
@@ -498,13 +496,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	// ------------------------------------------------------------------------
 	//  Checkpoint and Restore
 	// ------------------------------------------------------------------------
-	
+
 	@Override
-	public void setInitialState(StreamTaskStateList initialState, long recoveryTimestamp) {
+	public void setInitialState(StreamTaskStateList initialState) {
 		lazyRestoreState = initialState;
-		this.recoveryTimestamp = recoveryTimestamp;
 	}
-	
+
 	private void restoreState() throws Exception {
 		if (lazyRestoreState != null) {
 			LOG.info("Restoring checkpointed state to task {}", getName());
@@ -522,7 +519,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 					
 					if (state != null && operator != null) {
 						LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName());
-						operator.restoreState(state, recoveryTimestamp);
+						operator.restoreState(state);
 					}
 					else if (operator != null) {
 						LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName());

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
index 3f3a387..1d706d1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/WriteAheadSinkTestBase.java
@@ -201,7 +201,7 @@ public abstract class WriteAheadSinkTestBase<IN, S extends GenericWriteAheadSink
 		task.getOperator().close();
 		task.getOperator().open();
 
-		task.getOperator().restoreState(states.get(states.size() - 1), 0);
+		task.getOperator().restoreState(states.get(states.size() - 1));
 
 		for (int x = 0; x < 20; x++) {
 			testHarness.processElement(new StreamRecord<>(generateValue(elementCounter, 2)));

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index eb087c6..d2f8e05 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -512,7 +512,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							windowSize, windowSize);
 
 			op.setup(mockTask, new StreamConfig(new Configuration()), out2);
-			op.restoreState(state, 1);
+			op.restoreState(state);
 			op.open();
 
 			// inject some more elements
@@ -609,7 +609,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 					windowSize, windowSlide);
 
 			op.setup(mockTask, new StreamConfig(new Configuration()), out2);
-			op.restoreState(state, 1);
+			op.restoreState(state);
 			op.open();
 			
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index af46513..585eaa7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -609,7 +609,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					windowSize, windowSize);
 
 			op.setup(mockTask, new StreamConfig(new Configuration()), out2);
-			op.restoreState(state, 1);
+			op.restoreState(state);
 			op.open();
 
 			// inject the remaining elements
@@ -717,7 +717,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					windowSize, windowSlide);
 
 			op.setup(mockTask, new StreamConfig(new Configuration()), out2);
-			op.restoreState(state, 1);
+			op.restoreState(state);
 			op.open();
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
index 8e1cadf..b74903a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskAsyncCheckpointTest.java
@@ -172,8 +172,8 @@ public class StreamTaskAsyncCheckpointTest {
 		}
 
 		@Override
-		public void restoreState(StreamTaskState taskState, long recoveryTimestamp) throws Exception {
-			super.restoreState(taskState, recoveryTimestamp);
+		public void restoreState(StreamTaskState taskState) throws Exception {
+			super.restoreState(taskState);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/de6a3d33/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
index 66bdb57..7d0fc57 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
@@ -195,10 +195,10 @@ public class OneInputStreamOperatorTestHarness<IN, OUT> {
 	}
 
 	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(StreamTaskState, long)} ()}
+	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#restoreState(StreamTaskState)} ()}
 	 */
 	public void restore(StreamTaskState snapshot, long recoveryTimestamp) throws Exception {
-		operator.restoreState(snapshot, recoveryTimestamp);
+		operator.restoreState(snapshot);
 	}
 
 	/**


Mime
View raw message