flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/3] flink git commit: [FLINK-6390] [checkpoints] Add API for checkpoints that are triggered via external systems
Date Wed, 26 Apr 2017 21:42:47 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
index 38130d4..3dd037e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.jobgraph.tasks;
 
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.StateBackend;
 
@@ -32,21 +33,21 @@ import static java.util.Objects.requireNonNull;
  * need to participate.
  */
 public class JobCheckpointingSettings implements java.io.Serializable {
-	
+
 	private static final long serialVersionUID = -2593319571078198180L;
-	
+
 	private final List<JobVertexID> verticesToTrigger;
 
 	private final List<JobVertexID> verticesToAcknowledge;
 
 	private final List<JobVertexID> verticesToConfirm;
-	
+
 	private final long checkpointInterval;
-	
+
 	private final long checkpointTimeout;
-	
+
 	private final long minPauseBetweenCheckpoints;
-	
+
 	private final int maxConcurrentCheckpoints;
 
 	/** Settings for externalized checkpoints. */
@@ -56,6 +57,9 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 	@Nullable
 	private final StateBackend defaultStateBackend;
 
+	/** (Factories for) hooks that are executed on the checkpoint coordinator */
+	private final MasterTriggerRestoreHook.Factory[] masterHooks;
+
 	/**
 	 * Flag indicating whether exactly once checkpoint mode has been configured.
 	 * If <code>false</code>, at least once mode has been configured. This is
@@ -77,12 +81,30 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 			@Nullable StateBackend defaultStateBackend,
 			boolean isExactlyOnce) {
 
+		this(verticesToTrigger, verticesToAcknowledge, verticesToConfirm,
+				checkpointInterval, checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints,
+				externalizedCheckpointSettings, defaultStateBackend, null, isExactlyOnce);
+	}
+
+	public JobCheckpointingSettings(
+			List<JobVertexID> verticesToTrigger,
+			List<JobVertexID> verticesToAcknowledge,
+			List<JobVertexID> verticesToConfirm,
+			long checkpointInterval,
+			long checkpointTimeout,
+			long minPauseBetweenCheckpoints,
+			int maxConcurrentCheckpoints,
+			ExternalizedCheckpointSettings externalizedCheckpointSettings,
+			@Nullable StateBackend defaultStateBackend,
+			@Nullable MasterTriggerRestoreHook.Factory[] masterHooks,
+			boolean isExactlyOnce) {
+
 		// sanity checks
 		if (checkpointInterval < 1 || checkpointTimeout < 1 ||
 				minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) {
 			throw new IllegalArgumentException();
 		}
-		
+
 		this.verticesToTrigger = requireNonNull(verticesToTrigger);
 		this.verticesToAcknowledge = requireNonNull(verticesToAcknowledge);
 		this.verticesToConfirm = requireNonNull(verticesToConfirm);
@@ -93,14 +115,16 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 		this.externalizedCheckpointSettings = requireNonNull(externalizedCheckpointSettings);
 		this.defaultStateBackend = defaultStateBackend;
 		this.isExactlyOnce = isExactlyOnce;
+
+		this.masterHooks = masterHooks != null ? masterHooks : new MasterTriggerRestoreHook.Factory[0];
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	public List<JobVertexID> getVerticesToTrigger() {
 		return verticesToTrigger;
 	}
-	
+
 	public List<JobVertexID> getVerticesToAcknowledge() {
 		return verticesToAcknowledge;
 	}
@@ -134,12 +158,16 @@ public class JobCheckpointingSettings implements java.io.Serializable {
 		return defaultStateBackend;
 	}
 
+	public MasterTriggerRestoreHook.Factory[] getMasterHooks() {
+		return masterHooks;
+	}
+
 	public boolean isExactlyOnce() {
 		return isExactlyOnce;
 	}
 
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return String.format("SnapshotSettings: interval=%d, timeout=%d, pause-between=%d, " +

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
new file mode 100644
index 0000000..0ec4606
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.mockExecutionVertex;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the user-defined hooks that the checkpoint coordinator can call.
+ */
+public class CheckpointCoordinatorMasterHooksTest {
+
+	// ------------------------------------------------------------------------
+	//  hook registration
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This method tests that hooks with the same identifier are not registered
+	 * multiple times.
+	 */
+	@Test
+	public void testDeduplicateOnRegister() {
+		final CheckpointCoordinator cc = instantiateCheckpointCoordinator(new JobID());
+
+		MasterTriggerRestoreHook<?> hook1 = mock(MasterTriggerRestoreHook.class);
+		when(hook1.getIdentifier()).thenReturn("test id");
+
+		MasterTriggerRestoreHook<?> hook2 = mock(MasterTriggerRestoreHook.class);
+		when(hook2.getIdentifier()).thenReturn("test id");
+
+		MasterTriggerRestoreHook<?> hook3 = mock(MasterTriggerRestoreHook.class);
+		when(hook3.getIdentifier()).thenReturn("anotherId");
+
+		assertTrue(cc.addMasterHook(hook1));
+		assertFalse(cc.addMasterHook(hook2));
+		assertTrue(cc.addMasterHook(hook3));
+	}
+
+	/**
+	 * Test that validates correct exceptions when supplying hooks with invalid IDs.
+	 */
+	@Test
+	public void testNullOrInvalidId() {
+		final CheckpointCoordinator cc = instantiateCheckpointCoordinator(new JobID());
+
+		try {
+			cc.addMasterHook(null);
+			fail("expected an exception");
+		} catch (NullPointerException ignored) {}
+
+		try {
+			cc.addMasterHook(mock(MasterTriggerRestoreHook.class));
+			fail("expected an exception");
+		} catch (IllegalArgumentException ignored) {}
+
+		try {
+			MasterTriggerRestoreHook<?> hook = mock(MasterTriggerRestoreHook.class);
+			when(hook.getIdentifier()).thenReturn("        ");
+
+			cc.addMasterHook(hook);
+			fail("expected an exception");
+		} catch (IllegalArgumentException ignored) {}
+	}
+
+	// ------------------------------------------------------------------------
+	//  trigger / restore behavior
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testHooksAreCalledOnTrigger() throws Exception {
+		final String id1 = "id1";
+		final String id2 = "id2";
+
+		final String state1 = "the-test-string-state";
+		final byte[] state1serialized = new StringSerializer().serialize(state1);
+
+		final long state2 = 987654321L;
+		final byte[] state2serialized = new LongSerializer().serialize(state2);
+
+		final MasterTriggerRestoreHook<String> statefulHook1 = mockGeneric(MasterTriggerRestoreHook.class);
+		when(statefulHook1.getIdentifier()).thenReturn(id1);
+		when(statefulHook1.createCheckpointDataSerializer()).thenReturn(new StringSerializer());
+		when(statefulHook1.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class)))
+				.thenReturn(FlinkCompletableFuture.completed(state1));
+
+		final MasterTriggerRestoreHook<Long> statefulHook2 = mockGeneric(MasterTriggerRestoreHook.class);
+		when(statefulHook2.getIdentifier()).thenReturn(id2);
+		when(statefulHook2.createCheckpointDataSerializer()).thenReturn(new LongSerializer());
+		when(statefulHook2.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class)))
+				.thenReturn(FlinkCompletableFuture.completed(state2));
+
+		final MasterTriggerRestoreHook<Void> statelessHook = mockGeneric(MasterTriggerRestoreHook.class);
+		when(statelessHook.getIdentifier()).thenReturn("some-id");
+
+		// create the checkpoint coordinator
+		final JobID jid = new JobID();
+		final ExecutionAttemptID execId = new ExecutionAttemptID();
+		final ExecutionVertex ackVertex = mockExecutionVertex(execId);
+		final CheckpointCoordinator cc = instantiateCheckpointCoordinator(jid, ackVertex);
+
+		cc.addMasterHook(statefulHook1);
+		cc.addMasterHook(statelessHook);
+		cc.addMasterHook(statefulHook2);
+
+		// trigger a checkpoint
+		assertTrue(cc.triggerCheckpoint(System.currentTimeMillis(), false));
+		assertEquals(1, cc.getNumberOfPendingCheckpoints());
+
+		verify(statefulHook1, times(1)).triggerCheckpoint(anyLong(), anyLong(), any(Executor.class));
+		verify(statefulHook2, times(1)).triggerCheckpoint(anyLong(), anyLong(), any(Executor.class));
+		verify(statelessHook, times(1)).triggerCheckpoint(anyLong(), anyLong(), any(Executor.class));
+
+		final long checkpointId = cc.getPendingCheckpoints().values().iterator().next().getCheckpointId();
+		cc.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, execId, checkpointId));
+		assertEquals(0, cc.getNumberOfPendingCheckpoints());
+
+		assertEquals(1, cc.getNumberOfRetainedSuccessfulCheckpoints());
+		final CompletedCheckpoint chk = cc.getCheckpointStore().getLatestCheckpoint();
+
+		final Collection<MasterState> masterStates = chk.getMasterHookStates();
+		assertEquals(2, masterStates.size());
+
+		for (MasterState ms : masterStates) {
+			if (ms.name().equals(id1)) {
+				assertArrayEquals(state1serialized, ms.bytes());
+				assertEquals(StringSerializer.VERSION, ms.version());
+			}
+			else if (ms.name().equals(id2)) {
+				assertArrayEquals(state2serialized, ms.bytes());
+				assertEquals(LongSerializer.VERSION, ms.version());
+			}
+			else {
+				fail("unrecognized state name: " + ms.name());
+			}
+		}
+	}
+
+	@Test
+	public void testHooksAreCalledOnRestore() throws Exception {
+		final String id1 = "id1";
+		final String id2 = "id2";
+
+		final String state1 = "the-test-string-state";
+		final byte[] state1serialized = new StringSerializer().serialize(state1);
+
+		final long state2 = 987654321L;
+		final byte[] state2serialized = new LongSerializer().serialize(state2);
+
+		final List<MasterState> masterHookStates = Arrays.asList(
+				new MasterState(id1, state1serialized, StringSerializer.VERSION),
+				new MasterState(id2, state2serialized, LongSerializer.VERSION));
+
+		final MasterTriggerRestoreHook<String> statefulHook1 = mockGeneric(MasterTriggerRestoreHook.class);
+		when(statefulHook1.getIdentifier()).thenReturn(id1);
+		when(statefulHook1.createCheckpointDataSerializer()).thenReturn(new StringSerializer());
+		when(statefulHook1.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class)))
+				.thenThrow(new Exception("not expected"));
+
+		final MasterTriggerRestoreHook<Long> statefulHook2 = mockGeneric(MasterTriggerRestoreHook.class);
+		when(statefulHook2.getIdentifier()).thenReturn(id2);
+		when(statefulHook2.createCheckpointDataSerializer()).thenReturn(new LongSerializer());
+		when(statefulHook2.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class)))
+				.thenThrow(new Exception("not expected"));
+
+		final MasterTriggerRestoreHook<Void> statelessHook = mockGeneric(MasterTriggerRestoreHook.class);
+		when(statelessHook.getIdentifier()).thenReturn("some-id");
+
+		final JobID jid = new JobID();
+		final long checkpointId = 13L;
+
+		final CompletedCheckpoint checkpoint = new CompletedCheckpoint(
+				jid, checkpointId, 123L, 125L,
+				Collections.<JobVertexID, TaskState>emptyMap(),
+				masterHookStates,
+				CheckpointProperties.forStandardCheckpoint(),
+				null,
+				null);
+
+		final ExecutionAttemptID execId = new ExecutionAttemptID();
+		final ExecutionVertex ackVertex = mockExecutionVertex(execId);
+		final CheckpointCoordinator cc = instantiateCheckpointCoordinator(jid, ackVertex);
+
+		cc.addMasterHook(statefulHook1);
+		cc.addMasterHook(statelessHook);
+		cc.addMasterHook(statefulHook2);
+
+		cc.getCheckpointStore().addCheckpoint(checkpoint);
+		cc.restoreLatestCheckpointedState(
+				Collections.<JobVertexID, ExecutionJobVertex>emptyMap(),
+				true,
+				false);
+
+		verify(statefulHook1, times(1)).restoreCheckpoint(eq(checkpointId), eq(state1));
+		verify(statefulHook2, times(1)).restoreCheckpoint(eq(checkpointId), eq(state2));
+		verify(statelessHook, times(1)).restoreCheckpoint(eq(checkpointId), isNull(Void.class));
+	}
+
+	@Test
+	public void checkUnMatchedStateOnRestore() throws Exception {
+		final String id1 = "id1";
+		final String id2 = "id2";
+
+		final String state1 = "the-test-string-state";
+		final byte[] state1serialized = new StringSerializer().serialize(state1);
+
+		final long state2 = 987654321L;
+		final byte[] state2serialized = new LongSerializer().serialize(state2);
+
+		final List<MasterState> masterHookStates = Arrays.asList(
+				new MasterState(id1, state1serialized, StringSerializer.VERSION),
+				new MasterState(id2, state2serialized, LongSerializer.VERSION));
+
+		final MasterTriggerRestoreHook<String> statefulHook = mockGeneric(MasterTriggerRestoreHook.class);
+		when(statefulHook.getIdentifier()).thenReturn(id1);
+		when(statefulHook.createCheckpointDataSerializer()).thenReturn(new StringSerializer());
+		when(statefulHook.triggerCheckpoint(anyLong(), anyLong(), any(Executor.class)))
+				.thenThrow(new Exception("not expected"));
+
+		final MasterTriggerRestoreHook<Void> statelessHook = mockGeneric(MasterTriggerRestoreHook.class);
+		when(statelessHook.getIdentifier()).thenReturn("some-id");
+
+		final JobID jid = new JobID();
+		final long checkpointId = 44L;
+
+		final CompletedCheckpoint checkpoint = new CompletedCheckpoint(
+				jid, checkpointId, 123L, 125L,
+				Collections.<JobVertexID, TaskState>emptyMap(),
+				masterHookStates,
+				CheckpointProperties.forStandardCheckpoint(),
+				null,
+				null);
+
+		final ExecutionAttemptID execId = new ExecutionAttemptID();
+		final ExecutionVertex ackVertex = mockExecutionVertex(execId);
+		final CheckpointCoordinator cc = instantiateCheckpointCoordinator(jid, ackVertex);
+
+		cc.addMasterHook(statefulHook);
+		cc.addMasterHook(statelessHook);
+
+		cc.getCheckpointStore().addCheckpoint(checkpoint);
+
+		// since we have unmatched state, this should fail
+		try {
+			cc.restoreLatestCheckpointedState(
+					Collections.<JobVertexID, ExecutionJobVertex>emptyMap(),
+					true,
+					false);
+			fail("exception expected");
+		}
+		catch (IllegalStateException ignored) {}
+
+		// permitting unmatched state should succeed
+		cc.restoreLatestCheckpointedState(
+				Collections.<JobVertexID, ExecutionJobVertex>emptyMap(),
+				true,
+				true);
+
+		verify(statefulHook, times(1)).restoreCheckpoint(eq(checkpointId), eq(state1));
+		verify(statelessHook, times(1)).restoreCheckpoint(eq(checkpointId), isNull(Void.class));
+	}
+
+	// ------------------------------------------------------------------------
+	//  failure scenarios
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testSerializationFailsOnTrigger() {
+	}
+
+	@Test
+	public void testHookCallFailsOnTrigger() {
+	}
+
+	@Test
+	public void testDeserializationFailsOnRestore() {
+	}
+
+	@Test
+	public void testHookCallFailsOnRestore() {
+	}
+
+	@Test
+	public void testTypeIncompatibleWithSerializerOnStore() {
+	}
+
+	@Test
+	public void testTypeIncompatibleWithHookOnRestore() {
+	}
+
+	// ------------------------------------------------------------------------
+	//  utilities
+	// ------------------------------------------------------------------------
+
+	private static CheckpointCoordinator instantiateCheckpointCoordinator(JobID jid, ExecutionVertex... ackVertices) {
+		return new CheckpointCoordinator(
+				jid,
+				10000000L,
+				600000L,
+				0L,
+				1,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[0],
+				ackVertices,
+				new ExecutionVertex[0],
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(10),
+				null,
+				Executors.directExecutor());
+	}
+
+	private static <T> T mockGeneric(Class<?> clazz) {
+		@SuppressWarnings("unchecked")
+		Class<T> typedClass = (Class<T>) clazz;
+		return mock(typedClass);
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class StringSerializer implements SimpleVersionedSerializer<String> {
+
+		static final int VERSION = 77;
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		@Override
+		public byte[] serialize(String checkpointData) throws IOException {
+			return checkpointData.getBytes(StandardCharsets.UTF_8);
+		}
+
+		@Override
+		public String deserialize(int version, byte[] serialized) throws IOException {
+			if (version != VERSION) {
+				throw new IOException("version mismatch");
+			}
+			return new String(serialized, StandardCharsets.UTF_8);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final class LongSerializer implements SimpleVersionedSerializer<Long> {
+
+		static final int VERSION = 5;
+
+		@Override
+		public int getVersion() {
+			return VERSION;
+		}
+
+		@Override
+		public byte[] serialize(Long checkpointData) throws IOException {
+			final byte[] bytes = new byte[8];
+			ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN).putLong(0, checkpointData);
+			return bytes;
+		}
+
+		@Override
+		public Long deserialize(int version, byte[] serialized) throws IOException {
+			assertEquals(VERSION, version);
+			assertEquals(8, serialized.length);
+
+			return ByteBuffer.wrap(serialized).order(ByteOrder.LITTLE_ENDIAN).getLong(0);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 4a36dd2..fc6e516 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -273,7 +273,7 @@ public abstract class CompletedCheckpointStoreTest extends TestLogger {
 			Map<JobVertexID, TaskState> taskGroupStates,
 			CheckpointProperties props) {
 
-			super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates, props);
+			super(jobId, checkpointId, timestamp, Long.MAX_VALUE, taskGroupStates, null, props);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 0b759d4..652cc76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -45,7 +45,7 @@ import static org.mockito.Mockito.verify;
 public class CompletedCheckpointTest {
 
 	@Rule
-	public TemporaryFolder tmpFolder = new TemporaryFolder();
+	public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
 	/**
 	 * Tests that persistent checkpoints discard their header file.
@@ -61,7 +61,10 @@ public class CompletedCheckpointTest {
 
 		// Verify discard call is forwarded to state
 		CompletedCheckpoint checkpoint = new CompletedCheckpoint(
-				new JobID(), 0, 0, 1, taskStates, CheckpointProperties.forStandardCheckpoint(),
+				new JobID(), 0, 0, 1,
+				taskStates,
+				Collections.<MasterState>emptyList(),
+				CheckpointProperties.forStandardCheckpoint(),
 				new FileStateHandle(new Path(file.toURI()), file.length()),
 				file.getAbsolutePath());
 
@@ -81,8 +84,12 @@ public class CompletedCheckpointTest {
 
 		boolean discardSubsumed = true;
 		CheckpointProperties props = new CheckpointProperties(false, false, discardSubsumed, true, true, true, true);
+		
 		CompletedCheckpoint checkpoint = new CompletedCheckpoint(
-				new JobID(), 0, 0, 1, taskStates, props);
+				new JobID(), 0, 0, 1,
+				taskStates,
+				Collections.<MasterState>emptyList(),
+				props);
 
 		SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 		checkpoint.registerSharedStates(sharedStateRegistry);
@@ -117,7 +124,10 @@ public class CompletedCheckpointTest {
 			// Keep
 			CheckpointProperties props = new CheckpointProperties(false, true, false, false, false, false, false);
 			CompletedCheckpoint checkpoint = new CompletedCheckpoint(
-					new JobID(), 0, 0, 1, new HashMap<>(taskStates), props,
+					new JobID(), 0, 0, 1,
+					new HashMap<>(taskStates),
+					Collections.<MasterState>emptyList(),
+					props,
 					new FileStateHandle(new Path(file.toURI()), file.length()),
 					externalPath);
 
@@ -132,7 +142,10 @@ public class CompletedCheckpointTest {
 			// Discard
 			props = new CheckpointProperties(false, false, true, true, true, true, true);
 			checkpoint = new CompletedCheckpoint(
-					new JobID(), 0, 0, 1, new HashMap<>(taskStates), props);
+					new JobID(), 0, 0, 1,
+					new HashMap<>(taskStates),
+					Collections.<MasterState>emptyList(),
+					props);
 
 			checkpoint.discardOnShutdown(status, sharedStateRegistry);
 			verify(state, times(1)).discardState();
@@ -155,6 +168,7 @@ public class CompletedCheckpointTest {
 			0,
 			1,
 			new HashMap<>(taskStates),
+			Collections.<MasterState>emptyList(),
 			CheckpointProperties.forStandardCheckpoint());
 
 		CompletedCheckpointStats.DiscardCallback callback = mock(CompletedCheckpointStats.DiscardCallback.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
index 5fce62e..1f038bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java
@@ -105,6 +105,7 @@ public class ExecutionGraphCheckpointCoordinatorTest {
 				Collections.<ExecutionJobVertex>emptyList(),
 				Collections.<ExecutionJobVertex>emptyList(),
 				Collections.<ExecutionJobVertex>emptyList(),
+				Collections.<MasterTriggerRestoreHook<?>>emptyList(),
 				counter,
 				store,
 				null,

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
new file mode 100644
index 0000000..7d9874e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle.StateMetaInfo;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
+import org.apache.flink.util.StringUtils;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * A collection of utility methods for testing the (de)serialization of
+ * checkpoint metadata for persistence.
+ */
+public class CheckpointTestUtils {
+
+	/**
+	 * Creates a random collection of TaskState objects containing various types of state handles.
+	 */
+	public static Collection<TaskState> createTaskStates(int numTaskStates, int numSubtasksPerTask) {
+		return createTaskStates(new Random(), numTaskStates, numSubtasksPerTask);
+	}
+
+	/**
+	 * Creates a random collection of TaskState objects containing various types of state handles.
+	 */
+	public static Collection<TaskState> createTaskStates(
+			Random random,
+			int numTaskStates,
+			int numSubtasksPerTask) {
+
+		List<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+		for (int stateIdx = 0; stateIdx < numTaskStates; ++stateIdx) {
+
+			int chainLength = 1 + random.nextInt(8);
+
+			TaskState taskState = new TaskState(new JobVertexID(), numSubtasksPerTask, 128, chainLength);
+
+			int noNonPartitionableStateAtIndex = random.nextInt(chainLength);
+			int noOperatorStateBackendAtIndex = random.nextInt(chainLength);
+			int noOperatorStateStreamAtIndex = random.nextInt(chainLength);
+
+			boolean hasKeyedBackend = random.nextInt(4) != 0;
+			boolean hasKeyedStream = random.nextInt(4) != 0;
+
+			for (int subtaskIdx = 0; subtaskIdx < numSubtasksPerTask; subtaskIdx++) {
+
+				List<StreamStateHandle> nonPartitionableStates = new ArrayList<>(chainLength);
+				List<OperatorStateHandle> operatorStatesBackend = new ArrayList<>(chainLength);
+				List<OperatorStateHandle> operatorStatesStream = new ArrayList<>(chainLength);
+
+				for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) {
+
+					StreamStateHandle nonPartitionableState =
+							new TestByteStreamStateHandleDeepCompare("a-" + chainIdx, ("Hi-" + chainIdx).getBytes(
+									ConfigConstants.DEFAULT_CHARSET));
+					StreamStateHandle operatorStateBackend =
+							new TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET));
+					StreamStateHandle operatorStateStream =
+							new TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET));
+					Map<String, StateMetaInfo> offsetsMap = new HashMap<>();
+					offsetsMap.put("A", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10, 20}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+					offsetsMap.put("B", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+					offsetsMap.put("C", new OperatorStateHandle.StateMetaInfo(new long[]{60, 70, 80}, OperatorStateHandle.Mode.BROADCAST));
+
+					if (chainIdx != noNonPartitionableStateAtIndex) {
+						nonPartitionableStates.add(nonPartitionableState);
+					}
+
+					if (chainIdx != noOperatorStateBackendAtIndex) {
+						OperatorStateHandle operatorStateHandleBackend =
+								new OperatorStateHandle(offsetsMap, operatorStateBackend);
+						operatorStatesBackend.add(operatorStateHandleBackend);
+					}
+
+					if (chainIdx != noOperatorStateStreamAtIndex) {
+						OperatorStateHandle operatorStateHandleStream =
+								new OperatorStateHandle(offsetsMap, operatorStateStream);
+						operatorStatesStream.add(operatorStateHandleStream);
+					}
+				}
+
+				KeyGroupsStateHandle keyedStateBackend = null;
+				KeyGroupsStateHandle keyedStateStream = null;
+
+				if (hasKeyedBackend) {
+					keyedStateBackend = new KeyGroupsStateHandle(
+							new KeyGroupRangeOffsets(1, 1, new long[]{42}),
+							new TestByteStreamStateHandleDeepCompare("c", "Hello"
+									.getBytes(ConfigConstants.DEFAULT_CHARSET)));
+				}
+
+				if (hasKeyedStream) {
+					keyedStateStream = new KeyGroupsStateHandle(
+							new KeyGroupRangeOffsets(1, 1, new long[]{23}),
+							new TestByteStreamStateHandleDeepCompare("d", "World"
+									.getBytes(ConfigConstants.DEFAULT_CHARSET)));
+				}
+
+				taskState.putState(subtaskIdx, new SubtaskState(
+						new ChainedStateHandle<>(nonPartitionableStates),
+						new ChainedStateHandle<>(operatorStatesBackend),
+						new ChainedStateHandle<>(operatorStatesStream),
+						keyedStateStream,
+						keyedStateBackend));
+			}
+
+			taskStates.add(taskState);
+		}
+
+		return taskStates;
+	}
+
+	/**
+	 * Creates a bunch of random master states.
+	 */
+	public static Collection<MasterState> createRandomMasterStates(Random random, int num) {
+		final ArrayList<MasterState> states = new ArrayList<>(num);
+
+		for (int i = 0; i < num; i++) {
+			int version = random.nextInt(10);
+			String name = StringUtils.getRandomString(random, 5, 500);
+			byte[] bytes = new byte[random.nextInt(5000) + 1];
+			random.nextBytes(bytes);
+
+			states.add(new MasterState(name, bytes, version));
+		}
+
+		return states;
+	}
+
+	/**
+	 * Asserts that two MasterStates are equal.
+	 * 
+	 * <p>The MasterState avoids overriding {@code equals()} on purpose, because equality is not well
+	 * defined in the raw contents.
+	 */
+	public static void assertMasterStateEquality(MasterState a, MasterState b) {
+		assertEquals(a.version(), b.version());
+		assertEquals(a.name(), b.name());
+		assertArrayEquals(a.bytes(), b.bytes());
+
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** utility class, not meant to be instantiated */
+	private CheckpointTestUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index c66b29d..20b1e57 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -67,7 +67,7 @@ public class SavepointLoaderTest {
 		JobID jobId = new JobID();
 
 		// Store savepoint
-		SavepointV1 savepoint = new SavepointV1(checkpointId, taskStates.values());
+		SavepointV2 savepoint = new SavepointV2(checkpointId, taskStates.values());
 		String path = SavepointStore.storeSavepoint(tmp.getAbsolutePath(), savepoint);
 
 		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
index 1eb8055..cf79282 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.MasterState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.junit.Rule;
@@ -68,7 +69,7 @@ public class SavepointStoreTest {
 
 		// Store
 		String savepointDirectory = SavepointStore.createSavepointDirectory(root, new JobID());
-		SavepointV1 stored = new SavepointV1(1929292, SavepointV1Test.createTaskStates(4, 24));
+		SavepointV2 stored = new SavepointV2(1929292, CheckpointTestUtils.createTaskStates(4, 24));
 		String path = SavepointStore.storeSavepoint(savepointDirectory, stored);
 
 		list = rootFile.listFiles();
@@ -77,7 +78,10 @@ public class SavepointStoreTest {
 
 		// Load
 		Savepoint loaded = SavepointStore.loadSavepoint(path, Thread.currentThread().getContextClassLoader());
-		assertEquals(stored, loaded);
+
+		assertEquals(stored.getCheckpointId(), loaded.getCheckpointId());
+		assertEquals(stored.getTaskStates(), loaded.getTaskStates());
+		assertEquals(stored.getMasterStates(), loaded.getMasterStates());
 
 		loaded.dispose();
 
@@ -126,8 +130,8 @@ public class SavepointStoreTest {
 		File rootFile = new File(root);
 
 		// New savepoint type for test
-		int version = ThreadLocalRandom.current().nextInt();
-		long checkpointId = ThreadLocalRandom.current().nextLong();
+		int version = ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE); // exclusive bound keeps this non-negative
+		long checkpointId = ThreadLocalRandom.current().nextLong(Long.MAX_VALUE); // exclusive bound keeps this non-negative
 
 		// Add serializer
 		serializers.put(version, NewSavepointSerializer.INSTANCE);
@@ -143,7 +147,7 @@ public class SavepointStoreTest {
 
 		// Savepoint v0
 		String savepointDirectory2 = SavepointStore.createSavepointDirectory(root, new JobID());
-		Savepoint savepoint = new SavepointV1(checkpointId, SavepointV1Test.createTaskStates(4, 32));
+		SavepointV2 savepoint = new SavepointV2(checkpointId, CheckpointTestUtils.createTaskStates(4, 32));
 		String pathSavepoint = SavepointStore.storeSavepoint(savepointDirectory2, savepoint);
 
 		list = rootFile.listFiles();
@@ -156,7 +160,9 @@ public class SavepointStoreTest {
 		assertEquals(newSavepoint, loaded);
 
 		loaded = SavepointStore.loadSavepoint(pathSavepoint, Thread.currentThread().getContextClassLoader());
-		assertEquals(savepoint, loaded);
+		assertEquals(savepoint.getCheckpointId(), loaded.getCheckpointId());
+		assertEquals(savepoint.getTaskStates(), loaded.getTaskStates());
+		assertEquals(savepoint.getMasterStates(), loaded.getMasterStates());
 	}
 
 	/**
@@ -199,7 +205,7 @@ public class SavepointStoreTest {
 		FileSystem fs = FileSystem.get(new Path(root).toUri());
 
 		// Store
-		SavepointV1 savepoint = new SavepointV1(1929292, SavepointV1Test.createTaskStates(4, 24));
+		SavepointV2 savepoint = new SavepointV2(1929292, CheckpointTestUtils.createTaskStates(4, 24));
 
 		FileStateHandle store1 = SavepointStore.storeExternalizedCheckpointToHandle(root, savepoint);
 		fs.exists(store1.getFilePath());
@@ -251,7 +257,12 @@ public class SavepointStoreTest {
 
 		@Override
 		public Collection<TaskState> getTaskStates() {
-			return Collections.EMPTY_LIST;
+			return Collections.emptyList();
+		}
+
+		@Override
+		public Collection<MasterState> getMasterStates() {
+			return Collections.emptyList();
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
index 58cf1aa..0eff7bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
@@ -27,6 +27,7 @@ import java.io.ByteArrayInputStream;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class SavepointV1SerializerTest {
 
@@ -35,25 +36,29 @@ public class SavepointV1SerializerTest {
 	 */
 	@Test
 	public void testSerializeDeserializeV1() throws Exception {
-		Random r = new Random(42);
-		for (int i = 0; i < 100; ++i) {
+		final Random r = new Random(42);
+
+		for (int i = 0; i < 50; ++i) {
 			SavepointV1 expected =
-					new SavepointV1(i+ 123123, SavepointV1Test.createTaskStates(1 + r.nextInt(64), 1 + r.nextInt(64)));
+					new SavepointV1(i+ 123123, CheckpointTestUtils.createTaskStates(r, 1 + r.nextInt(64), 1 + r.nextInt(64)));
 
 			SavepointV1Serializer serializer = SavepointV1Serializer.INSTANCE;
 
 			// Serialize
 			ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
-			serializer.serialize(expected, new DataOutputViewStreamWrapper(baos));
+			serializer.serializeOld(expected, new DataOutputViewStreamWrapper(baos));
 			byte[] bytes = baos.toByteArray();
 
 			// Deserialize
 			ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-			Savepoint actual = serializer.deserialize(
+			SavepointV2 actual = serializer.deserialize(
 					new DataInputViewStreamWrapper(bais),
 					Thread.currentThread().getContextClassLoader());
 
-			assertEquals(expected, actual);
+
+			assertEquals(expected.getCheckpointId(), actual.getCheckpointId());
+			assertEquals(expected.getTaskStates(), actual.getTaskStates());
+			assertTrue(actual.getMasterStates().isEmpty());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
deleted file mode 100644
index 08ec35e..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
-import org.apache.flink.runtime.state.KeyGroupsStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class SavepointV1Test {
-
-	/**
-	 * Simple test of savepoint methods.
-	 */
-	@Test
-	public void testSavepointV1() throws Exception {
-		long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
-		int numTaskStates = 4;
-		int numSubtaskStates = 16;
-
-		Collection<TaskState> expected = createTaskStates(numTaskStates, numSubtaskStates);
-
-		SavepointV1 savepoint = new SavepointV1(checkpointId, expected);
-
-		assertEquals(SavepointV1.VERSION, savepoint.getVersion());
-		assertEquals(checkpointId, savepoint.getCheckpointId());
-		assertEquals(expected, savepoint.getTaskStates());
-
-		assertFalse(savepoint.getTaskStates().isEmpty());
-		savepoint.dispose();
-		assertTrue(savepoint.getTaskStates().isEmpty());
-	}
-
-	static Collection<TaskState> createTaskStates(int numTaskStates, int numSubtasksPerTask) throws IOException {
-
-		Random random = new Random(numTaskStates * 31 + numSubtasksPerTask);
-
-		List<TaskState> taskStates = new ArrayList<>(numTaskStates);
-
-		for (int stateIdx = 0; stateIdx < numTaskStates; ++stateIdx) {
-
-			int chainLength = 1 + random.nextInt(8);
-
-			TaskState taskState = new TaskState(new JobVertexID(), numSubtasksPerTask, 128, chainLength);
-
-			int noNonPartitionableStateAtIndex = random.nextInt(chainLength);
-			int noOperatorStateBackendAtIndex = random.nextInt(chainLength);
-			int noOperatorStateStreamAtIndex = random.nextInt(chainLength);
-
-			boolean hasKeyedBackend = random.nextInt(4) != 0;
-			boolean hasKeyedStream = random.nextInt(4) != 0;
-
-			for (int subtaskIdx = 0; subtaskIdx < numSubtasksPerTask; subtaskIdx++) {
-
-				List<StreamStateHandle> nonPartitionableStates = new ArrayList<>(chainLength);
-				List<OperatorStateHandle> operatorStatesBackend = new ArrayList<>(chainLength);
-				List<OperatorStateHandle> operatorStatesStream = new ArrayList<>(chainLength);
-
-				for (int chainIdx = 0; chainIdx < chainLength; ++chainIdx) {
-
-					StreamStateHandle nonPartitionableState =
-							new TestByteStreamStateHandleDeepCompare("a-" + chainIdx, ("Hi-" + chainIdx).getBytes(
-								ConfigConstants.DEFAULT_CHARSET));
-					StreamStateHandle operatorStateBackend =
-							new TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET));
-					StreamStateHandle operatorStateStream =
-							new TestByteStreamStateHandleDeepCompare("b-" + chainIdx, ("Beautiful-" + chainIdx).getBytes(ConfigConstants.DEFAULT_CHARSET));
-					Map<String, OperatorStateHandle.StateMetaInfo> offsetsMap = new HashMap<>();
-					offsetsMap.put("A", new OperatorStateHandle.StateMetaInfo(new long[]{0, 10, 20}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
-					offsetsMap.put("B", new OperatorStateHandle.StateMetaInfo(new long[]{30, 40, 50}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
-					offsetsMap.put("C", new OperatorStateHandle.StateMetaInfo(new long[]{60, 70, 80}, OperatorStateHandle.Mode.BROADCAST));
-
-					if (chainIdx != noNonPartitionableStateAtIndex) {
-						nonPartitionableStates.add(nonPartitionableState);
-					}
-
-					if (chainIdx != noOperatorStateBackendAtIndex) {
-						OperatorStateHandle operatorStateHandleBackend =
-								new OperatorStateHandle(offsetsMap, operatorStateBackend);
-						operatorStatesBackend.add(operatorStateHandleBackend);
-					}
-
-					if (chainIdx != noOperatorStateStreamAtIndex) {
-						OperatorStateHandle operatorStateHandleStream =
-								new OperatorStateHandle(offsetsMap, operatorStateStream);
-						operatorStatesStream.add(operatorStateHandleStream);
-					}
-				}
-
-				KeyGroupsStateHandle keyedStateBackend = null;
-				KeyGroupsStateHandle keyedStateStream = null;
-
-				if (hasKeyedBackend) {
-					keyedStateBackend = new KeyGroupsStateHandle(
-							new KeyGroupRangeOffsets(1, 1, new long[]{42}),
-							new TestByteStreamStateHandleDeepCompare("c", "Hello"
-								.getBytes(ConfigConstants.DEFAULT_CHARSET)));
-				}
-
-				if (hasKeyedStream) {
-					keyedStateStream = new KeyGroupsStateHandle(
-							new KeyGroupRangeOffsets(1, 1, new long[]{23}),
-							new TestByteStreamStateHandleDeepCompare("d", "World"
-								.getBytes(ConfigConstants.DEFAULT_CHARSET)));
-				}
-
-				taskState.putState(subtaskIdx, new SubtaskState(
-						new ChainedStateHandle<>(nonPartitionableStates),
-						new ChainedStateHandle<>(operatorStatesBackend),
-						new ChainedStateHandle<>(operatorStatesStream),
-						keyedStateStream,
-						keyedStateBackend));
-			}
-
-			taskStates.add(taskState);
-		}
-
-		return taskStates;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
new file mode 100644
index 0000000..deb14dd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2SerializerTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.core.memory.ByteArrayInputStreamWithPos;
+import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Various tests for the version 2 format serializer of a checkpoint. 
+ */
+public class SavepointV2SerializerTest {
+
+	@Test
+	public void testCheckpointWithNoState() throws Exception {
+		final Random rnd = new Random();
+
+		for (int i = 0; i < 100; ++i) {
+			final long checkpointId = rnd.nextLong() & 0x7fffffffffffffffL;
+			final Collection<TaskState> taskStates = Collections.emptyList();
+			final Collection<MasterState> masterStates = Collections.emptyList();
+
+			testCheckpointSerialization(checkpointId, taskStates, masterStates);
+		}
+	}
+
+	@Test
+	public void testCheckpointWithOnlyMasterState() throws Exception {
+		final Random rnd = new Random();
+		final int maxNumMasterStates = 5;
+
+		for (int i = 0; i < 100; ++i) {
+			final long checkpointId = rnd.nextLong() & 0x7fffffffffffffffL;
+
+			final Collection<TaskState> taskStates = Collections.emptyList();
+
+			final int numMasterStates = rnd.nextInt(maxNumMasterStates) + 1;
+			final Collection<MasterState> masterStates = 
+					CheckpointTestUtils.createRandomMasterStates(rnd, numMasterStates);
+
+			testCheckpointSerialization(checkpointId, taskStates, masterStates);
+		}
+	}
+
+	@Test
+	public void testCheckpointWithOnlyTaskState() throws Exception {
+		final Random rnd = new Random();
+		final int maxTaskStates = 20;
+		final int maxNumSubtasks = 20;
+
+		for (int i = 0; i < 100; ++i) {
+			final long checkpointId = rnd.nextLong() & 0x7fffffffffffffffL;
+
+			final int numTasks = rnd.nextInt(maxTaskStates) + 1;
+			final int numSubtasks = rnd.nextInt(maxNumSubtasks) + 1;
+			final Collection<TaskState> taskStates = 
+					CheckpointTestUtils.createTaskStates(rnd, numTasks, numSubtasks);
+
+			final Collection<MasterState> masterStates = Collections.emptyList();
+
+			testCheckpointSerialization(checkpointId, taskStates, masterStates);
+		}
+	}
+
+	@Test
+	public void testCheckpointWithMasterAndTaskState() throws Exception {
+		final Random rnd = new Random();
+
+		final int maxNumMasterStates = 5;
+		final int maxTaskStates = 20;
+		final int maxNumSubtasks = 20;
+
+		for (int i = 0; i < 100; ++i) {
+			final long checkpointId = rnd.nextLong() & 0x7fffffffffffffffL;
+
+			final int numTasks = rnd.nextInt(maxTaskStates) + 1;
+			final int numSubtasks = rnd.nextInt(maxNumSubtasks) + 1;
+			final Collection<TaskState> taskStates =
+					CheckpointTestUtils.createTaskStates(rnd, numTasks, numSubtasks);
+
+			final int numMasterStates = rnd.nextInt(maxNumMasterStates) + 1;
+			final Collection<MasterState> masterStates =
+					CheckpointTestUtils.createRandomMasterStates(rnd, numMasterStates);
+
+			testCheckpointSerialization(checkpointId, taskStates, masterStates);
+		}
+	}
+
+	private void testCheckpointSerialization(
+			long checkpointId,
+			Collection<TaskState> taskStates,
+			Collection<MasterState> masterStates) throws IOException {
+
+		SavepointV2Serializer serializer = SavepointV2Serializer.INSTANCE;
+
+		ByteArrayOutputStreamWithPos baos = new ByteArrayOutputStreamWithPos();
+		DataOutputStream out = new DataOutputViewStreamWrapper(baos);
+
+		serializer.serialize(new SavepointV2(checkpointId, taskStates, masterStates), out);
+		out.close();
+
+		byte[] bytes = baos.toByteArray();
+
+		DataInputStream in = new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(bytes));
+		SavepointV2 deserialized = serializer.deserialize(in, getClass().getClassLoader());
+
+		assertEquals(checkpointId, deserialized.getCheckpointId());
+		assertEquals(taskStates, deserialized.getTaskStates());
+
+		assertEquals(masterStates.size(), deserialized.getMasterStates().size());
+		for (Iterator<MasterState> a = masterStates.iterator(), b = deserialized.getMasterStates().iterator();
+				a.hasNext();)
+		{
+			CheckpointTestUtils.assertMasterStateEquality(a.next(), b.next());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Test.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Test.java
new file mode 100644
index 0000000..428a62a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Test.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.MasterState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SavepointV2Test {
+
+	/**
+	 * Simple test of savepoint methods.
+	 */
+	@Test
+	public void testSavepointV2() throws Exception {
+		final Random rnd = new Random();
+
+		final long checkpointId = rnd.nextInt(Integer.MAX_VALUE) + 1;
+		final int numTaskStates = 4;
+		final int numSubtaskStates = 16;
+		final int numMasterStates = 7;
+
+		Collection<TaskState> taskStates =
+				CheckpointTestUtils.createTaskStates(rnd, numTaskStates, numSubtaskStates);
+
+		Collection<MasterState> masterStates =
+				CheckpointTestUtils.createRandomMasterStates(rnd, numMasterStates);
+
+		SavepointV2 checkpoint = new SavepointV2(checkpointId, taskStates, masterStates);
+
+		assertEquals(2, checkpoint.getVersion());
+		assertEquals(checkpointId, checkpoint.getCheckpointId());
+		assertEquals(taskStates, checkpoint.getTaskStates());
+		assertEquals(masterStates, checkpoint.getMasterStates());
+
+		assertFalse(checkpoint.getTaskStates().isEmpty());
+		assertFalse(checkpoint.getMasterStates().isEmpty());
+
+		checkpoint.dispose();
+
+		assertTrue(checkpoint.getTaskStates().isEmpty());
+		assertTrue(checkpoint.getMasterStates().isEmpty());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index f96b624..4e1d0f7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -128,6 +129,7 @@ public class ArchivedExecutionGraphTest {
 			Collections.<ExecutionJobVertex>emptyList(),
 			Collections.<ExecutionJobVertex>emptyList(),
 			Collections.<ExecutionJobVertex>emptyList(),
+			Collections.<MasterTriggerRestoreHook<?>>emptyList(),
 			new StandaloneCheckpointIDCounter(),
 			new StandaloneCompletedCheckpointStore(1),
 			null,

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ExternallyInducedSource.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ExternallyInducedSource.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ExternallyInducedSource.java
new file mode 100644
index 0000000..b26cf4f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ExternallyInducedSource.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.checkpoint;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Sources that implement this interface do not trigger checkpoints when receiving a
+ * trigger message from the checkpoint coordinator, but when their input data/events
+ * indicate that a checkpoint should be triggered.
+ *
+ * <p>Since sources cannot simply create a new checkpoint on their own, this mechanism
+ * always goes together with a {@link WithMasterCheckpointHook hook on the master side}.
+ * In a typical setup, the hook on the master tells the source system (for example
+ * the message queue) to prepare a checkpoint. The exact point when the checkpoint is
+ * taken is then controlled by the event stream received from the source, and triggered
+ * by the source function (implementing this interface) in Flink when seeing the relevant
+ * events.
+ *
+ * @param <T> Type of the elements produced by the source function
+ * @param <CD> The type of the data stored in the checkpoint by the master that triggers
+ */
+@PublicEvolving
+public interface ExternallyInducedSource<T, CD> extends SourceFunction<T>, WithMasterCheckpointHook<CD> {
+
+	/**
+	 * Sets the checkpoint trigger through which the source can trigger the checkpoint.
+	 *
+	 * @param checkpointTrigger The checkpoint trigger to set
+	 */
+	void setCheckpointTrigger(CheckpointTrigger checkpointTrigger);
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Through the {@code CheckpointTrigger}, the source function notifies the Flink
+	 * source operator when to trigger the checkpoint.
+	 */
+	interface CheckpointTrigger {
+
+		/**
+		 * Triggers a checkpoint. This method should be called by the source
+		 * when it sees the event that indicates that a checkpoint should be triggered.
+		 *
+		 * <p>When this method is called, the parallel operator instance in which the
+		 * calling source function runs will perform its checkpoint and insert the
+		 * checkpoint barrier into the data stream.
+		 *
+		 * @param checkpointId The ID that identifies the checkpoint.
+		 *
+		 * @throws FlinkException Thrown when the checkpoint could not be triggered, for example
+		 *                        because of an invalid state or errors when storing the
+		 *                        checkpoint state.
+		 */
+		void triggerCheckpoint(long checkpointId) throws FlinkException;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/WithMasterCheckpointHook.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/WithMasterCheckpointHook.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/WithMasterCheckpointHook.java
new file mode 100644
index 0000000..ef872de
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/WithMasterCheckpointHook.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.checkpoint;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+
+/**
+ * This interface can be implemented by streaming functions that need to trigger a
+ * "global action" on the master (in the checkpoint coordinator) as part of every
+ * checkpoint and restore operation.
+ *
+ * @param <E> The type of the data stored by the hook in the checkpoint, or {@code Void}, if none.
+ */
+@PublicEvolving
+public interface WithMasterCheckpointHook<E> extends java.io.Serializable {
+
+	/**
+	 * Creates the hook that should be called by the checkpoint coordinator.
+	 */
+	MasterTriggerRestoreHook<E> createMasterTriggerRestoreHook();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/FunctionMasterCheckpointHookFactory.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/FunctionMasterCheckpointHookFactory.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/FunctionMasterCheckpointHookFactory.java
new file mode 100644
index 0000000..c256698
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/FunctionMasterCheckpointHookFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
+
+/**
+ * Utility class that turns a {@link WithMasterCheckpointHook} into a
+ * {@link org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook.Factory}.
+ */
+class FunctionMasterCheckpointHookFactory implements MasterTriggerRestoreHook.Factory {
+
+	private static final long serialVersionUID = 2L;
+
+	private final WithMasterCheckpointHook<?> creator;
+
+	FunctionMasterCheckpointHookFactory(WithMasterCheckpointHook<?> creator) {
+		this.creator = checkNotNull(creator);
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public <V> MasterTriggerRestoreHook<V> create() {
+		return (MasterTriggerRestoreHook<V>) creator.createMasterTriggerRestoreHook();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index a1d33d8..7f24cd3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 import org.apache.commons.lang3.StringUtils;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -36,6 +37,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.migration.streaming.api.graph.StreamGraphHasherV1;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.InputFormatVertex;
@@ -51,7 +53,9 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
@@ -542,6 +546,8 @@ public class StreamingJobGraphGenerator {
 			interval = Long.MAX_VALUE;
 		}
 
+		//  --- configure the participating vertices ---
+
 		// collect the vertices that receive "trigger checkpoint" messages.
 		// currently, these are all the sources
 		List<JobVertexID> triggerVertices = new ArrayList<>();
@@ -552,7 +558,7 @@ public class StreamingJobGraphGenerator {
 
 		// collect the vertices that receive "commit checkpoint" messages
 		// currently, these are all vertices
-		List<JobVertexID> commitVertices = new ArrayList<>();
+		List<JobVertexID> commitVertices = new ArrayList<>(jobVertices.size());
 
 		for (JobVertex vertex : jobVertices.values()) {
 			if (vertex.isInputVertex()) {
@@ -562,6 +568,8 @@ public class StreamingJobGraphGenerator {
 			ackVertices.add(vertex.getID());
 		}
 
+		//  --- configure options ---
+
 		ExternalizedCheckpointSettings externalizedCheckpointSettings;
 		if (cfg.isExternalizedCheckpointsEnabled()) {
 			CheckpointConfig.ExternalizedCheckpointCleanup cleanup = cfg.getExternalizedCheckpointCleanup();
@@ -587,12 +595,30 @@ public class StreamingJobGraphGenerator {
 				"exactly-once or at-least-once.");
 		}
 
+		//  --- configure the master-side checkpoint hooks ---
+
+		final ArrayList<MasterTriggerRestoreHook.Factory> hooks = new ArrayList<>();
+
+		for (StreamNode node : streamGraph.getStreamNodes()) {
+			StreamOperator<?> op = node.getOperator();
+			if (op instanceof AbstractUdfStreamOperator) {
+				Function f = ((AbstractUdfStreamOperator<?, ?>) op).getUserFunction();
+
+				if (f instanceof WithMasterCheckpointHook) {
+					hooks.add(new FunctionMasterCheckpointHookFactory((WithMasterCheckpointHook<?>) f));
+				}
+			}
+		}
+
+		//  --- done, put it all together ---
+
 		JobCheckpointingSettings settings = new JobCheckpointingSettings(
 				triggerVertices, ackVertices, commitVertices, interval,
 				cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
 				cfg.getMaxConcurrentCheckpoints(),
 				externalizedCheckpointSettings,
 				streamGraph.getStateBackend(),
+				hooks.toArray(new MasterTriggerRestoreHook.Factory[hooks.size()]),
 				isExactlyOnce);
 
 		jobGraph.setSnapshotSettings(settings);

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 66e92df..31cd7c1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -19,8 +19,12 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.util.FlinkException;
 
 /**
  * {@link StreamTask} for executing a {@link StreamSource}.
@@ -40,9 +44,44 @@ import org.apache.flink.streaming.api.operators.StreamSource;
 public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>>
 	extends StreamTask<OUT, OP> {
 
+	private volatile boolean externallyInducedCheckpoints;
+
 	@Override
 	protected void init() {
 		// does not hold any resources, so no initialization needed
+
+		// we check if the source is actually inducing the checkpoints, rather
+		// than the checkpoint trigger messages coming from the checkpoint coordinator
+		SourceFunction<?> source = headOperator.getUserFunction();
+		if (source instanceof ExternallyInducedSource) {
+			externallyInducedCheckpoints = true;
+
+			ExternallyInducedSource.CheckpointTrigger triggerHook = new ExternallyInducedSource.CheckpointTrigger() {
+
+				@Override
+				public void triggerCheckpoint(long checkpointId) throws FlinkException {
+					// TODO - we need to see how to derive those. We should probably not encode this in the
+					// TODO -   source's trigger message, but do a handshake in this task between the trigger
+					// TODO -   message from the master, and the source's trigger notification
+					final CheckpointOptions checkpointOptions = CheckpointOptions.forFullCheckpoint();
+					final long timestamp = System.currentTimeMillis();
+
+					final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);
+
+					try {
+						SourceStreamTask.super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
+					}
+					catch (RuntimeException | FlinkException e) {
+						throw e;
+					}
+					catch (Exception e) {
+						throw new FlinkException(e.getMessage(), e);
+					}
+				}
+			};
+
+			((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);
+		}
 	}
 
 	@Override
@@ -62,4 +101,21 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 			headOperator.cancel();
 		}
 	}
+
+	// ------------------------------------------------------------------------
+	//  Checkpointing
+	// ------------------------------------------------------------------------
+
+	@Override
+	public boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
+		if (!externallyInducedCheckpoints) {
+			return super.triggerCheckpoint(checkpointMetaData, checkpointOptions);
+		}
+		else {
+			// we do not trigger checkpoints here, we simply state whether we can trigger them
+			synchronized (getCheckpointLock()) {
+				return isRunning();
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
new file mode 100644
index 0000000..b5a95eb
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/graph/WithMasterCheckpointHookConfigTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.graph;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
+import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook.Factory;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
+import static java.util.Arrays.asList;
+import static org.junit.Assert.*;
+
+/**
+ * Tests that when sources implement {@link WithMasterCheckpointHook} the hooks are properly
+ * configured in the job's checkpoint settings.
+ */
+@SuppressWarnings("serial")
+public class WithMasterCheckpointHookConfigTest {
+
+	/**
+	 * This test creates a program with 4 sources (2 with master hooks, 2 without).
+	 * The resulting job graph must have 2 configured master hooks.
+	 */
+	@Test
+	public void testHookConfiguration() throws Exception {
+		// create some sources some of which configure master hooks
+		final TestSource source1 = new TestSource();
+		final TestSourceWithHook source2 = new TestSourceWithHook("foo");
+		final TestSource source3 = new TestSource();
+		final TestSourceWithHook source4 = new TestSourceWithHook("bar");
+
+		final MapFunction<String, String> identity = new Identity<>();
+		final IdentityWithHook<String> identityWithHook1 = new IdentityWithHook<>("apple");
+		final IdentityWithHook<String> identityWithHook2 = new IdentityWithHook<>("orange");
+
+		final Set<MasterTriggerRestoreHook<?>> hooks = new HashSet<MasterTriggerRestoreHook<?>>(asList(
+				source2.createMasterTriggerRestoreHook(),
+				source4.createMasterTriggerRestoreHook(),
+				identityWithHook1.createMasterTriggerRestoreHook(),
+				identityWithHook2.createMasterTriggerRestoreHook()));
+
+		// we can instantiate a local environment here, because we never actually execute something
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
+		env.enableCheckpointing(500);
+
+		env
+			.addSource(source1).map(identity)
+			.union(env.addSource(source2).map(identity))
+			.union(env.addSource(source3).map(identityWithHook1))
+			.union(env.addSource(source4).map(identityWithHook2))
+			.addSink(new DiscardingSink<String>());
+
+		final JobGraph jg = env.getStreamGraph().getJobGraph();
+		assertEquals(hooks.size(), jg.getCheckpointingSettings().getMasterHooks().length);
+
+		// check that all hooks are contained and exist exactly once
+		for (Factory f : jg.getCheckpointingSettings().getMasterHooks()) {
+			MasterTriggerRestoreHook<?> hook = f.create();
+			assertTrue(hooks.remove(hook));
+		}
+		assertTrue(hooks.isEmpty());
+	}
+
+	// -----------------------------------------------------------------------
+
+	private static class TestHook implements MasterTriggerRestoreHook<String> {
+
+		private final String id;
+
+		TestHook(String id) {
+			this.id = id;
+		}
+
+		@Override
+		public String getIdentifier() {
+			return id;
+		}
+
+		@Override
+		public Future<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void restoreCheckpoint(long checkpointId, @Nullable String checkpointData) throws Exception {
+			throw new UnsupportedOperationException();
+		}
+
+		@Nullable
+		@Override
+		public SimpleVersionedSerializer<String> createCheckpointDataSerializer() {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			return obj == this || (obj != null && obj.getClass() == getClass() && ((TestHook) obj).id.equals(id));
+		}
+
+		@Override
+		public int hashCode() {
+			return id.hashCode();
+		}
+	}
+
+	// -----------------------------------------------------------------------
+
+	private static class TestSource implements SourceFunction<String> {
+
+		@Override
+		public void run(SourceContext<String> ctx) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public void cancel() {}
+	}
+
+	// -----------------------------------------------------------------------
+
+	private static class TestSourceWithHook extends TestSource implements WithMasterCheckpointHook<String> {
+
+		private final String id;
+
+		TestSourceWithHook(String id) {
+			this.id = id;
+		}
+
+		@Override
+		public TestHook createMasterTriggerRestoreHook() {
+			return new TestHook(id);
+		}
+	}
+
+	// -----------------------------------------------------------------------
+
+	private static class Identity<T> implements MapFunction<T, T> {
+
+		@Override
+		public T map(T value) {
+			return value;
+		}
+	}
+
+	// -----------------------------------------------------------------------
+
+	private static class IdentityWithHook<T> extends Identity<T> implements WithMasterCheckpointHook<String> {
+
+		private final String id;
+
+		IdentityWithHook(String id) {
+			this.id = id;
+		}
+
+		@Override
+		public TestHook createMasterTriggerRestoreHook() {
+			return new TestHook(id);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/90ca4381/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
index 38741ba..54cd186 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/StreamRecordWriterTest.java
@@ -29,12 +29,9 @@ import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.types.LongValue;
 
 import org.junit.Test;
-import org.junit.runner.RunWith;
 
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.IOException;
 
@@ -45,8 +42,6 @@ import static org.mockito.Mockito.*;
  * This test previously used the PowerMockRunner to work around the fact that the
  * {@link ResultPartitionWriter} class is final; the runner annotations have since been removed.
  */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(ResultPartitionWriter.class)
 public class StreamRecordWriterTest {
 
 	/**


Mime
View raw message