flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-5628] [webfrontend] Fix serializability of checkpoint stats tracker
Date Mon, 30 Jan 2017 15:33:38 GMT
Repository: flink
Updated Branches:
  refs/heads/master 126fb1779 -> dcfa3fbb0


[FLINK-5628] [webfrontend] Fix serializability of checkpoint stats tracker

This closes #3215.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dcfa3fbb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dcfa3fbb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dcfa3fbb

Branch: refs/heads/master
Commit: dcfa3fbb0f17400ebf823e10f803cde8563fff4a
Parents: 126fb17
Author: Ufuk Celebi <uce@apache.org>
Authored: Wed Jan 25 15:42:24 2017 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Jan 30 16:33:22 2017 +0100

----------------------------------------------------------------------
 .../checkpoints/CheckpointConfigHandler.java    |  8 +-
 .../CheckpointStatsDetailsHandler.java          |  7 +-
 .../CheckpointStatsDetailsSubtasksHandler.java  |  7 +-
 .../checkpoints/CheckpointStatsHandler.java     |  7 +-
 .../CheckpointConfigHandlerTest.java            | 13 +--
 .../CheckpointStatsDetailsHandlerTest.java      | 13 +--
 .../checkpoints/CheckpointStatsHandlerTest.java |  5 +-
 ...heckpointStatsSubtaskDetailsHandlerTest.java | 13 +--
 .../checkpoint/AbstractCheckpointStats.java     |  5 +-
 .../checkpoint/CheckpointStatsHistory.java      |  4 +-
 .../checkpoint/CheckpointStatsTracker.java      | 16 ++--
 .../checkpoint/CompletedCheckpointStats.java    | 26 +++---
 .../checkpoint/FailedCheckpointStats.java       | 24 +++---
 .../checkpoint/PendingCheckpointStats.java      |  4 +-
 .../checkpoint/RestoredCheckpointStats.java     |  2 +-
 .../runtime/checkpoint/SubtaskStateStats.java   |  8 +-
 .../runtime/checkpoint/TaskStateStats.java      |  9 ++-
 .../executiongraph/AccessExecutionGraph.java    | 20 +++--
 .../executiongraph/ArchivedExecutionGraph.java  | 54 ++++++++-----
 .../runtime/executiongraph/ExecutionGraph.java  | 22 ++++-
 .../tasks/ExternalizedCheckpointSettings.java   |  2 +
 .../checkpoint/CheckpointStatsHistoryTest.java  |  1 -
 .../checkpoint/CheckpointStatsSnapshotTest.java | 84 ++++++++++++++++++++
 .../checkpoint/CompletedCheckpointTest.java     | 35 ++++++++
 .../checkpoint/FailedCheckpointStatsTest.java   | 40 ++++++++++
 .../checkpoint/PendingCheckpointStatsTest.java  | 38 +++++++--
 .../checkpoint/SubtaskStateStatsTest.java       | 36 +++++++++
 .../runtime/checkpoint/TaskStateStatsTest.java  | 46 ++++++++++-
 .../ArchivedExecutionGraphTest.java             |  7 +-
 .../tasks/JobSnapshottingSettingsTest.java      | 59 ++++++++++++++
 30 files changed, 490 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index 1ad5e65..be0d283 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
@@ -42,10 +41,13 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
 	@Override
 	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
 		StringWriter writer = new StringWriter();
+
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+		JobSnapshottingSettings settings = graph.getJobSnapshottingSettings();
 
-		CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
-		JobSnapshottingSettings settings = tracker.getSnapshottingSettings();
+		if (settings == null) {
+			return "{}";
+		}
 
 		gen.writeStartObject();
 		{

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 6bb8300..33d6cf7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
@@ -54,8 +53,10 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
 			return "{}";
 		}
 
-		CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
-		CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
+		CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+		if (snapshot == null) {
+			return "{}";
+		}
 
 		AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index 00195b5..d55467f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
@@ -72,8 +71,10 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
 			return "{}";
 		}
 
-		CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
-		CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
+		CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+		if (snapshot == null) {
+			return "{}";
+		}
 
 		AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index 71f3637..8aab5fa 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
@@ -54,8 +53,10 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
-		CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
-		CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
+		CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+		if (snapshot == null) {
+			return "{}";
+		}
 
 		gen.writeStartObject();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
index 410e044..e517c3c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
@@ -60,9 +59,7 @@ public class CheckpointConfigHandlerTest {
 			true);
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-		when(tracker.getSnapshottingSettings()).thenReturn(settings);
+		when(graph.getJobSnapshottingSettings()).thenReturn(settings);
 
 		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
 		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
@@ -98,9 +95,7 @@ public class CheckpointConfigHandlerTest {
 			false); // at least once
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-		when(tracker.getSnapshottingSettings()).thenReturn(settings);
+		when(graph.getJobSnapshottingSettings()).thenReturn(settings);
 
 		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
 		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
@@ -130,9 +125,7 @@ public class CheckpointConfigHandlerTest {
 			false); // at least once
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-		when(tracker.getSnapshottingSettings()).thenReturn(settings);
+		when(graph.getJobSnapshottingSettings()).thenReturn(settings);
 
 		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
 		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
index 17c8558..fb5cfc5 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
@@ -89,9 +88,7 @@ public class CheckpointStatsDetailsHandlerTest {
 		when(snapshot.getHistory()).thenReturn(history);
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-		when(tracker.createSnapshot()).thenReturn(snapshot);
+		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
 		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
@@ -238,16 +235,14 @@ public class CheckpointStatsDetailsHandlerTest {
 
 	// ------------------------------------------------------------------------
 
-	static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
+	private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
 		CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
 		when(history.getCheckpointById(anyLong())).thenReturn(checkpoint);
 		CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
 		when(snapshot.getHistory()).thenReturn(history);
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-		when(tracker.createSnapshot()).thenReturn(snapshot);
+		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
 		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
@@ -258,7 +253,7 @@ public class CheckpointStatsDetailsHandlerTest {
 		return mapper.readTree(json);
 	}
 
-	static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) {
+	private static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) {
 		long duration = ThreadLocalRandom.current().nextInt(128);
 
 		JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString());

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
index 8274b36..23a1900 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
@@ -179,9 +178,7 @@ public class CheckpointStatsHandlerTest {
 		when(snapshot.getLatestRestoredCheckpoint()).thenReturn(latestRestored);
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-		when(tracker.createSnapshot()).thenReturn(snapshot);
+		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
 		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
 		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
index 8b7201d..571adad 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
 import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
@@ -129,9 +128,7 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 		when(snapshot.getHistory()).thenReturn(history);
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-		when(tracker.createSnapshot()).thenReturn(snapshot);
+		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
 		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
@@ -186,9 +183,7 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 		when(snapshot.getHistory()).thenReturn(history);
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-		when(tracker.createSnapshot()).thenReturn(snapshot);
+		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
 		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
@@ -209,9 +204,7 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 		when(snapshot.getHistory()).thenReturn(history);
 
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
-		when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
-		when(tracker.createSnapshot()).thenReturn(snapshot);
+		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
 		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
index 6c261a5..5b3c7c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import javax.annotation.Nullable;
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
 
@@ -30,7 +31,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Base class for checkpoint statistics.
  */
-public abstract class AbstractCheckpointStats {
+public abstract class AbstractCheckpointStats implements Serializable {
+
+	private static final long serialVersionUID = 1041218202028265151L;
 
 	/** ID of this checkpoint. */
 	final long checkpointId;

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
index 56fc9c1..13ce642 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
@@ -307,7 +307,9 @@ public class CheckpointStatsHistory implements Serializable {
 	 *
 	 * <p>The iteration order is in reverse insertion order.
 	 */
-	private static class CheckpointsStatsHistoryIterable implements Iterable<AbstractCheckpointStats> {
+	private static class CheckpointsStatsHistoryIterable implements Iterable<AbstractCheckpointStats>, Serializable {
+
+		private static final long serialVersionUID = 726376482426055490L;
 
 		/** Copy of the checkpointsArray array at the point when this iterable was created. */
 		private final AbstractCheckpointStats[] checkpointsArray;

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index 92f707f..d324c25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 
 import javax.annotation.Nullable;
-import java.io.Serializable;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.locks.ReentrantLock;
@@ -52,9 +51,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * <p>The statistics are accessed via {@link #createSnapshot()} and exposed via
  * both the web frontend and the {@link Metric} system.
  */
-public class CheckpointStatsTracker implements Serializable {
-
-	private static final long serialVersionUID = 1694085244807339288L;
+public class CheckpointStatsTracker {
 
 	/**
 	 * Lock used to update stats and creating snapshots. Updates always happen
@@ -67,9 +64,6 @@ public class CheckpointStatsTracker implements Serializable {
 	 */
 	private final ReentrantLock statsReadWriteLock = new ReentrantLock();
 
-	/** The job vertices taking part in the checkpoints. */
-	private final List<ExecutionJobVertex> jobVertices;
-
 	/** Total number of subtasks to checkpoint. */
 	private final int totalSubtaskCount;
 
@@ -85,6 +79,9 @@ public class CheckpointStatsTracker implements Serializable {
 	/** History of checkpoints. */
 	private final CheckpointStatsHistory history;
 
+	/** The job vertices taking part in the checkpoints. */
+	private final transient List<ExecutionJobVertex> jobVertices;
+
 	/** The latest restored checkpoint. */
 	@Nullable
 	private RestoredCheckpointStats latestRestoredCheckpoint;
@@ -217,6 +214,11 @@ public class CheckpointStatsTracker implements Serializable {
 		return pending;
 	}
 
+	/**
+	 * Callback when a checkpoint is restored.
+	 *
+	 * @param restored The restored checkpoint stats.
+	 */
 	void reportRestoredCheckpoint(RestoredCheckpointStats restored) {
 		checkNotNull(restored, "Restored checkpoint");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
index 4d2d995..f6b6aed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
@@ -35,8 +35,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class CompletedCheckpointStats extends AbstractCheckpointStats {
 
-	/** Callback for the {@link CompletedCheckpoint} instance to notify about discard. */
-	private final DiscardCallback discardCallback;
+	private static final long serialVersionUID = 138833868551861343L;
 
 	/** Total checkpoint state size over all subtasks. */
 	private final long stateSize;
@@ -69,16 +68,16 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
 	 * @param externalPath Optional external path if persisted externally.
 	 */
 	CompletedCheckpointStats(
-		long checkpointId,
-		long triggerTimestamp,
-		CheckpointProperties props,
-		int totalSubtaskCount,
-		Map<JobVertexID, TaskStateStats> taskStats,
-		int numAcknowledgedSubtasks,
-		long stateSize,
-		long alignmentBuffered,
-		SubtaskStateStats latestAcknowledgedSubtask,
-		@Nullable String externalPath) {
+			long checkpointId,
+			long triggerTimestamp,
+			CheckpointProperties props,
+			int totalSubtaskCount,
+			Map<JobVertexID, TaskStateStats> taskStats,
+			int numAcknowledgedSubtasks,
+			long stateSize,
+			long alignmentBuffered,
+			SubtaskStateStats latestAcknowledgedSubtask,
+			@Nullable String externalPath) {
 
 		super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
 		checkArgument(numAcknowledgedSubtasks == totalSubtaskCount, "Did not acknowledge all subtasks.");
@@ -87,7 +86,6 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
 		this.alignmentBuffered = alignmentBuffered;
 		this.latestAcknowledgedSubtask = checkNotNull(latestAcknowledgedSubtask);
 		this.externalPath = externalPath;
-		this.discardCallback = new DiscardCallback();
 	}
 
 	@Override
@@ -145,7 +143,7 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
 	 * @return Callback for the {@link CompletedCheckpoint}.
 	 */
 	DiscardCallback getDiscardCallback() {
-		return discardCallback;
+		return new DiscardCallback();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
index 83d7c3d..2f596a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
@@ -32,6 +32,8 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  */
 public class FailedCheckpointStats extends AbstractCheckpointStats {
 
+	private static final long serialVersionUID = 8000748529515900106L;
+
 	/** Number of acknowledged tasks. */
 	private final int numAcknowledgedSubtasks;
 
@@ -71,17 +73,17 @@ public class FailedCheckpointStats extends AbstractCheckpointStats {
 	 * @param cause Cause of the checkpoint failure or <code>null</code>.
 	 */
 	FailedCheckpointStats(
-		long checkpointId,
-		long triggerTimestamp,
-		CheckpointProperties props,
-		int totalSubtaskCount,
-		Map<JobVertexID, TaskStateStats> taskStats,
-		int numAcknowledgedSubtasks,
-		long stateSize,
-		long alignmentBuffered,
-		long failureTimestamp,
-		@Nullable SubtaskStateStats latestAcknowledgedSubtask,
-		@Nullable Throwable cause) {
+			long checkpointId,
+			long triggerTimestamp,
+			CheckpointProperties props,
+			int totalSubtaskCount,
+			Map<JobVertexID, TaskStateStats> taskStats,
+			int numAcknowledgedSubtasks,
+			long stateSize,
+			long alignmentBuffered,
+			long failureTimestamp,
+			@Nullable SubtaskStateStats latestAcknowledgedSubtask,
+			@Nullable Throwable cause) {
 
 		super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
 		checkArgument(numAcknowledgedSubtasks >= 0, "Negative number of ACKs");

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
index e6fa80f..0f32250 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
@@ -42,8 +42,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class PendingCheckpointStats extends AbstractCheckpointStats {
 
+	private static final long serialVersionUID = -973959257699390327L;
+
 	/** Tracker callback when the pending checkpoint is finalized or aborted. */
-	private final CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback;
+	private transient final CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback;
 
 	/** The current number of acknowledged subtasks. */
 	private volatile int currentNumAcknowledgedSubtasks;

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
index c21937a..8b8a5e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
@@ -56,7 +56,7 @@ public class RestoredCheckpointStats implements Serializable {
 			long checkpointId,
 			CheckpointProperties props,
 			long restoreTimestamp,
-			String externalPath) {
+			@Nullable String externalPath) {
 
 		this.checkpointId = checkpointId;
 		this.props = checkNotNull(props, "Checkpoint Properties");

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
index 3a66032..ee9e287 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import java.io.Serializable;
+
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /**
@@ -30,8 +32,10 @@ import static org.apache.flink.util.Preconditions.checkArgument;
  *
  * <p>This is the smallest immutable unit of the stats.
  */
-public class SubtaskStateStats {
-	
+public class SubtaskStateStats implements Serializable {
+
+	private static final long serialVersionUID = 8928594531621862214L;
+
 	/** Index of this sub task. */
 	private final int subtaskIndex;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
index fc118d9..2f779a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import javax.annotation.Nullable;
+import java.io.Serializable;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -29,7 +30,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Statistics for a single task/operator that gathers all statistics of its
  * subtasks and provides summary statistics about all subtasks.
  */
-public class TaskStateStats {
+public class TaskStateStats implements Serializable {
+
+	private static final long serialVersionUID = 531803101206574444L;
 
 	/** ID of the task the stats belong to. */
 	private final JobVertexID jobVertexId;
@@ -195,7 +198,9 @@ public class TaskStateStats {
 	/**
 	 * Summary of the subtask stats of a single task/operator.
 	 */
-	public static class TaskStateStatsSummary {
+	public static class TaskStateStatsSummary implements Serializable {
+
+		private static final long serialVersionUID = 1009476026522091909L;
 
 		private MinMaxAvgStats stateSize = new MinMaxAvgStats();
 		private MinMaxAvgStats ackTimestamp = new MinMaxAvgStats();

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 3490dc8..18c2ec2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -17,13 +17,14 @@
  */
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
-import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.util.SerializedValue;
 
 import java.io.IOException;
@@ -115,11 +116,20 @@ public interface AccessExecutionGraph {
 	CheckpointCoordinator getCheckpointCoordinator();
 
 	/**
-	 * Returns the {@link CheckpointStatsTracker} for this execution graph.
+	 * Returns the {@link JobSnapshottingSettings} or <code>null</code> if
+	 * checkpointing is disabled.
+	 *
+	 * @return JobSnapshottingSettings for this execution graph
+	 */
+	JobSnapshottingSettings getJobSnapshottingSettings();
+
+	/**
+	 * Returns a snapshot of the checkpoint statistics or <code>null</code> if
+	 * checkpointing is disabled.
 	 *
-	 * @return CheckpointStatsTracker for thie execution graph
+	 * @return Snapshot of the checkpoint statistics for this execution graph
 	 */
-	CheckpointStatsTracker getCheckpointStatsTracker();
+	CheckpointStatsSnapshot getCheckpointStatsSnapshot();
 
 	/**
 	 * Returns the {@link ArchivedExecutionConfig} for this execution graph.

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 440ecda..334b0d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -21,11 +21,13 @@ import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.util.SerializedValue;
 
+import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.Iterator;
@@ -34,6 +36,7 @@ import java.util.Map;
 import java.util.NoSuchElementException;
 
 public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializable {
+
 	private static final long serialVersionUID = 7231383912742578428L;
 	// --------------------------------------------------------------------------------------------
 
@@ -76,23 +79,29 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 	private final ArchivedExecutionConfig archivedExecutionConfig;
 	private final boolean isStoppable;
 	private final Map<String, SerializedValue<Object>> serializedUserAccumulators;
-	private final CheckpointStatsTracker tracker;
+
+	@Nullable
+	private final JobSnapshottingSettings jobSnapshottingSettings;
+
+	@Nullable
+	private final CheckpointStatsSnapshot checkpointStatsSnapshot;
 
 	public ArchivedExecutionGraph(
-		JobID jobID,
-		String jobName,
-		Map<JobVertexID, ArchivedExecutionJobVertex> tasks,
-		List<ArchivedExecutionJobVertex> verticesInCreationOrder,
-		long[] stateTimestamps,
-		JobStatus state,
-		String failureCause,
-		String jsonPlan,
-		StringifiedAccumulatorResult[] archivedUserAccumulators,
-		Map<String, SerializedValue<Object>> serializedUserAccumulators,
-		ArchivedExecutionConfig executionConfig,
-		boolean isStoppable,
-		CheckpointStatsTracker tracker
-	) {
+			JobID jobID,
+			String jobName,
+			Map<JobVertexID, ArchivedExecutionJobVertex> tasks,
+			List<ArchivedExecutionJobVertex> verticesInCreationOrder,
+			long[] stateTimestamps,
+			JobStatus state,
+			String failureCause,
+			String jsonPlan,
+			StringifiedAccumulatorResult[] archivedUserAccumulators,
+			Map<String, SerializedValue<Object>> serializedUserAccumulators,
+			ArchivedExecutionConfig executionConfig,
+			boolean isStoppable,
+			@Nullable JobSnapshottingSettings jobSnapshottingSettings,
+			@Nullable CheckpointStatsSnapshot checkpointStatsSnapshot) {
+
 		this.jobID = jobID;
 		this.jobName = jobName;
 		this.tasks = tasks;
@@ -105,10 +114,12 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 		this.serializedUserAccumulators = serializedUserAccumulators;
 		this.archivedExecutionConfig = executionConfig;
 		this.isStoppable = isStoppable;
-		this.tracker = tracker;
+		this.jobSnapshottingSettings = jobSnapshottingSettings;
+		this.checkpointStatsSnapshot = checkpointStatsSnapshot;
 	}
 
 	// --------------------------------------------------------------------------------------------
+
 	@Override
 	public String getJsonPlan() {
 		return jsonPlan;
@@ -200,8 +211,13 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 	}
 
 	@Override
-	public CheckpointStatsTracker getCheckpointStatsTracker() {
-		return tracker;
+	public JobSnapshottingSettings getJobSnapshottingSettings() {
+		return jobSnapshottingSettings;
+	}
+
+	@Override
+	public CheckpointStatsSnapshot getCheckpointStatsSnapshot() {
+		return checkpointStatsSnapshot;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 2069638..da4a66e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -50,6 +51,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -425,8 +427,21 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	}
 
 	@Override
-	public CheckpointStatsTracker getCheckpointStatsTracker() {
-		return checkpointStatsTracker;
+	public JobSnapshottingSettings getJobSnapshottingSettings() {
+		if (checkpointStatsTracker != null) {
+			return checkpointStatsTracker.getSnapshottingSettings();
+		} else {
+			return null;
+		}
+	}
+
+	@Override
+	public CheckpointStatsSnapshot getCheckpointStatsSnapshot() {
+		if (checkpointStatsTracker != null) {
+			return checkpointStatsTracker.createSnapshot();
+		} else {
+			return null;
+		}
 	}
 
 	private ExecutionVertex[] collectExecutionVertices(List<ExecutionJobVertex> jobVertices) {
@@ -1338,6 +1353,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			serializedUserAccumulators,
 			getArchivedExecutionConfig(),
 			isStoppable(),
-			getCheckpointStatsTracker());
+			getJobSnapshottingSettings(),
+			getCheckpointStatsSnapshot());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
index 779fc76..b3b487c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
@@ -26,6 +26,8 @@ import org.apache.flink.annotation.Internal;
 @Internal
 public class ExternalizedCheckpointSettings implements java.io.Serializable {
 
+	private static final long serialVersionUID = -6271691851124392955L;
+
 	private static final ExternalizedCheckpointSettings NONE = new ExternalizedCheckpointSettings(false, false);
 
 	/** Flag indicating whether checkpoints should be externalized. */

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
index 098fe17..7541806 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
@@ -192,5 +192,4 @@ public class CheckpointStatsHistoryTest {
 		return failed;
 	}
 
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
new file mode 100644
index 0000000..6500369
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CheckpointStatsSnapshotTest {
+
+	/**
+	 * Tests that the snapshot is actually serializable.
+	 */
+	@Test
+	public void testIsJavaSerializable() throws Exception {
+		CheckpointStatsCounts counts = new CheckpointStatsCounts();
+		counts.incrementInProgressCheckpoints();
+		counts.incrementInProgressCheckpoints();
+		counts.incrementInProgressCheckpoints();
+		counts.incrementCompletedCheckpoints();
+		counts.incrementFailedCheckpoints();
+		counts.incrementRestoredCheckpoints();
+
+		CompletedCheckpointStatsSummary summary = new CompletedCheckpointStatsSummary();
+		summary.updateSummary(createCompletedCheckpointsStats(12398, 9919, 12));
+		summary.updateSummary(createCompletedCheckpointsStats(2221, 3333, 9122));
+
+		CheckpointStatsHistory history = new CheckpointStatsHistory(1);
+		RestoredCheckpointStats restored = new RestoredCheckpointStats(1, CheckpointProperties.forStandardCheckpoint(), 99119, null);
+
+		CheckpointStatsSnapshot snapshot = new CheckpointStatsSnapshot(
+			counts,
+			summary,
+			history,
+			restored);
+
+		CheckpointStatsSnapshot copy = CommonTestUtils.createCopySerializable(snapshot);
+
+		assertEquals(counts.getNumberOfCompletedCheckpoints(), copy.getCounts().getNumberOfCompletedCheckpoints());
+		assertEquals(counts.getNumberOfFailedCheckpoints(), copy.getCounts().getNumberOfFailedCheckpoints());
+		assertEquals(counts.getNumberOfInProgressCheckpoints(), copy.getCounts().getNumberOfInProgressCheckpoints());
+		assertEquals(counts.getNumberOfRestoredCheckpoints(), copy.getCounts().getNumberOfRestoredCheckpoints());
+		assertEquals(counts.getTotalNumberOfCheckpoints(), copy.getCounts().getTotalNumberOfCheckpoints());
+
+		assertEquals(summary.getStateSizeStats().getSum(), copy.getSummaryStats().getStateSizeStats().getSum());
+		assertEquals(summary.getEndToEndDurationStats().getSum(), copy.getSummaryStats().getEndToEndDurationStats().getSum());
+		assertEquals(summary.getAlignmentBufferedStats().getSum(), copy.getSummaryStats().getAlignmentBufferedStats().getSum());
+
+		assertEquals(restored.getCheckpointId(), copy.getLatestRestoredCheckpoint().getCheckpointId());
+	}
+
+	private CompletedCheckpointStats createCompletedCheckpointsStats(
+			long stateSize,
+			long endToEndDuration,
+			long alignmentBuffered) {
+
+		CompletedCheckpointStats completed = mock(CompletedCheckpointStats.class);
+		when(completed.getStateSize()).thenReturn(stateSize);
+		when(completed.getEndToEndDuration()).thenReturn(endToEndDuration);
+		when(completed.getAlignmentBuffered()).thenReturn(alignmentBuffered);
+
+		return completed;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index e466dc7..0d933ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.junit.Rule;
@@ -143,4 +144,38 @@ public class CompletedCheckpointTest {
 		completed.discard(JobStatus.FINISHED);
 		verify(callback, times(1)).notifyDiscardedCheckpoint();
 	}
+
+	@Test
+	public void testIsJavaSerializable() throws Exception {
+		TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
+		TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
+
+		HashMap<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+		taskStats.put(task1.getJobVertexId(), task1);
+		taskStats.put(task2.getJobVertexId(), task2);
+
+		CompletedCheckpointStats completed = new CompletedCheckpointStats(
+			123123123L,
+			10123L,
+			CheckpointProperties.forStandardCheckpoint(),
+			1337,
+			taskStats,
+			1337,
+			123129837912L,
+			123819239812L,
+			new SubtaskStateStats(123, 213123, 123123, 0, 0, 0, 0),
+			null);
+
+		CompletedCheckpointStats copy = CommonTestUtils.createCopySerializable(completed);
+
+		assertEquals(completed.getCheckpointId(), copy.getCheckpointId());
+		assertEquals(completed.getTriggerTimestamp(), copy.getTriggerTimestamp());
+		assertEquals(completed.getProperties(), copy.getProperties());
+		assertEquals(completed.getNumberOfSubtasks(), copy.getNumberOfSubtasks());
+		assertEquals(completed.getNumberOfAcknowledgedSubtasks(), copy.getNumberOfAcknowledgedSubtasks());
+		assertEquals(completed.getEndToEndDuration(), copy.getEndToEndDuration());
+		assertEquals(completed.getStateSize(), copy.getStateSize());
+		assertEquals(completed.getLatestAcknowledgedSubtaskStats().getSubtaskIndex(), copy.getLatestAcknowledgedSubtaskStats().getSubtaskIndex());
+		assertEquals(completed.getStatus(), copy.getStatus());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
index 683c1c9..f1a56be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.junit.Test;
 
+import java.io.NotSerializableException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -57,4 +59,42 @@ public class FailedCheckpointStatsTest {
 
 		assertEquals(duration, failed.getEndToEndDuration());
 	}
+
+	@Test
+	public void testIsJavaSerializable() throws Exception {
+		long duration = 123912931293L;
+		long triggerTimestamp = 10123;
+		long failureTimestamp = triggerTimestamp + duration;
+
+		Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+		JobVertexID jobVertexId = new JobVertexID();
+		taskStats.put(jobVertexId, new TaskStateStats(jobVertexId, 1));
+
+		FailedCheckpointStats failed = new FailedCheckpointStats(
+			123123123L,
+			triggerTimestamp,
+			CheckpointProperties.forStandardCheckpoint(),
+			1337,
+			taskStats,
+			3,
+			190890123,
+			0,
+			failureTimestamp,
+			null,
+			new NotSerializableException("message"));
+
+		FailedCheckpointStats copy = CommonTestUtils.createCopySerializable(failed);
+
+		assertEquals(failed.getCheckpointId(), copy.getCheckpointId());
+		assertEquals(failed.getTriggerTimestamp(), copy.getTriggerTimestamp());
+		assertEquals(failed.getProperties(), copy.getProperties());
+		assertEquals(failed.getNumberOfSubtasks(), copy.getNumberOfSubtasks());
+		assertEquals(failed.getNumberOfAcknowledgedSubtasks(), copy.getNumberOfAcknowledgedSubtasks());
+		assertEquals(failed.getEndToEndDuration(), copy.getEndToEndDuration());
+		assertEquals(failed.getStateSize(), copy.getStateSize());
+		assertEquals(failed.getLatestAcknowledgedSubtaskStats(), copy.getLatestAcknowledgedSubtaskStats());
+		assertEquals(failed.getStatus(), copy.getStatus());
+		assertEquals(failed.getFailureMessage(), copy.getFailureMessage());
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
index 854e106..6c5e8fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.util.HashMap;
-import java.util.Map;
 
 import static junit.framework.TestCase.assertFalse;
 import static org.junit.Assert.assertEquals;
@@ -47,7 +47,7 @@ public class PendingCheckpointStatsTest {
 		TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
 		int totalSubtaskCount = task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks();
 
-		Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+		HashMap<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
 		taskStats.put(task1.getJobVertexId(), task1);
 		taskStats.put(task2.getJobVertexId(), task2);
 
@@ -128,7 +128,7 @@ public class PendingCheckpointStatsTest {
 		TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
 		TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
 
-		Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+		HashMap<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
 		taskStats.put(task1.getJobVertexId(), task1);
 		taskStats.put(task2.getJobVertexId(), task2);
 
@@ -165,7 +165,6 @@ public class PendingCheckpointStatsTest {
 		assertNotNull(completed);
 		assertEquals(CheckpointStatsStatus.COMPLETED, completed.getStatus());
 		assertFalse(completed.isDiscarded());
-		assertEquals(discardCallback, completed.getDiscardCallback());
 		discardCallback.notifyDiscardedCheckpoint();
 		assertTrue(completed.isDiscarded());
 		assertEquals(externalPath, completed.getExternalPath());
@@ -189,7 +188,7 @@ public class PendingCheckpointStatsTest {
 		TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
 		TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
 
-		Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+		HashMap<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
 		taskStats.put(task1.getJobVertexId(), task1);
 		taskStats.put(task2.getJobVertexId(), task2);
 
@@ -240,6 +239,35 @@ public class PendingCheckpointStatsTest {
 		assertEquals(task2, failed.getTaskStateStats(task2.getJobVertexId()));
 	}
 
+	@Test
+	public void testIsJavaSerializable() throws Exception {
+		TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
+		TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
+
+		HashMap<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+		taskStats.put(task1.getJobVertexId(), task1);
+		taskStats.put(task2.getJobVertexId(), task2);
+
+		PendingCheckpointStats pending = new PendingCheckpointStats(
+			123123123L,
+			10123L,
+			CheckpointProperties.forStandardCheckpoint(),
+			1337,
+			taskStats,
+			mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class));
+
+		PendingCheckpointStats copy = CommonTestUtils.createCopySerializable(pending);
+
+		assertEquals(pending.getCheckpointId(), copy.getCheckpointId());
+		assertEquals(pending.getTriggerTimestamp(), copy.getTriggerTimestamp());
+		assertEquals(pending.getProperties(), copy.getProperties());
+		assertEquals(pending.getNumberOfSubtasks(), copy.getNumberOfSubtasks());
+		assertEquals(pending.getNumberOfAcknowledgedSubtasks(), copy.getNumberOfAcknowledgedSubtasks());
+		assertEquals(pending.getEndToEndDuration(), copy.getEndToEndDuration());
+		assertEquals(pending.getStateSize(), copy.getStateSize());
+		assertEquals(pending.getLatestAcknowledgedSubtaskStats(), copy.getLatestAcknowledgedSubtaskStats());
+		assertEquals(pending.getStatus(), copy.getStatus());
+	}
 
 	// ------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java
index 75c40c5..8514f33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -54,4 +55,39 @@ public class SubtaskStateStatsTest {
 		// Trigger timestamp < ack timestamp
 		assertEquals(0, stats.getEndToEndDuration(ackTimestamp + 1));
 	}
+
+	/**
+	 * Tests that the snapshot is actually serializable.
+	 */
+	@Test
+	public void testIsJavaSerializable() throws Exception {
+		SubtaskStateStats stats = new SubtaskStateStats(
+			0,
+			Integer.MAX_VALUE + 1L,
+			Integer.MAX_VALUE + 2L,
+			Integer.MAX_VALUE + 3L,
+			Integer.MAX_VALUE + 4L,
+			Integer.MAX_VALUE + 5L,
+			Integer.MAX_VALUE + 6L);
+
+		SubtaskStateStats copy = CommonTestUtils.createCopySerializable(stats);
+
+		assertEquals(0, copy.getSubtaskIndex());
+		assertEquals(Integer.MAX_VALUE + 1L, copy.getAckTimestamp());
+		assertEquals(Integer.MAX_VALUE + 2L, copy.getStateSize());
+		assertEquals(Integer.MAX_VALUE + 3L, copy.getSyncCheckpointDuration());
+		assertEquals(Integer.MAX_VALUE + 4L, copy.getAsyncCheckpointDuration());
+		assertEquals(Integer.MAX_VALUE + 5L, copy.getAlignmentBuffered());
+		assertEquals(Integer.MAX_VALUE + 6L, copy.getAlignmentDuration());
+
+		// Check duration helper
+		long ackTimestamp = copy.getAckTimestamp();
+		long triggerTimestamp = ackTimestamp - 10123;
+		assertEquals(10123, copy.getEndToEndDuration(triggerTimestamp));
+
+		// Trigger timestamp < ack timestamp
+		assertEquals(0, copy.getEndToEndDuration(ackTimestamp + 1));
+
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java
index 94edd9e..dd46fe2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.junit.Test;
 
@@ -31,6 +32,8 @@ import static org.junit.Assert.assertTrue;
 
 public class TaskStateStatsTest {
 
+	private final ThreadLocalRandom rand = ThreadLocalRandom.current();
+
 	/**
 	 * Tests that subtask stats are correctly collected.
 	 */
@@ -48,8 +51,6 @@ public class TaskStateStatsTest {
 		assertEquals(-1, taskStats.getLatestAckTimestamp());
 		assertArrayEquals(subtasks, taskStats.getSubtaskStats());
 
-		ThreadLocalRandom rand = ThreadLocalRandom.current();
-
 		long stateSize = 0;
 		long alignmentBuffered = 0;
 
@@ -90,4 +91,45 @@ public class TaskStateStatsTest {
 		assertEquals(subtasks.length, summary.getAlignmentBufferedStats().getCount());
 		assertEquals(subtasks.length, summary.getAlignmentDurationStats().getCount());
 	}
+
+	@Test
+	public void testIsJavaSerializable() throws Exception {
+		JobVertexID jobVertexId = new JobVertexID();
+		SubtaskStateStats[] subtasks = new SubtaskStateStats[7];
+
+		TaskStateStats taskStats = new TaskStateStats(jobVertexId, subtasks.length);
+
+		long stateSize = 0;
+		long alignmentBuffered = 0;
+
+		for (int i = 0; i < subtasks.length; i++) {
+			subtasks[i] = new SubtaskStateStats(
+				i,
+				rand.nextInt(128),
+				rand.nextInt(128),
+				rand.nextInt(128),
+				rand.nextInt(128),
+				rand.nextInt(128),
+				rand.nextInt(128));
+
+			stateSize += subtasks[i].getStateSize();
+			alignmentBuffered += subtasks[i].getAlignmentBuffered();
+
+			taskStats.reportSubtaskStats(subtasks[i]);
+		}
+
+		TaskStateStats copy = CommonTestUtils.createCopySerializable(taskStats);
+
+		assertEquals(stateSize, copy.getStateSize());
+		assertEquals(alignmentBuffered, copy.getAlignmentBuffered());
+
+		TaskStateStats.TaskStateStatsSummary summary = copy.getSummaryStats();
+		assertEquals(subtasks.length, summary.getStateSizeStats().getCount());
+		assertEquals(subtasks.length, summary.getAckTimestampStats().getCount());
+		assertEquals(subtasks.length, summary.getSyncCheckpointDurationStats().getCount());
+		assertEquals(subtasks.length, summary.getAsyncCheckpointDurationStats().getCount());
+		assertEquals(subtasks.length, summary.getAlignmentBufferedStats().getCount());
+		assertEquals(subtasks.length, summary.getAlignmentDurationStats().getCount());
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 6d7427a..9b1064d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -172,11 +172,8 @@ public class ArchivedExecutionGraphTest {
 		// -------------------------------------------------------------------------------------------------------------
 		// CheckpointStats
 		// -------------------------------------------------------------------------------------------------------------
-		CheckpointStatsTracker runtimeStats = runtimeGraph.getCheckpointStatsTracker();
-		CheckpointStatsTracker archivedStats = archivedGraph.getCheckpointStatsTracker();
-
-		CheckpointStatsSnapshot runtimeSnapshot = runtimeStats.createSnapshot();
-		CheckpointStatsSnapshot archivedSnapshot = archivedStats.createSnapshot();
+		CheckpointStatsSnapshot runtimeSnapshot = runtimeGraph.getCheckpointStatsSnapshot();
+		CheckpointStatsSnapshot archivedSnapshot = archivedGraph.getCheckpointStatsSnapshot();
 
 		assertEquals(runtimeSnapshot.getSummaryStats().getEndToEndDurationStats().getAverage(), archivedSnapshot.getSummaryStats().getEndToEndDurationStats().getAverage());
 		assertEquals(runtimeSnapshot.getSummaryStats().getEndToEndDurationStats().getMinimum(), archivedSnapshot.getSummaryStats().getEndToEndDurationStats().getMinimum());

http://git-wip-us.apache.org/repos/asf/flink/blob/dcfa3fbb/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
new file mode 100644
index 0000000..667dbca
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class JobSnapshottingSettingsTest {
+
+	/**
+	 * Tests that the settings are actually serializable.
+	 */
+	@Test
+	public void testIsJavaSerializable() throws Exception {
+		JobSnapshottingSettings settings = new JobSnapshottingSettings(
+			Arrays.asList(new JobVertexID(), new JobVertexID()),
+			Arrays.asList(new JobVertexID(), new JobVertexID()),
+			Arrays.asList(new JobVertexID(), new JobVertexID()),
+			1231231,
+			1231,
+			112,
+			12,
+			ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+			false);
+
+		JobSnapshottingSettings copy = CommonTestUtils.createCopySerializable(settings);
+		assertEquals(settings.getVerticesToAcknowledge(), copy.getVerticesToAcknowledge());
+		assertEquals(settings.getVerticesToConfirm(), copy.getVerticesToConfirm());
+		assertEquals(settings.getVerticesToTrigger(), copy.getVerticesToTrigger());
+		assertEquals(settings.getCheckpointInterval(), copy.getCheckpointInterval());
+		assertEquals(settings.getCheckpointTimeout(), copy.getCheckpointTimeout());
+		assertEquals(settings.getMinPauseBetweenCheckpoints(), copy.getMinPauseBetweenCheckpoints());
+		assertEquals(settings.getMaxConcurrentCheckpoints(), copy.getMaxConcurrentCheckpoints());
+		assertEquals(settings.getExternalizedCheckpointSettings().externalizeCheckpoints(), copy.getExternalizedCheckpointSettings().externalizeCheckpoints());
+		assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(), copy.getExternalizedCheckpointSettings().deleteOnCancellation());
+		assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce());
+	}
+}


Mime
View raw message