flink-commits mailing list archives

From gyf...@apache.org
Subject [3/8] flink git commit: [FLINK-2924] [streaming] Out-of-core state backend for JDBC databases
Date Tue, 24 Nov 2015 09:45:05 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
new file mode 100644
index 0000000..ff12aae
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java
@@ -0,0 +1,465 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.derby.drda.NetworkServerControl;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
+import org.apache.flink.runtime.state.KvState;
+import org.apache.flink.runtime.state.KvStateSnapshot;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+
+public class DbStateBackendTest {
+
+	private static NetworkServerControl server;
+	private static File tempDir;
+	private static DbBackendConfig conf;
+
+	@BeforeClass
+	public static void startDerbyServer() throws Exception {
+		server = new NetworkServerControl(InetAddress.getByName("localhost"), 1527, "flink", "flink");
+		server.start(null);
+		tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		conf = new DbBackendConfig("flink", "flink",
+				"jdbc:derby://localhost:1527/" + tempDir.getAbsolutePath() + "/flinkDB1;create=true");
+		conf.setDbAdapterClass(DerbyAdapter.class);
+		conf.setKvStateCompactionFrequency(1);
+
+	}
+
+	@AfterClass
+	public static void stopDerbyServer() throws Exception {
+		try {
+			server.shutdown();
+			FileUtils.deleteDirectory(new File(tempDir.getAbsolutePath() + "/flinkDB1"));
+			FileUtils.deleteDirectory(new File(tempDir.getAbsolutePath() + "/flinkDB2"));
+			FileUtils.forceDelete(new File("derby.log"));
+		} catch (Exception ignore) {
+		}
+	}
+
+	@Test
+	public void testSetupAndSerialization() throws Exception {
+		DbStateBackend dbBackend = new DbStateBackend(conf);
+
+		assertFalse(dbBackend.isInitialized());
+
+		// serialize / copy the backend
+		DbStateBackend backend = CommonTestUtils.createCopySerializable(dbBackend);
+		assertFalse(backend.isInitialized());
+		assertEquals(dbBackend.getConfiguration(), backend.getConfiguration());
+
+		Environment env = new DummyEnvironment("test", 1, 0);
+		backend.initializeForJob(env);
+
+		assertNotNull(backend.getConnection());
+		assertTrue(isTableCreated(backend.getConnection(), "checkpoints_" + env.getJobID().toString()));
+
+		backend.disposeAllStateForCurrentJob();
+		assertFalse(isTableCreated(backend.getConnection(), "checkpoints_" + env.getJobID().toString()));
+		backend.close();
+
+		assertTrue(backend.getConnection().isClosed());
+	}
+
+	@Test
+	public void testSerializableState() throws Exception {
+		Environment env = new DummyEnvironment("test", 1, 0);
+		DbStateBackend backend = new DbStateBackend(conf);
+
+		backend.initializeForJob(env);
+
+		String state1 = "dummy state";
+		String state2 = "row row row your boat";
+		Integer state3 = 42;
+
+		StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L,
+				System.currentTimeMillis());
+		StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L,
+				System.currentTimeMillis());
+		StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L,
+				System.currentTimeMillis());
+
+		assertEquals(state1, handle1.getState(getClass().getClassLoader()));
+		handle1.discardState();
+
+		assertEquals(state2, handle2.getState(getClass().getClassLoader()));
+		handle2.discardState();
+
+		assertFalse(isTableEmpty(backend.getConnection(), "checkpoints_" + env.getJobID().toString()));
+
+		assertEquals(state3, handle3.getState(getClass().getClassLoader()));
+		handle3.discardState();
+
+		assertTrue(isTableEmpty(backend.getConnection(), "checkpoints_" + env.getJobID().toString()));
+
+		backend.close();
+
+	}
+
+	@Test
+	public void testKeyValueState() throws Exception {
+
+		// We create the DbStateBackend backed by an FsStateBackend for the
+		// non-partitioned state
+		File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
+		try {
+			FsStateBackend fileBackend = new FsStateBackend(localFileUri(tempDir));
+
+			DbBackendConfig conf2 = new DbBackendConfig("flink", "flink",
+					"jdbc:derby://localhost:1527/" + tempDir.getAbsolutePath() + "/flinkDB2;create=true");
+			conf2.setDbAdapterClass(DerbyAdapter.class);
+
+			DbStateBackend backend = new DbStateBackend(conf, fileBackend);
+			DbStateBackend backend2 = new DbStateBackend(conf2, fileBackend);
+
+			Environment env = new DummyEnvironment("test", 2, 0);
+			Environment env2 = new DummyEnvironment("test", 2, 1);
+
+			backend.initializeForJob(env);
+			backend2.initializeForJob(env2);
+
+			LazyDbKvState<Integer, String> kv = backend.createKvState(1, "state1", IntSerializer.INSTANCE,
+					StringSerializer.INSTANCE, null);
+
+			String tableName = "kvstate_" + env.getJobID() + "_1_state1";
+			assertTrue(isTableCreated(backend.getConnection(), tableName));
+
+			assertEquals(0, kv.size());
+
+			// some modifications to the state
+			kv.setCurrentKey(1);
+			assertNull(kv.value());
+			kv.update("1");
+			assertEquals(1, kv.size());
+			kv.setCurrentKey(2);
+			assertNull(kv.value());
+			kv.update("2");
+			assertEquals(2, kv.size());
+			kv.setCurrentKey(1);
+			assertEquals("1", kv.value());
+			assertEquals(2, kv.size());
+
+			kv.shapshot(682375462378L, 100);
+
+			// make some more modifications
+			kv.setCurrentKey(1);
+			kv.update("u1");
+			kv.setCurrentKey(2);
+			kv.update("u2");
+			kv.setCurrentKey(3);
+			kv.update("u3");
+
+			assertTrue(containsKey(backend.getConnection(), tableName, 1, 1));
+
+			kv.notifyCheckpointComplete(682375462378L);
+
+			// draw another snapshot
+			KvStateSnapshot<Integer, String, DbStateBackend> snapshot2 = kv.shapshot(682375462379L,
+					200);
+			assertTrue(containsKey(backend.getConnection(), tableName, 1, 1));
+			assertTrue(containsKey(backend.getConnection(), tableName, 1, 682375462379L));
+			kv.notifyCheckpointComplete(682375462379L);
+			// Compaction should be performed
+			assertFalse(containsKey(backend.getConnection(), tableName, 1, 1));
+			assertTrue(containsKey(backend.getConnection(), tableName, 1, 682375462379L));
+
+			// validate the original state
+			assertEquals(3, kv.size());
+			kv.setCurrentKey(1);
+			assertEquals("u1", kv.value());
+			kv.setCurrentKey(2);
+			assertEquals("u2", kv.value());
+			kv.setCurrentKey(3);
+			assertEquals("u3", kv.value());
+
+			// restore the second snapshot and validate it
+			KvState<Integer, String, DbStateBackend> restored2 = snapshot2.restoreState(backend, IntSerializer.INSTANCE,
+					StringSerializer.INSTANCE, null, getClass().getClassLoader(), 6823754623710L);
+
+			assertEquals(0, restored2.size());
+			restored2.setCurrentKey(1);
+			assertEquals("u1", restored2.value());
+			restored2.setCurrentKey(2);
+			assertEquals("u2", restored2.value());
+			restored2.setCurrentKey(3);
+			assertEquals("u3", restored2.value());
+
+			LazyDbKvState<Integer, String> kv2 = backend2.createKvState(1, "state2", IntSerializer.INSTANCE,
+					StringSerializer.INSTANCE, "a");
+
+			kv2.setCurrentKey(1);
+			kv2.update("c");
+
+			assertEquals("c", kv2.value());
+
+			kv2.update(null);
+			assertEquals("a", kv2.value());
+
+			KvStateSnapshot<Integer, String, DbStateBackend> snapshot3 = kv2.shapshot(682375462380L,
+					400);
+			kv2.notifyCheckpointComplete(682375462380L);
+			try {
+				// Restoring should fail with the wrong backend
+				snapshot3.restoreState(backend, IntSerializer.INSTANCE, StringSerializer.INSTANCE, "a",
+						getClass().getClassLoader(), System.currentTimeMillis());
+				fail();
+			} catch (IOException e) {
+
+			}
+
+			backend.close();
+			backend2.close();
+		} finally {
+			deleteDirectorySilently(tempDir);
+		}
+	}
+
+	@Test
+	public void testCleanupTasks() throws Exception {
+		String url = "jdbc:derby://localhost:1527/" + tempDir.getAbsolutePath() + "/flinkDB1;create=true";
+
+		DbBackendConfig conf = new DbBackendConfig("flink", "flink", Lists.newArrayList(url, url));
+		conf.setDbAdapterClass(DerbyAdapter.class);
+
+		DbStateBackend backend1 = new DbStateBackend(conf);
+		DbStateBackend backend2 = new DbStateBackend(conf);
+		DbStateBackend backend3 = new DbStateBackend(conf);
+		DbStateBackend backend4 = new DbStateBackend(conf);
+		DbStateBackend backend5 = new DbStateBackend(conf);
+
+		backend1.initializeForJob(new DummyEnvironment("test", 5, 0));
+		backend2.initializeForJob(new DummyEnvironment("test", 5, 1));
+		backend3.initializeForJob(new DummyEnvironment("test", 5, 2));
+		backend4.initializeForJob(new DummyEnvironment("test", 5, 3));
+		backend5.initializeForJob(new DummyEnvironment("test", 5, 4));
+
+		assertTrue(backend1.createKvState(1, "a", null, null, null).isCompacter());
+		assertTrue(backend2.createKvState(1, "a", null, null, null).isCompacter());
+		assertFalse(backend3.createKvState(1, "a", null, null, null).isCompacter());
+		assertFalse(backend4.createKvState(1, "a", null, null, null).isCompacter());
+		assertFalse(backend5.createKvState(1, "a", null, null, null).isCompacter());
+	}
+
+	@Test
+	public void testCaching() throws Exception {
+		// We copy the config before setting the caching parameters
+		DbBackendConfig conf = DbStateBackendTest.conf.createConfigForShard(0);
+		conf.setKvCacheSize(3);
+		conf.setMaxKvInsertBatchSize(2);
+
+		// We evict 2 elements when the cache is full
+		conf.setMaxKvCacheEvictFraction(0.6f);
+
+		DbStateBackend backend = new DbStateBackend(conf);
+
+		Environment env = new DummyEnvironment("test", 2, 0);
+
+		backend.initializeForJob(env);
+
+		LazyDbKvState<Integer, String> kv = backend.createKvState(1, "state1", IntSerializer.INSTANCE,
+				StringSerializer.INSTANCE, "a");
+		Map<Integer, Optional<String>> cache = kv.getStateCache();
+		Map<Integer, Optional<String>> modified = kv.getModified();
+
+		assertEquals(0, kv.size());
+
+		// some modifications to the state
+		kv.setCurrentKey(1);
+		assertEquals("a", kv.value());
+
+		kv.update(null);
+		assertEquals(1, kv.size());
+		kv.setCurrentKey(2);
+		assertEquals("a", kv.value());
+		kv.update("2");
+		assertEquals(2, kv.size());
+
+		kv.setCurrentKey(1);
+		assertEquals("a", kv.value());
+
+		kv.setCurrentKey(3);
+		kv.update("3");
+		assertEquals("3", kv.value());
+
+		assertTrue(modified.containsKey(1));
+		assertTrue(modified.containsKey(2));
+		assertTrue(modified.containsKey(3));
+
+		// keys 1 and 2 should be evicted once the cache fills up
+		kv.setCurrentKey(4);
+		kv.update("4");
+		assertEquals("4", kv.value());
+
+		assertFalse(modified.containsKey(1));
+		assertFalse(modified.containsKey(2));
+		assertTrue(modified.containsKey(3));
+		assertTrue(modified.containsKey(4));
+
+		assertEquals(Optional.of("3"), cache.get(3));
+		assertEquals(Optional.of("4"), cache.get(4));
+		assertFalse(cache.containsKey(1));
+		assertFalse(cache.containsKey(2));
+
+		// draw a snapshot
+		kv.shapshot(682375462378L, 100);
+
+		assertTrue(modified.isEmpty());
+
+		// make some more modifications
+		kv.setCurrentKey(2);
+		assertEquals("2", kv.value());
+		kv.update(null);
+
+		assertTrue(modified.containsKey(2));
+		assertEquals(1, modified.size());
+
+		assertEquals(Optional.of("3"), cache.get(3));
+		assertEquals(Optional.of("4"), cache.get(4));
+		assertEquals(Optional.absent(), cache.get(2));
+		assertFalse(cache.containsKey(1));
+
+		assertTrue(modified.containsKey(2));
+		assertTrue(modified.containsKey(3));
+		assertTrue(modified.containsKey(4));
+
+		// push the initial keys out of the cache
+
+		kv.setCurrentKey(5);
+		kv.value();
+		kv.setCurrentKey(6);
+		kv.value();
+		kv.setCurrentKey(7);
+		kv.value();
+
+		assertFalse(modified.containsKey(5));
+		assertTrue(modified.containsKey(6));
+		assertTrue(modified.containsKey(7));
+
+		assertFalse(cache.containsKey(1));
+		assertFalse(cache.containsKey(2));
+		assertFalse(cache.containsKey(3));
+		assertFalse(cache.containsKey(4));
+
+		kv.setCurrentKey(2);
+		assertEquals("a", kv.value());
+
+		long checkpointTs = System.currentTimeMillis();
+
+		// Draw a snapshot that we will restore later
+		KvStateSnapshot<Integer, String, DbStateBackend> snapshot1 = kv.shapshot(682375462379L, checkpointTs);
+		assertTrue(modified.isEmpty());
+
+		// Do some more updates and draw another snapshot (imitating a partial
+		// failure); these updates must not be visible if we restore snapshot1
+		kv.setCurrentKey(1);
+		kv.update("123");
+		kv.setCurrentKey(3);
+		kv.update("456");
+		kv.setCurrentKey(2);
+		kv.notifyCheckpointComplete(682375462379L);
+		kv.update("2");
+		kv.setCurrentKey(4);
+		kv.update("4");
+		kv.update("5");
+
+		kv.shapshot(6823754623710L, checkpointTs + 10);
+
+		// restore snapshot1 (taken before the updates above) and validate it; we
+		// set a new default value here to make sure that the default wasn't written
+		KvState<Integer, String, DbStateBackend> restored = snapshot1.restoreState(backend, IntSerializer.INSTANCE,
+				StringSerializer.INSTANCE, "b", getClass().getClassLoader(), 6823754623711L);
+
+		restored.setCurrentKey(1);
+		assertEquals("b", restored.value());
+		restored.setCurrentKey(2);
+		assertEquals("b", restored.value());
+		restored.setCurrentKey(3);
+		assertEquals("3", restored.value());
+		restored.setCurrentKey(4);
+		assertEquals("4", restored.value());
+
+		backend.close();
+	}
+
+	private static boolean isTableCreated(Connection con, String tableName) throws SQLException {
+		return con.getMetaData().getTables(null, null, tableName.toUpperCase(), null).next();
+	}
+
+	private static boolean isTableEmpty(Connection con, String tableName) throws SQLException {
+		try (Statement smt = con.createStatement()) {
+			ResultSet res = smt.executeQuery("select * from " + tableName);
+			return !res.next();
+		}
+	}
+
+	private static boolean containsKey(Connection con, String tableName, int key, long ts)
+			throws SQLException, IOException {
+		try (PreparedStatement smt = con
+				.prepareStatement("select * from " + tableName + " where k=? and id=?")) {
+			smt.setBytes(1, InstantiationUtil.serializeToByteArray(IntSerializer.INSTANCE, key));
+			smt.setLong(2, ts);
+			return smt.executeQuery().next();
+		}
+	}
+
+	private static String localFileUri(File path) {
+		return path.toURI().toString();
+	}
+
+	private static void deleteDirectorySilently(File dir) {
+		try {
+			FileUtils.deleteDirectory(dir);
+		} catch (IOException ignored) {
+		}
+	}
+
+}

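For orientation, the lifecycle the test above drives end-to-end boils down to the following sketch. It is an illustration of the API added by this patch only, assuming the class lives in the same org.apache.flink.contrib.streaming.state package and that a Derby network server is reachable at the placeholder URL.

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.state.KvState;
import org.apache.flink.runtime.state.KvStateSnapshot;

public class DbBackendLifecycleSketch {

	public static void main(String[] args) throws Exception {
		// Placeholder connection settings; any reachable Derby network server will do.
		DbBackendConfig cfg = new DbBackendConfig("flink", "flink",
				"jdbc:derby://localhost:1527/tmp/flinkDB;create=true");
		cfg.setDbAdapterClass(DerbyAdapter.class);
		cfg.setKvStateCompactionFrequency(1);

		DbStateBackend backend = new DbStateBackend(cfg);
		backend.initializeForJob(new DummyEnvironment("sketch", 1, 0));

		LazyDbKvState<Integer, String> kv = backend.createKvState(
				1, "state1", IntSerializer.INSTANCE, StringSerializer.INSTANCE, null);

		// Write some key/value state.
		kv.setCurrentKey(1);
		kv.update("one");

		// Draw a snapshot, then confirm the checkpoint so older versions become compactable.
		KvStateSnapshot<Integer, String, DbStateBackend> snapshot =
				kv.shapshot(1L, System.currentTimeMillis());
		kv.notifyCheckpointComplete(1L);

		// Restore from the snapshot; the last argument is the next checkpoint id after recovery.
		KvState<Integer, String, DbStateBackend> restored = snapshot.restoreState(
				backend, IntSerializer.INSTANCE, StringSerializer.INSTANCE, null,
				DbBackendLifecycleSketch.class.getClassLoader(), 2L);
		restored.setCurrentKey(1);
		System.out.println(restored.value()); // prints "one"

		backend.close();
	}
}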
http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
new file mode 100644
index 0000000..f0d6534
--- /dev/null
+++ b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *	http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.contrib.streaming.state;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * Adapter for the Derby JDBC driver, which has slightly more restrictive CREATE TABLE
+ * and SELECT semantics than the defaults assumed by {@link MySqlAdapter}.
+ * 
+ */
+public class DerbyAdapter extends MySqlAdapter {
+
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * We need to override this method as Derby does not support the
+	 * "IF NOT EXISTS" clause at table creation
+	 */
+	@Override
+	public void createCheckpointsTable(String jobId, Connection con) throws SQLException {
+
+		try (Statement smt = con.createStatement()) {
+			smt.executeUpdate(
+					"CREATE TABLE checkpoints_" + jobId
+							+ " ("
+							+ "checkpointId bigint, "
+							+ "timestamp bigint, "
+							+ "handleId bigint,"
+							+ "checkpoint blob,"
+							+ "PRIMARY KEY (handleId)"
+							+ ")");
+		} catch (SQLException se) {
+			if (se.getSQLState().equals("X0Y32")) {
+				// table already created, ignore
+			} else {
+				throw se;
+			}
+		}
+	}
+
+	/**
+	 * We need to override this method as Derby does not support the
+	 * "IF NOT EXISTS" clause at table creation
+	 */
+	@Override
+	public void createKVStateTable(String stateId, Connection con) throws SQLException {
+
+		validateStateId(stateId);
+		try (Statement smt = con.createStatement()) {
+			smt.executeUpdate(
+					"CREATE TABLE kvstate_" + stateId
+							+ " ("
+							+ "id bigint, "
+							+ "k varchar(256) for bit data, "
+							+ "v blob, "
+							+ "PRIMARY KEY (k, id)"
+							+ ")");
+		} catch (SQLException se) {
+			if (se.getSQLState().equals("X0Y32")) {
+				// table already created, ignore
+			} else {
+				throw se;
+			}
+		}
+	}
+
+	/**
+	 * We need to override this method as Derby does not support "LIMIT n" for
+	 * select statements.
+	 */
+	@Override
+	public PreparedStatement prepareKeyLookup(String stateId, Connection con) throws SQLException {
+		validateStateId(stateId);
+		PreparedStatement smt = con.prepareStatement("SELECT v " + "FROM kvstate_" + stateId
+				+ " WHERE k = ? "
+				+ " AND id <= ? "
+				+ "ORDER BY id DESC");
+		smt.setMaxRows(1);
+		return smt;
+	}
+
+	@Override
+	protected void compactKvStates(String stateId, Connection con, long lowerBound, long upperBound)
+			throws SQLException {
+		validateStateId(stateId);
+
+		try (Statement smt = con.createStatement()) {
+			smt.executeUpdate("DELETE FROM kvstate_" + stateId + " t1"
+					+ " WHERE EXISTS"
+					+ " ("
+					+ " 	SELECT * FROM kvstate_" + stateId + " t2"
+					+ " 	WHERE t2.k = t1.k"
+					+ "		AND t2.id > t1.id"
+					+ " 	AND t2.id <=" + upperBound
+					+ "		AND t2.id >= " + lowerBound
+					+ " )");
+		}
+	}
+
+	@Override
+	public PreparedStatement prepareKVCheckpointInsert(String stateId, Connection con) throws SQLException {
+		validateStateId(stateId);
+		return con.prepareStatement(
+				"INSERT INTO kvstate_" + stateId + " (id, k, v) VALUES (?,?,?)");
+	}
+
+	@Override
+	public void insertBatch(final String stateId, final DbBackendConfig conf,
+			final Connection con, final PreparedStatement insertStatement, final long checkpointId,
+			final List<Tuple2<byte[], byte[]>> toInsert) throws IOException {
+
+		try (PreparedStatement smt = con
+				.prepareStatement("UPDATE kvstate_" + stateId + " SET v=? WHERE k=? AND id=?")) {
+			for (final Tuple2<byte[], byte[]> kv : toInsert) {
+				SQLRetrier.retry(new Callable<Void>() {
+					public Void call() throws Exception {
+						try {
+							setKVInsertParams(stateId, insertStatement, checkpointId, kv.f0, kv.f1);
+							insertStatement.executeUpdate();
+						} catch (SQLException e) {
+							if (kv.f1 != null) {
+								smt.setBytes(1, kv.f1);
+							} else {
+								smt.setNull(1, Types.BLOB);
+							}
+							smt.setBytes(2, kv.f0);
+							smt.setLong(3, checkpointId);
+							smt.executeUpdate();
+						}
+						return null;
+					}
+				}, conf.getMaxNumberOfSqlRetries(), conf.getSleepBetweenSqlRetries());
+
+			}
+		} catch (SQLException e) {
+			throw new IOException(e);
+		}
+	}
+
+	private void setKVInsertParams(String stateId, PreparedStatement insertStatement, long checkpointId,
+			byte[] key, byte[] value) throws SQLException {
+		insertStatement.setLong(1, checkpointId);
+		insertStatement.setBytes(2, key);
+		if (value != null) {
+			insertStatement.setBytes(3, value);
+		} else {
+			insertStatement.setNull(3, Types.BLOB);
+		}
+	}
+}

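Both CREATE TABLE overrides above rely on the same Derby idiom: execute the plain CREATE TABLE and treat SQLState X0Y32 ("object already exists") as success, because Derby has no IF NOT EXISTS clause; similarly, prepareKeyLookup uses setMaxRows(1) in place of the missing LIMIT 1. A stand-alone sketch of the create-if-absent idiom in plain JDBC, with the DDL supplied by the caller (illustration only, not part of the patch):

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

public final class DerbyDdlUtil {

	/** Executes the given CREATE TABLE statement, ignoring "already exists" errors. */
	public static void createTableIfAbsent(Connection con, String createTableDdl) throws SQLException {
		try (Statement smt = con.createStatement()) {
			smt.executeUpdate(createTableDdl);
		} catch (SQLException se) {
			// X0Y32: the object already exists in the schema - that is fine here
			if (!"X0Y32".equals(se.getSQLState())) {
				throw se;
			}
		}
	}

	private DerbyDdlUtil() {
	}
}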
http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index fdb59d9..791b5e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -489,14 +489,16 @@ public class CheckpointCoordinator {
 					return;
 				}
 			}
+			
+			long nextId = checkpointIdCounter.get();
 
 			if (allOrNothingState) {
 				Map<ExecutionJobVertex, Integer> stateCounts = new HashMap<ExecutionJobVertex, Integer>();
-
+				
 				for (StateForTask state : latest.getStates()) {
 					ExecutionJobVertex vertex = tasks.get(state.getOperatorId());
 					Execution exec = vertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt();
-					exec.setInitialState(state.getState());
+					exec.setInitialState(state.getState(), nextId);
 
 					Integer count = stateCounts.get(vertex);
 					if (count != null) {
@@ -519,7 +521,7 @@ public class CheckpointCoordinator {
 				for (StateForTask state : latest.getStates()) {
 					ExecutionJobVertex vertex = tasks.get(state.getOperatorId());
 					Execution exec = vertex.getTaskVertices()[state.getSubtask()].getCurrentExecutionAttempt();
-					exec.setInitialState(state.getState());
+					exec.setInitialState(state.getState(), nextId);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
index 34b7946..cf1b91d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
@@ -39,5 +39,7 @@ public interface CheckpointIDCounter {
 	 * @return The previous checkpoint ID
 	 */
 	long getAndIncrement() throws Exception;
+	
+	long get() throws Exception;
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
index 052d743..7ba9bfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -44,4 +44,9 @@ public class StandaloneCheckpointIDCounter implements CheckpointIDCounter {
 	public long getAndIncrement() throws Exception {
 		return checkpointIdCounter.getAndIncrement();
 	}
+	
+	@Override
+	public long get() throws Exception {
+		return checkpointIdCounter.get();
+	}
 }

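The new get() is a non-advancing peek at the counter: during restore, the coordinator reads the id that the next checkpoint will receive and passes it to the tasks without consuming it, while getAndIncrement() keeps assigning ids when checkpoints are actually triggered. A small illustration against the AtomicLong-backed standalone counter (illustration only, not part of the patch):

import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;

public class CheckpointIdCounterSketch {

	public static void main(String[] args) throws Exception {
		CheckpointIDCounter counter = new StandaloneCheckpointIDCounter();

		long upcoming = counter.get();             // peek: the id the next checkpoint will receive
		long assigned = counter.getAndIncrement(); // trigger path: assigns that id and advances

		System.out.println(assigned == upcoming);          // true
		System.out.println(counter.get() == upcoming + 1); // true
	}
}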
http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
index 6673050..22bce4d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
@@ -107,6 +107,17 @@ public class ZooKeeperCheckpointIDCounter implements CheckpointIDCounter {
 			}
 		}
 	}
+	
+	@Override
+	public long get() throws Exception {
+		ConnectionState connState = connStateListener.getLastState();
+
+		if (connState != null) {
+			throw new IllegalStateException("Connection state: " + connState);
+		}
+
+		return sharedCount.getVersionedValue().getValue();
+	}
 
 	/**
 	 * Connection state listener. In case of {@link ConnectionState#SUSPENDED} or {@link

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 558fcd0..b59c515 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -83,7 +83,9 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	private final List<URL> requiredClasspaths;
 
 	private final SerializedValue<StateHandle<?>> operatorState;
-	
+
+	private long nextCpId;
+		
 	/**
 	 * Constructs a task deployment descriptor.
 	 */
@@ -94,7 +96,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 			List<ResultPartitionDeploymentDescriptor> producedPartitions,
 			List<InputGateDeploymentDescriptor> inputGates,
 			List<BlobKey> requiredJarFiles, List<URL> requiredClasspaths,
-			int targetSlotNumber, SerializedValue<StateHandle<?>> operatorState) {
+			int targetSlotNumber, SerializedValue<StateHandle<?>> operatorState, long nextCpId) {
 
 		checkArgument(indexInSubtaskGroup >= 0);
 		checkArgument(numberOfSubtasks > indexInSubtaskGroup);
@@ -115,6 +117,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		this.requiredClasspaths = checkNotNull(requiredClasspaths);
 		this.targetSlotNumber = targetSlotNumber;
 		this.operatorState = operatorState;
+		this.nextCpId = nextCpId;
 	}
 
 	public TaskDeploymentDescriptor(
@@ -128,7 +131,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 
 		this(jobID, vertexID, executionId, taskName, indexInSubtaskGroup, numberOfSubtasks,
 				jobConfiguration, taskConfiguration, invokableClassName, producedPartitions,
-				inputGates, requiredJarFiles, requiredClasspaths, targetSlotNumber, null);
+				inputGates, requiredJarFiles, requiredClasspaths, targetSlotNumber, null, -1);
 	}
 
 	/**
@@ -245,4 +248,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	public SerializedValue<StateHandle<?>> getOperatorState() {
 		return operatorState;
 	}
+	
+	public long getNextCpId() {
+		return nextCpId;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index faabfb3..72959c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -135,6 +135,8 @@ public class Execution implements Serializable {
 	private volatile InstanceConnectionInfo assignedResourceLocation; // for the archived execution
 	
 	private SerializedValue<StateHandle<?>> operatorState;
+	
+	private long nextCpId;
 
 	/** The execution context which is used to execute futures. */
 	@SuppressWarnings("NonSerializableFieldInSerializableClass")
@@ -231,11 +233,12 @@ public class Execution implements Serializable {
 		partialInputChannelDeploymentDescriptors = null;
 	}
 	
-	public void setInitialState(SerializedValue<StateHandle<?>> initialState) {
+	public void setInitialState(SerializedValue<StateHandle<?>> initialState, long nextCpId) {
 		if (state != ExecutionState.CREATED) {
 			throw new IllegalArgumentException("Can only assign operator state when execution attempt is in CREATED");
 		}
 		this.operatorState = initialState;
+		this.nextCpId = nextCpId;
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -359,7 +362,7 @@ public class Execution implements Serializable {
 						attemptNumber, slot.getInstance().getInstanceConnectionInfo().getHostname()));
 			}
 			
-			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot, operatorState);
+			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(attemptId, slot, operatorState, nextCpId);
 			
 			// register this execution at the execution graph, to receive call backs
 			vertex.getExecutionGraph().registerExecution(this);

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 1e5d02c..d10aac1 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -811,6 +811,7 @@ public class ExecutionGraph implements Serializable {
 	 *
 	 * <p>The recovery of checkpoints might block. Make sure that calls to this method don't
 	 * block the job manager actor and run asynchronously.
+	 * 
 	 */
 	public void restoreLatestCheckpointedState() throws Exception {
 		synchronized (progressLock) {

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 6a63528..fba5652 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -616,7 +616,8 @@ public class ExecutionVertex implements Serializable {
 	TaskDeploymentDescriptor createDeploymentDescriptor(
 			ExecutionAttemptID executionId,
 			SimpleSlot targetSlot,
-			SerializedValue<StateHandle<?>> operatorState) {
+			SerializedValue<StateHandle<?>> operatorState,
+			long recoveryTimestamp) {
 
 		// Produced intermediate results
 		List<ResultPartitionDeploymentDescriptor> producedPartitions = new ArrayList<ResultPartitionDeploymentDescriptor>(resultPartitions.size());
@@ -651,7 +652,7 @@ public class ExecutionVertex implements Serializable {
 				subTaskIndex, getTotalNumberOfParallelSubtasks(), getExecutionGraph().getJobConfiguration(),
 				jobVertex.getJobVertex().getConfiguration(), jobVertex.getJobVertex().getInvokableClassName(),
 				producedPartitions, consumedPartitions, jarFiles, classpaths, targetSlot.getRoot().getSlotNumber(),
-				operatorState);
+				operatorState, recoveryTimestamp);
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 894e6d9..2d990b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -31,8 +31,9 @@ public interface StatefulTask<T extends StateHandle<?>> {
 	 * a snapshot of the state from a previous execution.
 	 * 
 	 * @param stateHandle The handle to the state.
+	 * @param nextCheckpointId Next checkpoint id.
 	 */
-	void setInitialState(T stateHandle) throws Exception;
+	void setInitialState(T stateHandle, long nextCheckpointId) throws Exception;
 
 	/**
 	 * This method is either called directly and asynchronously by the checkpoint

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
index 3d6c56c..efc600f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KvStateSnapshot.java
@@ -57,7 +57,8 @@ public interface KvStateSnapshot<K, V, Backend extends StateBackend<Backend>> ex
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<V> valueSerializer,
 			V defaultValue,
-			ClassLoader classLoader) throws Exception;
+			ClassLoader classLoader,
+			long nextCheckpointId) throws Exception;
 
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
index 88b0d18..c9a3b55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateUtils.java
@@ -38,17 +38,19 @@ public class StateUtils {
 	 *            The state carrier operator.
 	 * @param state
 	 *            The state handle.
+	 * @param nextCheckpointId
+	 *            Next checkpoint id
 	 * @param <T>
 	 *            Type bound for the
 	 */
 	public static <T extends StateHandle<?>> void setOperatorState(StatefulTask<?> op,
-			StateHandle<?> state) throws Exception {
+			StateHandle<?> state, long nextCheckpointId) throws Exception {
 		@SuppressWarnings("unchecked")
 		StatefulTask<T> typedOp = (StatefulTask<T>) op;
 		@SuppressWarnings("unchecked")
 		T typedHandle = (T) state;
 
-		typedOp.setInitialState(typedHandle);
+		typedOp.setInitialState(typedHandle, nextCheckpointId);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java
index 781ee3d..c328e3f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsHeapKvStateSnapshot.java
@@ -62,7 +62,8 @@ public class FsHeapKvStateSnapshot<K, V> extends AbstractFileState implements Kv
 			final TypeSerializer<K> keySerializer,
 			final TypeSerializer<V> valueSerializer,
 			V defaultValue,
-			ClassLoader classLoader) throws Exception {
+			ClassLoader classLoader,
+			long nextCpId) throws Exception {
 
 		// validity checks
 		if (!keySerializer.getClass().getName().equals(keySerializerClassName) ||

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java
index 1b03def..13b1e4e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryHeapKvStateSnapshot.java
@@ -70,7 +70,8 @@ public class MemoryHeapKvStateSnapshot<K, V> implements KvStateSnapshot<K, V, Me
 			final TypeSerializer<K> keySerializer,
 			final TypeSerializer<V> valueSerializer,
 			V defaultValue,
-			ClassLoader classLoader) throws Exception {
+			ClassLoader classLoader,
+			long nextCpId) throws Exception {
 
 		// validity checks
 		if (!keySerializer.getClass().getName().equals(keySerializerClassName) ||

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index c8d50c7..efba325 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -219,6 +219,8 @@ public class Task implements Runnable {
 	 * initialization, to be memory friendly */
 	private volatile SerializedValue<StateHandle<?>> operatorState;
 
+	private volatile long recoveryTimestamp;
+
 	/**
 	 * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to 
 	 * be undone in the case of a failing task deployment.</p>
@@ -252,6 +254,7 @@ public class Task implements Runnable {
 		this.requiredClasspaths = checkNotNull(tdd.getRequiredClasspaths());
 		this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName());
 		this.operatorState = tdd.getOperatorState();
+		this.recoveryTimestamp = tdd.getNextCpId();
 
 		this.memoryManager = checkNotNull(memManager);
 		this.ioManager = checkNotNull(ioManager);
@@ -535,13 +538,14 @@ public class Task implements Runnable {
 
 			// get our private reference onto the stack (be safe against concurrent changes) 
 			SerializedValue<StateHandle<?>> operatorState = this.operatorState;
+			long recoveryTimestamp = this.recoveryTimestamp;
 
 			if (operatorState != null) {
 				if (invokable instanceof StatefulTask) {
 					try {
 						StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader);
 						StatefulTask<?> op = (StatefulTask<?>) invokable;
-						StateUtils.setOperatorState(op, state);
+						StateUtils.setOperatorState(op, state, recoveryTimestamp);
 					}
 					catch (Exception e) {
 						throw new RuntimeException("Failed to deserialize state handle and setup initial operator state.", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 32c15bf..7b2c2d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -45,25 +45,25 @@ import static org.mockito.Mockito.*;
  * Tests concerning the restoring of state from a checkpoint to the task executions.
  */
 public class CheckpointStateRestoreTest {
-	
+
 	private static final ClassLoader cl = Thread.currentThread().getContextClassLoader();
-	
+
 	@Test
 	public void testSetState() {
 		try {
 			final SerializedValue<StateHandle<?>> serializedState = new SerializedValue<StateHandle<?>>(
 					new LocalStateHandle<SerializableObject>(new SerializableObject()));
-			
+
 			final JobID jid = new JobID();
 			final JobVertexID statefulId = new JobVertexID();
 			final JobVertexID statelessId = new JobVertexID();
-			
+
 			Execution statefulExec1 = mockExecution();
 			Execution statefulExec2 = mockExecution();
 			Execution statefulExec3 = mockExecution();
 			Execution statelessExec1 = mockExecution();
 			Execution statelessExec2 = mockExecution();
-			
+
 			ExecutionVertex stateful1 = mockExecutionVertex(statefulExec1, statefulId, 0);
 			ExecutionVertex stateful2 = mockExecutionVertex(statefulExec2, statefulId, 1);
 			ExecutionVertex stateful3 = mockExecutionVertex(statefulExec3, statefulId, 2);
@@ -74,44 +74,44 @@ public class CheckpointStateRestoreTest {
 					new ExecutionVertex[] { stateful1, stateful2, stateful3 });
 			ExecutionJobVertex stateless = mockExecutionJobVertex(statelessId,
 					new ExecutionVertex[] { stateless1, stateless2 });
-			
+
 			Map<JobVertexID, ExecutionJobVertex> map = new HashMap<JobVertexID, ExecutionJobVertex>();
 			map.put(statefulId, stateful);
 			map.put(statelessId, stateless);
-			
-			
+
+
 			CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L,
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[0], cl,
 					new StandaloneCheckpointIDCounter(),
 					new StandaloneCompletedCheckpointStore(1, cl), RecoveryMode.STANDALONE);
-			
+
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
 			coord.triggerCheckpoint(timestamp);
-			
+
 			PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
 			final long checkpointId = pending.getCheckpointId();
-			
+
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState));
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, serializedState));
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState));
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
-			
+
 			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
-			
+
 			// let the coordinator inject the state
 			coord.restoreLatestCheckpointedState(map, true, false);
-			
+
 			// verify that each stateful vertex got the state
-			verify(statefulExec1, times(1)).setInitialState(serializedState);
-			verify(statefulExec2, times(1)).setInitialState(serializedState);
-			verify(statefulExec3, times(1)).setInitialState(serializedState);
-			verify(statelessExec1, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any());
-			verify(statelessExec2, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any());
+			verify(statefulExec1, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.anyLong());
+			verify(statefulExec2, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.anyLong());
+			verify(statefulExec3, times(1)).setInitialState(Mockito.eq(serializedState), Mockito.anyLong());
+			verify(statelessExec1, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), Mockito.anyLong());
+			verify(statelessExec2, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), Mockito.anyLong());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -189,7 +189,7 @@ public class CheckpointStateRestoreTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	@Test
 	public void testNoCheckpointAvailable() {
 		try {
@@ -213,20 +213,20 @@ public class CheckpointStateRestoreTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 
 	private Execution mockExecution() {
 		return mockExecution(ExecutionState.RUNNING);
 	}
-	
+
 	private Execution mockExecution(ExecutionState state) {
 		Execution mock = mock(Execution.class);
 		when(mock.getAttemptId()).thenReturn(new ExecutionAttemptID());
 		when(mock.getState()).thenReturn(state);
 		return mock;
 	}
-	
+
 	private ExecutionVertex mockExecutionVertex(Execution execution, JobVertexID vertexId, int subtask) {
 		ExecutionVertex mock = mock(ExecutionVertex.class);
 		when(mock.getJobvertexId()).thenReturn(vertexId);
@@ -234,7 +234,7 @@ public class CheckpointStateRestoreTest {
 		when(mock.getCurrentExecutionAttempt()).thenReturn(execution);
 		return mock;
 	}
-	
+
 	private ExecutionJobVertex mockExecutionJobVertex(JobVertexID id, ExecutionVertex[] vertices) {
 		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
 		when(vertex.getParallelism()).thenReturn(vertices.length);

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index fc5d8c8..4f10acd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -279,7 +279,7 @@ public class FileStateBackendTest {
 
 			// restore the first snapshot and validate it
 			KvState<Integer, String, FsStateBackend> restored1 = snapshot1.restoreState(backend,
-					IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+					IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader(), 1);
 
 			assertEquals(2, restored1.size());
 			restored1.setCurrentKey(1);
@@ -289,7 +289,7 @@ public class FileStateBackendTest {
 
 			// restore the first snapshot and validate it
 			KvState<Integer, String, FsStateBackend> restored2 = snapshot2.restoreState(backend,
-					IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+					IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader(), 1);
 
 			assertEquals(3, restored2.size());
 			restored2.setCurrentKey(1);
@@ -345,7 +345,7 @@ public class FileStateBackendTest {
 
 			try {
 				snapshot.restoreState(backend, fakeIntSerializer,
-						StringSerializer.INSTANCE, null, getClass().getClassLoader());
+						StringSerializer.INSTANCE, null, getClass().getClassLoader(), 1);
 				fail("should recognize wrong serializers");
 			} catch (IllegalArgumentException e) {
 				// expected
@@ -355,7 +355,7 @@ public class FileStateBackendTest {
 
 			try {
 				snapshot.restoreState(backend, IntSerializer.INSTANCE,
-						fakeStringSerializer, null, getClass().getClassLoader());
+						fakeStringSerializer, null, getClass().getClassLoader(), 1);
 				fail("should recognize wrong serializers");
 			} catch (IllegalArgumentException e) {
 				// expected
@@ -365,7 +365,7 @@ public class FileStateBackendTest {
 
 			try {
 				snapshot.restoreState(backend, fakeIntSerializer,
-						fakeStringSerializer, null, getClass().getClassLoader());
+						fakeStringSerializer, null, getClass().getClassLoader(), 1);
 				fail("should recognize wrong serializers");
 			} catch (IllegalArgumentException e) {
 				// expected

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index ae027e3..faa2cfa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -191,7 +191,7 @@ public class MemoryStateBackendTest {
 
 			// restore the first snapshot and validate it
 			KvState<Integer, String, MemoryStateBackend> restored1 = snapshot1.restoreState(backend,
-							IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+							IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader(), 1);
 
 			assertEquals(2, restored1.size());
 			restored1.setCurrentKey(1);
@@ -201,7 +201,7 @@ public class MemoryStateBackendTest {
 
 			// restore the first snapshot and validate it
 			KvState<Integer, String, MemoryStateBackend> restored2 = snapshot2.restoreState(backend,
-					IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader());
+					IntSerializer.INSTANCE, StringSerializer.INSTANCE, null, getClass().getClassLoader(), 1);
 
 			assertEquals(3, restored2.size());
 			restored2.setCurrentKey(1);
@@ -243,7 +243,7 @@ public class MemoryStateBackendTest {
 
 			try {
 				snapshot.restoreState(backend, fakeIntSerializer,
-						StringSerializer.INSTANCE, null, getClass().getClassLoader());
+						StringSerializer.INSTANCE, null, getClass().getClassLoader(), 1);
 				fail("should recognize wrong serializers");
 			} catch (IllegalArgumentException e) {
 				// expected
@@ -253,7 +253,7 @@ public class MemoryStateBackendTest {
 
 			try {
 				snapshot.restoreState(backend, IntSerializer.INSTANCE,
-						fakeStringSerializer, null, getClass().getClassLoader());
+						fakeStringSerializer, null, getClass().getClassLoader(), 1);
 				fail("should recognize wrong serializers");
 			} catch (IllegalArgumentException e) {
 				// expected
@@ -263,7 +263,7 @@ public class MemoryStateBackendTest {
 
 			try {
 				snapshot.restoreState(backend, fakeIntSerializer,
-						fakeStringSerializer, null, getClass().getClassLoader());
+						fakeStringSerializer, null, getClass().getClassLoader(), 1);
 				fail("should recognize wrong serializers");
 			} catch (IllegalArgumentException e) {
 				// expected

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index fff6e70..85f8be5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -198,7 +198,7 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
-		public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
+		public void setInitialState(StateHandle<Serializable> stateHandle, long ts) throws Exception {
 
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 6c1f1ba..2122800 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.OperatorState;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.runtime.state.KvState;
 import org.apache.flink.runtime.state.KvStateSnapshot;
@@ -93,6 +94,8 @@ public abstract class AbstractStreamOperator<OUT>
 	private transient TypeSerializer<?> keySerializer;
 	
 	private transient HashMap<String, KvStateSnapshot<?, ?, ?>> keyValueStateSnapshots;
+
+	private long recoveryTimestamp;
 	
 	// ------------------------------------------------------------------------
 	//  Life Cycle
@@ -172,15 +175,23 @@ public abstract class AbstractStreamOperator<OUT>
 	}
 	
 	@Override
-	public void restoreState(StreamTaskState state) throws Exception {
+	public void restoreState(StreamTaskState state, long recoveryTimestamp) throws Exception {
 		// restore the key/value state. the actual restore happens lazily, when the function requests
 		// the state again, because the restore method needs information provided by the user function
 		keyValueStateSnapshots = state.getKvStates();
+		this.recoveryTimestamp = recoveryTimestamp;
 	}
 	
 	@Override
 	public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
-		// by default, nothing needs a notification of checkpoint completion
+		// We check whether the KvStates require notifications
+		if (keyValueStates != null) {
+			for (KvState<?, ?, ?> kvstate : keyValueStates) {
+				if (kvstate instanceof CheckpointNotifier) {
+					((CheckpointNotifier) kvstate).notifyCheckpointComplete(checkpointId);
+				}
+			}
+		}
 	}
 
 	// ------------------------------------------------------------------------
@@ -269,7 +280,7 @@ public abstract class AbstractStreamOperator<OUT>
 	 * @throws IllegalStateException Thrown, if the key/value state was already initialized.
 	 * @throws Exception Thrown, if the state backend cannot create the key/value state.
 	 */
-	@SuppressWarnings({"rawtypes", "unchecked"})
+	@SuppressWarnings("unchecked")
 	protected <K, V, Backend extends StateBackend<Backend>> OperatorState<V> createKeyValueState(
 			String name, TypeSerializer<V> valueSerializer, V defaultValue) throws Exception
 	{
@@ -304,19 +315,17 @@ public abstract class AbstractStreamOperator<OUT>
 			throw new RuntimeException();
 		}
 		
-		@SuppressWarnings("unchecked")
 		Backend stateBackend = (Backend) container.getStateBackend();
 
 		KvState<K, V, Backend> kvstate = null;
 		
 		// check whether we restore the key/value state from a snapshot, or create a new blank one
 		if (keyValueStateSnapshots != null) {
-			@SuppressWarnings("unchecked")
 			KvStateSnapshot<K, V, Backend> snapshot = (KvStateSnapshot<K, V, Backend>) keyValueStateSnapshots.remove(name);
 
 			if (snapshot != null) {
 				kvstate = snapshot.restoreState(
-						stateBackend, keySerializer, valueSerializer, defaultValue, getUserCodeClassloader());
+						stateBackend, keySerializer, valueSerializer, defaultValue, getUserCodeClassloader(), recoveryTimestamp);
 			}
 		}
 		

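For readers following the change above: notifyOfCompletedCheckpoint() now forwards the completion callback to every key/value state that opts in by implementing CheckpointNotifier. A minimal sketch of such an opt-in state follows; the class name and field are illustrative only and not part of this commit, and a real implementation would additionally implement the KvState interface methods.

import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;

public class NotifyingKvStateSketch implements CheckpointNotifier {

	// id of the last checkpoint whose global completion was signalled to this state
	private long lastCompletedCheckpointId = -1;

	@Override
	public void notifyCheckpointComplete(long checkpointId) {
		// an out-of-core state could, for example, compact or drop data written
		// for checkpoints older than the one that just completed
		lastCompletedCheckpointId = checkpointId;
	}
}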
http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 32be2ba..93c541a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -147,8 +147,8 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends
 	}
 
 	@Override
-	public void restoreState(StreamTaskState state) throws Exception {
-		super.restoreState(state);
+	public void restoreState(StreamTaskState state, long nextCpId) throws Exception {
+		super.restoreState(state, nextCpId);
 		
 		StateHandle<Serializable> stateHandle =  state.getFunctionState();
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index fac26f1..9da3c04 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -112,11 +112,13 @@ public interface StreamOperator<OUT> extends Serializable {
 	 *
 	 * @param state The state of operator that was snapshotted as part of checkpoint
 	 *              from which the execution is restored.
+	 * 
+	 * @param nextCpId Next checkpoint id
 	 *
 	 * @throws Exception Exceptions during state restore should be forwarded, so that the system can
 	 *                   properly react to failed state restore and fail the execution attempt.
 	 */
-	void restoreState(StreamTaskState state) throws Exception;
+	void restoreState(StreamTaskState state, long nextCpId) throws Exception;
 
 	/**
 	 * Called when the checkpoint with the given ID is completed and acknowledged on the JobManager.

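The interface change above threads the extra id through every operator's restoreState(). An operator built on AbstractStreamOperator only needs to pass it on to the superclass, which re-registers the key/value state snapshots together with the id. The sketch below is illustrative only (class name and field are not part of this commit):

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;

public abstract class RestoreAwareOperatorSketch<OUT> extends AbstractStreamOperator<OUT> {

	// remembered purely for illustration, e.g. for logging after a restore
	private long restoreId;

	@Override
	public void restoreState(StreamTaskState state, long nextCpId) throws Exception {
		// let the base class pick up the key/value snapshots and the id
		super.restoreState(state, nextCpId);
		this.restoreId = nextCpId;
	}
}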
http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index 90d3d82..2959d88 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -264,8 +264,8 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
 	}
 
 	@Override
-	public void restoreState(StreamTaskState taskState) throws Exception {
-		super.restoreState(taskState);
+	public void restoreState(StreamTaskState taskState, long nextCpId) throws Exception {
+		super.restoreState(taskState, nextCpId);
 
 		@SuppressWarnings("unchecked")
 		StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 5e4dea7..7fc3a35 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -536,8 +536,8 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void restoreState(StreamTaskState taskState) throws Exception {
-		super.restoreState(taskState);
+	public void restoreState(StreamTaskState taskState, long nextCpId) throws Exception {
+		super.restoreState(taskState, nextCpId);
 
 		final ClassLoader userClassloader = getUserCodeClassloader();
 		@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index f19e760..7bd6705 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -609,8 +609,8 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	@Override
-	public void restoreState(StreamTaskState taskState) throws Exception {
-		super.restoreState(taskState);
+	public void restoreState(StreamTaskState taskState, long nextCpId) throws Exception {
+		super.restoreState(taskState, nextCpId);
 
 		final ClassLoader userClassloader = getUserCodeClassloader();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index ed7182d..1de6ff9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -138,6 +139,8 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	/** Flag to mark the task "in operation", in which case check
 	 * needs to be initialized to true, so that early cancel() before invoke() behaves correctly */
 	private volatile boolean isRunning;
+
+	private long nextCpId;
 	
 
 	// ------------------------------------------------------------------------
@@ -382,8 +385,9 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	// ------------------------------------------------------------------------
 	
 	@Override
-	public void setInitialState(StreamTaskStateList initialState) {
+	public void setInitialState(StreamTaskStateList initialState, long nextCpId) {
 		lazyRestoreState = initialState;
+		this.nextCpId = nextCpId;
 	}
 	
 	public void restoreStateLazy() throws Exception {
@@ -403,7 +407,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 					
 					if (state != null && operator != null) {
 						LOG.debug("Task {} in chain ({}) has checkpointed state", i, getName());
-						operator.restoreState(state);
+						operator.restoreState(state, nextCpId);
 					}
 					else if (operator != null) {
 						LOG.debug("Task {} in chain ({}) does not have checkpointed state", i, getName());
@@ -464,6 +468,11 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			if (isRunning) {
 				LOG.debug("Notification of complete checkpoint for task {}", getName());
 				
+				// We first notify the state backend if necessary
+				if (stateBackend instanceof CheckpointNotifier) {
+					((CheckpointNotifier) stateBackend).notifyCheckpointComplete(checkpointId);
+				}
+				
 				for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
 					if (operator != null) {
 						operator.notifyOfCompletedCheckpoint(checkpointId);

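As the hunk above shows, StreamTask now notifies the state backend of a completed checkpoint before notifying the operators, but only if the backend implements CheckpointNotifier. A backend that wants the callback (for example to prune out-of-core state) can opt in along the following lines; this is a sketch assuming the usual StateBackend subclassing as in FsStateBackend, not the JDBC backend added by this commit:

import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;

public abstract class NotifyingStateBackendSketch extends StateBackend<NotifyingStateBackendSketch>
		implements CheckpointNotifier {

	@Override
	public void notifyCheckpointComplete(long checkpointId) {
		// no-op here; a real backend would discard state belonging to
		// checkpoints older than the one that just completed
	}
}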
http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
index 62eb268..63cbd6a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AccumulatingAlignedProcessingTimeWindowOperatorTest.java
@@ -595,7 +595,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 							windowSize, windowSize);
 
 			op.setup(mockTask, new StreamConfig(new Configuration()), out2);
-			op.restoreState(state);
+			op.restoreState(state, 1);
 			op.open();
 
 			// inject some more elements
@@ -694,7 +694,7 @@ public class AccumulatingAlignedProcessingTimeWindowOperatorTest {
 					windowSize, windowSlide);
 
 			op.setup(mockTask, new StreamConfig(new Configuration()), out2);
-			op.restoreState(state);
+			op.restoreState(state, 1);
 			op.open();
 			
 

http://git-wip-us.apache.org/repos/asf/flink/blob/75a7c4b1/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
index 4d507fb..55cd9fe 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/AggregatingAlignedProcessingTimeWindowOperatorTest.java
@@ -653,7 +653,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					windowSize, windowSize);
 
 			op.setup(mockTask, new StreamConfig(new Configuration()), out2);
-			op.restoreState(state);
+			op.restoreState(state, 1);
 			op.open();
 
 			// inject the remaining elements
@@ -759,7 +759,7 @@ public class AggregatingAlignedProcessingTimeWindowOperatorTest {
 					windowSize, windowSlide);
 
 			op.setup(mockTask, new StreamConfig(new Configuration()), out2);
-			op.restoreState(state);
+			op.restoreState(state, 1);
 			op.open();
 
 

