Repository: flink
Updated Branches:
refs/heads/release-1.2 f523deca8 -> 3e2e49fd9
[FLINK-5628] [webfrontend] Fix serializability of checkpoint stats tracker
This closes #3215.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3e2e49fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3e2e49fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3e2e49fd
Branch: refs/heads/release-1.2
Commit: 3e2e49fd959bc185770b64fa89240f6a7ec80f02
Parents: f523dec
Author: Ufuk Celebi <uce@apache.org>
Authored: Wed Jan 25 15:42:24 2017 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Jan 30 16:33:52 2017 +0100
----------------------------------------------------------------------
.../checkpoints/CheckpointConfigHandler.java | 8 +-
.../CheckpointStatsDetailsHandler.java | 7 +-
.../CheckpointStatsDetailsSubtasksHandler.java | 7 +-
.../checkpoints/CheckpointStatsHandler.java | 7 +-
.../CheckpointConfigHandlerTest.java | 13 +--
.../CheckpointStatsDetailsHandlerTest.java | 13 +--
.../checkpoints/CheckpointStatsHandlerTest.java | 5 +-
...heckpointStatsSubtaskDetailsHandlerTest.java | 13 +--
.../checkpoint/AbstractCheckpointStats.java | 5 +-
.../checkpoint/CheckpointStatsHistory.java | 4 +-
.../checkpoint/CheckpointStatsTracker.java | 16 ++--
.../checkpoint/CompletedCheckpointStats.java | 26 +++---
.../checkpoint/FailedCheckpointStats.java | 24 +++---
.../checkpoint/PendingCheckpointStats.java | 4 +-
.../checkpoint/RestoredCheckpointStats.java | 2 +-
.../runtime/checkpoint/SubtaskStateStats.java | 8 +-
.../runtime/checkpoint/TaskStateStats.java | 9 ++-
.../executiongraph/AccessExecutionGraph.java | 20 +++--
.../executiongraph/ArchivedExecutionGraph.java | 54 ++++++++-----
.../runtime/executiongraph/ExecutionGraph.java | 22 ++++-
.../tasks/ExternalizedCheckpointSettings.java | 2 +
.../checkpoint/CheckpointStatsHistoryTest.java | 1 -
.../checkpoint/CheckpointStatsSnapshotTest.java | 84 ++++++++++++++++++++
.../checkpoint/CompletedCheckpointTest.java | 35 ++++++++
.../checkpoint/FailedCheckpointStatsTest.java | 40 ++++++++++
.../checkpoint/PendingCheckpointStatsTest.java | 38 +++++++--
.../checkpoint/SubtaskStateStatsTest.java | 36 +++++++++
.../runtime/checkpoint/TaskStateStatsTest.java | 46 ++++++++++-
.../ArchivedExecutionGraphTest.java | 7 +-
.../tasks/JobSnapshottingSettingsTest.java | 59 ++++++++++++++
30 files changed, 490 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index 1ad5e65..be0d283 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
import com.fasterxml.jackson.core.JsonGenerator;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
@@ -42,10 +41,13 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
@Override
public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
StringWriter writer = new StringWriter();
+
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+ JobSnapshottingSettings settings = graph.getJobSnapshottingSettings();
- CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
- JobSnapshottingSettings settings = tracker.getSnapshottingSettings();
+ if (settings == null) {
+ return "{}";
+ }
gen.writeStartObject();
{
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 6bb8300..33d6cf7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -22,7 +22,6 @@ import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
@@ -54,8 +53,10 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
return "{}";
}
- CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
- CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
+ CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+ if (snapshot == null) {
+ return "{}";
+ }
AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index 00195b5..d55467f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
@@ -72,8 +71,10 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
return "{}";
}
- CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
- CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
+ CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+ if (snapshot == null) {
+ return "{}";
+ }
AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index 71f3637..8aab5fa 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
@@ -54,8 +53,10 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
- CheckpointStatsTracker tracker = graph.getCheckpointStatsTracker();
- CheckpointStatsSnapshot snapshot = tracker.createSnapshot();
+ CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
+ if (snapshot == null) {
+ return "{}";
+ }
gen.writeStartObject();
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
index 410e044..e517c3c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
@@ -60,9 +59,7 @@ public class CheckpointConfigHandlerTest {
true);
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
- when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
- when(tracker.getSnapshottingSettings()).thenReturn(settings);
+ when(graph.getJobSnapshottingSettings()).thenReturn(settings);
CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
@@ -98,9 +95,7 @@ public class CheckpointConfigHandlerTest {
false); // at least once
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
- when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
- when(tracker.getSnapshottingSettings()).thenReturn(settings);
+ when(graph.getJobSnapshottingSettings()).thenReturn(settings);
CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
@@ -130,9 +125,7 @@ public class CheckpointConfigHandlerTest {
false); // at least once
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
- when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
- when(tracker.getSnapshottingSettings()).thenReturn(settings);
+ when(graph.getJobSnapshottingSettings()).thenReturn(settings);
CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
index 17c8558..fb5cfc5 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
@@ -89,9 +88,7 @@ public class CheckpointStatsDetailsHandlerTest {
when(snapshot.getHistory()).thenReturn(history);
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
- when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
- when(tracker.createSnapshot()).thenReturn(snapshot);
+ when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
@@ -238,16 +235,14 @@ public class CheckpointStatsDetailsHandlerTest {
// ------------------------------------------------------------------------
- static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
+ private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws Exception {
CheckpointStatsHistory history = mock(CheckpointStatsHistory.class);
when(history.getCheckpointById(anyLong())).thenReturn(checkpoint);
CheckpointStatsSnapshot snapshot = mock(CheckpointStatsSnapshot.class);
when(snapshot.getHistory()).thenReturn(history);
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
- when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
- when(tracker.createSnapshot()).thenReturn(snapshot);
+ when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
@@ -258,7 +253,7 @@ public class CheckpointStatsDetailsHandlerTest {
return mapper.readTree(json);
}
- static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) {
+ private static void verifyTaskNode(TaskStateStats task, JsonNode parentNode) {
long duration = ThreadLocalRandom.current().nextInt(128);
JsonNode taskNode = parentNode.get("tasks").get(task.getJobVertexId().toString());
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
index 8274b36..23a1900 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
@@ -179,9 +178,7 @@ public class CheckpointStatsHandlerTest {
when(snapshot.getLatestRestoredCheckpoint()).thenReturn(latestRestored);
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
- when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
- when(tracker.createSnapshot()).thenReturn(snapshot);
+ when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
index 8b7201d..571adad 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
@@ -129,9 +128,7 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
when(snapshot.getHistory()).thenReturn(history);
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
- when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
- when(tracker.createSnapshot()).thenReturn(snapshot);
+ when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
@@ -186,9 +183,7 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
when(snapshot.getHistory()).thenReturn(history);
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
- when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
- when(tracker.createSnapshot()).thenReturn(snapshot);
+ when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
@@ -209,9 +204,7 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
when(snapshot.getHistory()).thenReturn(history);
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class);
- when(graph.getCheckpointStatsTracker()).thenReturn(tracker);
- when(tracker.createSnapshot()).thenReturn(snapshot);
+ when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
Map<String, String> params = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
index 6c261a5..5b3c7c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/AbstractCheckpointStats.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import javax.annotation.Nullable;
+import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
@@ -30,7 +31,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Base class for checkpoint statistics.
*/
-public abstract class AbstractCheckpointStats {
+public abstract class AbstractCheckpointStats implements Serializable {
+
+ private static final long serialVersionUID = 1041218202028265151L;
/** ID of this checkpoint. */
final long checkpointId;
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
index 56fc9c1..13ce642 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
@@ -307,7 +307,9 @@ public class CheckpointStatsHistory implements Serializable {
*
* <p>The iteration order is in reverse insertion order.
*/
- private static class CheckpointsStatsHistoryIterable implements Iterable<AbstractCheckpointStats> {
+ private static class CheckpointsStatsHistoryIterable implements Iterable<AbstractCheckpointStats>, Serializable {
+
+ private static final long serialVersionUID = 726376482426055490L;
/** Copy of the checkpointsArray array at the point when this iterable was created. */
private final AbstractCheckpointStats[] checkpointsArray;
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index 92f707f..d324c25 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import javax.annotation.Nullable;
-import java.io.Serializable;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
@@ -52,9 +51,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* <p>The statistics are accessed via {@link #createSnapshot()} and exposed via
* both the web frontend and the {@link Metric} system.
*/
-public class CheckpointStatsTracker implements Serializable {
-
- private static final long serialVersionUID = 1694085244807339288L;
+public class CheckpointStatsTracker {
/**
* Lock used to update stats and creating snapshots. Updates always happen
@@ -67,9 +64,6 @@ public class CheckpointStatsTracker implements Serializable {
*/
private final ReentrantLock statsReadWriteLock = new ReentrantLock();
- /** The job vertices taking part in the checkpoints. */
- private final List<ExecutionJobVertex> jobVertices;
-
/** Total number of subtasks to checkpoint. */
private final int totalSubtaskCount;
@@ -85,6 +79,9 @@ public class CheckpointStatsTracker implements Serializable {
/** History of checkpoints. */
private final CheckpointStatsHistory history;
+ /** The job vertices taking part in the checkpoints. */
+ private final transient List<ExecutionJobVertex> jobVertices;
+
/** The latest restored checkpoint. */
@Nullable
private RestoredCheckpointStats latestRestoredCheckpoint;
@@ -217,6 +214,11 @@ public class CheckpointStatsTracker implements Serializable {
return pending;
}
+ /**
+ * Callback when a checkpoint is restored.
+ *
+ * @param restored The restored checkpoint stats.
+ */
void reportRestoredCheckpoint(RestoredCheckpointStats restored) {
checkNotNull(restored, "Restored checkpoint");
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
index 4d2d995..f6b6aed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStats.java
@@ -35,8 +35,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class CompletedCheckpointStats extends AbstractCheckpointStats {
- /** Callback for the {@link CompletedCheckpoint} instance to notify about discard. */
- private final DiscardCallback discardCallback;
+ private static final long serialVersionUID = 138833868551861343L;
/** Total checkpoint state size over all subtasks. */
private final long stateSize;
@@ -69,16 +68,16 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
* @param externalPath Optional external path if persisted externally.
*/
CompletedCheckpointStats(
- long checkpointId,
- long triggerTimestamp,
- CheckpointProperties props,
- int totalSubtaskCount,
- Map<JobVertexID, TaskStateStats> taskStats,
- int numAcknowledgedSubtasks,
- long stateSize,
- long alignmentBuffered,
- SubtaskStateStats latestAcknowledgedSubtask,
- @Nullable String externalPath) {
+ long checkpointId,
+ long triggerTimestamp,
+ CheckpointProperties props,
+ int totalSubtaskCount,
+ Map<JobVertexID, TaskStateStats> taskStats,
+ int numAcknowledgedSubtasks,
+ long stateSize,
+ long alignmentBuffered,
+ SubtaskStateStats latestAcknowledgedSubtask,
+ @Nullable String externalPath) {
super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
checkArgument(numAcknowledgedSubtasks == totalSubtaskCount, "Did not acknowledge all subtasks.");
@@ -87,7 +86,6 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
this.alignmentBuffered = alignmentBuffered;
this.latestAcknowledgedSubtask = checkNotNull(latestAcknowledgedSubtask);
this.externalPath = externalPath;
- this.discardCallback = new DiscardCallback();
}
@Override
@@ -145,7 +143,7 @@ public class CompletedCheckpointStats extends AbstractCheckpointStats {
* @return Callback for the {@link CompletedCheckpoint}.
*/
DiscardCallback getDiscardCallback() {
- return discardCallback;
+ return new DiscardCallback();
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
index 83d7c3d..2f596a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStats.java
@@ -32,6 +32,8 @@ import static org.apache.flink.util.Preconditions.checkArgument;
*/
public class FailedCheckpointStats extends AbstractCheckpointStats {
+ private static final long serialVersionUID = 8000748529515900106L;
+
/** Number of acknowledged tasks. */
private final int numAcknowledgedSubtasks;
@@ -71,17 +73,17 @@ public class FailedCheckpointStats extends AbstractCheckpointStats {
* @param cause Cause of the checkpoint failure or <code>null</code>.
*/
FailedCheckpointStats(
- long checkpointId,
- long triggerTimestamp,
- CheckpointProperties props,
- int totalSubtaskCount,
- Map<JobVertexID, TaskStateStats> taskStats,
- int numAcknowledgedSubtasks,
- long stateSize,
- long alignmentBuffered,
- long failureTimestamp,
- @Nullable SubtaskStateStats latestAcknowledgedSubtask,
- @Nullable Throwable cause) {
+ long checkpointId,
+ long triggerTimestamp,
+ CheckpointProperties props,
+ int totalSubtaskCount,
+ Map<JobVertexID, TaskStateStats> taskStats,
+ int numAcknowledgedSubtasks,
+ long stateSize,
+ long alignmentBuffered,
+ long failureTimestamp,
+ @Nullable SubtaskStateStats latestAcknowledgedSubtask,
+ @Nullable Throwable cause) {
super(checkpointId, triggerTimestamp, props, totalSubtaskCount, taskStats);
checkArgument(numAcknowledgedSubtasks >= 0, "Negative number of ACKs");
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
index e6fa80f..0f32250 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStats.java
@@ -42,8 +42,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
*/
public class PendingCheckpointStats extends AbstractCheckpointStats {
+ private static final long serialVersionUID = -973959257699390327L;
+
/** Tracker callback when the pending checkpoint is finalized or aborted. */
- private final CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback;
+ private transient final CheckpointStatsTracker.PendingCheckpointStatsCallback trackerCallback;
/** The current number of acknowledged subtasks. */
private volatile int currentNumAcknowledgedSubtasks;
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
index c21937a..8b8a5e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStats.java
@@ -56,7 +56,7 @@ public class RestoredCheckpointStats implements Serializable {
long checkpointId,
CheckpointProperties props,
long restoreTimestamp,
- String externalPath) {
+ @Nullable String externalPath) {
this.checkpointId = checkpointId;
this.props = checkNotNull(props, "Checkpoint Properties");
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
index 3a66032..ee9e287 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SubtaskStateStats.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.checkpoint;
+import java.io.Serializable;
+
import static org.apache.flink.util.Preconditions.checkArgument;
/**
@@ -30,8 +32,10 @@ import static org.apache.flink.util.Preconditions.checkArgument;
*
* <p>This is the smallest immutable unit of the stats.
*/
-public class SubtaskStateStats {
-
+public class SubtaskStateStats implements Serializable {
+
+ private static final long serialVersionUID = 8928594531621862214L;
+
/** Index of this sub task. */
private final int subtaskIndex;
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
index fc118d9..2f779a1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateStats.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import javax.annotation.Nullable;
+import java.io.Serializable;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -29,7 +30,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
* Statistics for a single task/operator that gathers all statistics of its
* subtasks and provides summary statistics about all subtasks.
*/
-public class TaskStateStats {
+public class TaskStateStats implements Serializable {
+
+ private static final long serialVersionUID = 531803101206574444L;
/** ID of the task the stats belong to. */
private final JobVertexID jobVertexId;
@@ -195,7 +198,9 @@ public class TaskStateStats {
/**
* Summary of the subtask stats of a single task/operator.
*/
- public static class TaskStateStatsSummary {
+ public static class TaskStateStatsSummary implements Serializable {
+
+ private static final long serialVersionUID = 1009476026522091909L;
private MinMaxAvgStats stateSize = new MinMaxAvgStats();
private MinMaxAvgStats ackTimestamp = new MinMaxAvgStats();
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 3490dc8..18c2ec2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -17,13 +17,14 @@
*/
package org.apache.flink.runtime.executiongraph;
+import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
-import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.util.SerializedValue;
import java.io.IOException;
@@ -115,11 +116,20 @@ public interface AccessExecutionGraph {
CheckpointCoordinator getCheckpointCoordinator();
/**
- * Returns the {@link CheckpointStatsTracker} for this execution graph.
+ * Returns the {@link JobSnapshottingSettings} or <code>null</code> if
+ * checkpointing is disabled.
+ *
+ * @return JobSnapshottingSettings for this execution graph
+ */
+ JobSnapshottingSettings getJobSnapshottingSettings();
+
+ /**
+ * Returns a snapshot of the checkpoint statistics or <code>null</code> if
+ * checkpointing is disabled.
*
- * @return CheckpointStatsTracker for thie execution graph
+ * @return Snapshot of the checkpoint statistics for this execution graph
*/
- CheckpointStatsTracker getCheckpointStatsTracker();
+ CheckpointStatsSnapshot getCheckpointStatsSnapshot();
/**
* Returns the {@link ArchivedExecutionConfig} for this execution graph.
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 440ecda..334b0d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -21,11 +21,13 @@ import org.apache.flink.api.common.ArchivedExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.util.SerializedValue;
+import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
@@ -34,6 +36,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializable {
+
private static final long serialVersionUID = 7231383912742578428L;
// --------------------------------------------------------------------------------------------
@@ -76,23 +79,29 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
private final ArchivedExecutionConfig archivedExecutionConfig;
private final boolean isStoppable;
private final Map<String, SerializedValue<Object>> serializedUserAccumulators;
- private final CheckpointStatsTracker tracker;
+
+ @Nullable
+ private final JobSnapshottingSettings jobSnapshottingSettings;
+
+ @Nullable
+ private final CheckpointStatsSnapshot checkpointStatsSnapshot;
public ArchivedExecutionGraph(
- JobID jobID,
- String jobName,
- Map<JobVertexID, ArchivedExecutionJobVertex> tasks,
- List<ArchivedExecutionJobVertex> verticesInCreationOrder,
- long[] stateTimestamps,
- JobStatus state,
- String failureCause,
- String jsonPlan,
- StringifiedAccumulatorResult[] archivedUserAccumulators,
- Map<String, SerializedValue<Object>> serializedUserAccumulators,
- ArchivedExecutionConfig executionConfig,
- boolean isStoppable,
- CheckpointStatsTracker tracker
- ) {
+ JobID jobID,
+ String jobName,
+ Map<JobVertexID, ArchivedExecutionJobVertex> tasks,
+ List<ArchivedExecutionJobVertex> verticesInCreationOrder,
+ long[] stateTimestamps,
+ JobStatus state,
+ String failureCause,
+ String jsonPlan,
+ StringifiedAccumulatorResult[] archivedUserAccumulators,
+ Map<String, SerializedValue<Object>> serializedUserAccumulators,
+ ArchivedExecutionConfig executionConfig,
+ boolean isStoppable,
+ @Nullable JobSnapshottingSettings jobSnapshottingSettings,
+ @Nullable CheckpointStatsSnapshot checkpointStatsSnapshot) {
+
this.jobID = jobID;
this.jobName = jobName;
this.tasks = tasks;
@@ -105,10 +114,12 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
this.serializedUserAccumulators = serializedUserAccumulators;
this.archivedExecutionConfig = executionConfig;
this.isStoppable = isStoppable;
- this.tracker = tracker;
+ this.jobSnapshottingSettings = jobSnapshottingSettings;
+ this.checkpointStatsSnapshot = checkpointStatsSnapshot;
}
// --------------------------------------------------------------------------------------------
+
@Override
public String getJsonPlan() {
return jsonPlan;
@@ -200,8 +211,13 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
}
@Override
- public CheckpointStatsTracker getCheckpointStatsTracker() {
- return tracker;
+ public JobSnapshottingSettings getJobSnapshottingSettings() {
+ return jobSnapshottingSettings;
+ }
+
+ @Override
+ public CheckpointStatsSnapshot getCheckpointStatsSnapshot() {
+ return checkpointStatsSnapshot;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index b9e6c84..66aec8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.execution.ExecutionState;
@@ -50,6 +51,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -425,8 +427,21 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
}
@Override
- public CheckpointStatsTracker getCheckpointStatsTracker() {
- return checkpointStatsTracker;
+ public JobSnapshottingSettings getJobSnapshottingSettings() {
+ if (checkpointStatsTracker != null) {
+ return checkpointStatsTracker.getSnapshottingSettings();
+ } else {
+ return null;
+ }
+ }
+
+ @Override
+ public CheckpointStatsSnapshot getCheckpointStatsSnapshot() {
+ if (checkpointStatsTracker != null) {
+ return checkpointStatsTracker.createSnapshot();
+ } else {
+ return null;
+ }
}
private ExecutionVertex[] collectExecutionVertices(List<ExecutionJobVertex> jobVertices) {
@@ -1323,6 +1338,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
serializedUserAccumulators,
getArchivedExecutionConfig(),
isStoppable(),
- getCheckpointStatsTracker());
+ getJobSnapshottingSettings(),
+ getCheckpointStatsSnapshot());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
index 779fc76..b3b487c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
@@ -26,6 +26,8 @@ import org.apache.flink.annotation.Internal;
@Internal
public class ExternalizedCheckpointSettings implements java.io.Serializable {
+ private static final long serialVersionUID = -6271691851124392955L;
+
private static final ExternalizedCheckpointSettings NONE = new ExternalizedCheckpointSettings(false, false);
/** Flag indicating whether checkpoints should be externalized. */
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
index 098fe17..7541806 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistoryTest.java
@@ -192,5 +192,4 @@ public class CheckpointStatsHistoryTest {
return failed;
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
new file mode 100644
index 0000000..6500369
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshotTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CheckpointStatsSnapshotTest {
+
+ /**
+ * Tests that the snapshot is actually serializable.
+ */
+ @Test
+ public void testIsJavaSerializable() throws Exception {
+ CheckpointStatsCounts counts = new CheckpointStatsCounts();
+ counts.incrementInProgressCheckpoints();
+ counts.incrementInProgressCheckpoints();
+ counts.incrementInProgressCheckpoints();
+ counts.incrementCompletedCheckpoints();
+ counts.incrementFailedCheckpoints();
+ counts.incrementRestoredCheckpoints();
+
+ CompletedCheckpointStatsSummary summary = new CompletedCheckpointStatsSummary();
+ summary.updateSummary(createCompletedCheckpointsStats(12398, 9919, 12));
+ summary.updateSummary(createCompletedCheckpointsStats(2221, 3333, 9122));
+
+ CheckpointStatsHistory history = new CheckpointStatsHistory(1);
+ RestoredCheckpointStats restored = new RestoredCheckpointStats(1, CheckpointProperties.forStandardCheckpoint(), 99119, null);
+
+ CheckpointStatsSnapshot snapshot = new CheckpointStatsSnapshot(
+ counts,
+ summary,
+ history,
+ restored);
+
+ CheckpointStatsSnapshot copy = CommonTestUtils.createCopySerializable(snapshot);
+
+ assertEquals(counts.getNumberOfCompletedCheckpoints(), copy.getCounts().getNumberOfCompletedCheckpoints());
+ assertEquals(counts.getNumberOfFailedCheckpoints(), copy.getCounts().getNumberOfFailedCheckpoints());
+ assertEquals(counts.getNumberOfInProgressCheckpoints(), copy.getCounts().getNumberOfInProgressCheckpoints());
+ assertEquals(counts.getNumberOfRestoredCheckpoints(), copy.getCounts().getNumberOfRestoredCheckpoints());
+ assertEquals(counts.getTotalNumberOfCheckpoints(), copy.getCounts().getTotalNumberOfCheckpoints());
+
+ assertEquals(summary.getStateSizeStats().getSum(), copy.getSummaryStats().getStateSizeStats().getSum());
+ assertEquals(summary.getEndToEndDurationStats().getSum(), copy.getSummaryStats().getEndToEndDurationStats().getSum());
+ assertEquals(summary.getAlignmentBufferedStats().getSum(), copy.getSummaryStats().getAlignmentBufferedStats().getSum());
+
+ assertEquals(restored.getCheckpointId(), copy.getLatestRestoredCheckpoint().getCheckpointId());
+ }
+
+ private CompletedCheckpointStats createCompletedCheckpointsStats(
+ long stateSize,
+ long endToEndDuration,
+ long alignmentBuffered) {
+
+ CompletedCheckpointStats completed = mock(CompletedCheckpointStats.class);
+ when(completed.getStateSize()).thenReturn(stateSize);
+ when(completed.getEndToEndDuration()).thenReturn(endToEndDuration);
+ when(completed.getAlignmentBuffered()).thenReturn(alignmentBuffered);
+
+ return completed;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index e466dc7..0d933ff 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.checkpoint;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.Rule;
@@ -143,4 +144,38 @@ public class CompletedCheckpointTest {
completed.discard(JobStatus.FINISHED);
verify(callback, times(1)).notifyDiscardedCheckpoint();
}
+
+ @Test
+ public void testIsJavaSerializable() throws Exception {
+ TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
+ TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
+
+ HashMap<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+ taskStats.put(task1.getJobVertexId(), task1);
+ taskStats.put(task2.getJobVertexId(), task2);
+
+ CompletedCheckpointStats completed = new CompletedCheckpointStats(
+ 123123123L,
+ 10123L,
+ CheckpointProperties.forStandardCheckpoint(),
+ 1337,
+ taskStats,
+ 1337,
+ 123129837912L,
+ 123819239812L,
+ new SubtaskStateStats(123, 213123, 123123, 0, 0, 0, 0),
+ null);
+
+ CompletedCheckpointStats copy = CommonTestUtils.createCopySerializable(completed);
+
+ assertEquals(completed.getCheckpointId(), copy.getCheckpointId());
+ assertEquals(completed.getTriggerTimestamp(), copy.getTriggerTimestamp());
+ assertEquals(completed.getProperties(), copy.getProperties());
+ assertEquals(completed.getNumberOfSubtasks(), copy.getNumberOfSubtasks());
+ assertEquals(completed.getNumberOfAcknowledgedSubtasks(), copy.getNumberOfAcknowledgedSubtasks());
+ assertEquals(completed.getEndToEndDuration(), copy.getEndToEndDuration());
+ assertEquals(completed.getStateSize(), copy.getStateSize());
+ assertEquals(completed.getLatestAcknowledgedSubtaskStats().getSubtaskIndex(), copy.getLatestAcknowledgedSubtaskStats().getSubtaskIndex());
+ assertEquals(completed.getStatus(), copy.getStatus());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
index 683c1c9..f1a56be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailedCheckpointStatsTest.java
@@ -18,9 +18,11 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.Test;
+import java.io.NotSerializableException;
import java.util.HashMap;
import java.util.Map;
@@ -57,4 +59,42 @@ public class FailedCheckpointStatsTest {
assertEquals(duration, failed.getEndToEndDuration());
}
+
+ @Test
+ public void testIsJavaSerializable() throws Exception {
+ long duration = 123912931293L;
+ long triggerTimestamp = 10123;
+ long failureTimestamp = triggerTimestamp + duration;
+
+ Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+ JobVertexID jobVertexId = new JobVertexID();
+ taskStats.put(jobVertexId, new TaskStateStats(jobVertexId, 1));
+
+ FailedCheckpointStats failed = new FailedCheckpointStats(
+ 123123123L,
+ triggerTimestamp,
+ CheckpointProperties.forStandardCheckpoint(),
+ 1337,
+ taskStats,
+ 3,
+ 190890123,
+ 0,
+ failureTimestamp,
+ null,
+ new NotSerializableException("message"));
+
+ FailedCheckpointStats copy = CommonTestUtils.createCopySerializable(failed);
+
+ assertEquals(failed.getCheckpointId(), copy.getCheckpointId());
+ assertEquals(failed.getTriggerTimestamp(), copy.getTriggerTimestamp());
+ assertEquals(failed.getProperties(), copy.getProperties());
+ assertEquals(failed.getNumberOfSubtasks(), copy.getNumberOfSubtasks());
+ assertEquals(failed.getNumberOfAcknowledgedSubtasks(), copy.getNumberOfAcknowledgedSubtasks());
+ assertEquals(failed.getEndToEndDuration(), copy.getEndToEndDuration());
+ assertEquals(failed.getStateSize(), copy.getStateSize());
+ assertEquals(failed.getLatestAcknowledgedSubtaskStats(), copy.getLatestAcknowledgedSubtaskStats());
+ assertEquals(failed.getStatus(), copy.getStatus());
+ assertEquals(failed.getFailureMessage(), copy.getFailureMessage());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
index 854e106..6c5e8fd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointStatsTest.java
@@ -18,12 +18,12 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.util.HashMap;
-import java.util.Map;
import static junit.framework.TestCase.assertFalse;
import static org.junit.Assert.assertEquals;
@@ -47,7 +47,7 @@ public class PendingCheckpointStatsTest {
TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
int totalSubtaskCount = task1.getNumberOfSubtasks() + task2.getNumberOfSubtasks();
- Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+ HashMap<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
taskStats.put(task1.getJobVertexId(), task1);
taskStats.put(task2.getJobVertexId(), task2);
@@ -128,7 +128,7 @@ public class PendingCheckpointStatsTest {
TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
- Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+ HashMap<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
taskStats.put(task1.getJobVertexId(), task1);
taskStats.put(task2.getJobVertexId(), task2);
@@ -165,7 +165,6 @@ public class PendingCheckpointStatsTest {
assertNotNull(completed);
assertEquals(CheckpointStatsStatus.COMPLETED, completed.getStatus());
assertFalse(completed.isDiscarded());
- assertEquals(discardCallback, completed.getDiscardCallback());
discardCallback.notifyDiscardedCheckpoint();
assertTrue(completed.isDiscarded());
assertEquals(externalPath, completed.getExternalPath());
@@ -189,7 +188,7 @@ public class PendingCheckpointStatsTest {
TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
- Map<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+ HashMap<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
taskStats.put(task1.getJobVertexId(), task1);
taskStats.put(task2.getJobVertexId(), task2);
@@ -240,6 +239,35 @@ public class PendingCheckpointStatsTest {
assertEquals(task2, failed.getTaskStateStats(task2.getJobVertexId()));
}
+ @Test
+ public void testIsJavaSerializable() throws Exception {
+ TaskStateStats task1 = new TaskStateStats(new JobVertexID(), 3);
+ TaskStateStats task2 = new TaskStateStats(new JobVertexID(), 4);
+
+ HashMap<JobVertexID, TaskStateStats> taskStats = new HashMap<>();
+ taskStats.put(task1.getJobVertexId(), task1);
+ taskStats.put(task2.getJobVertexId(), task2);
+
+ PendingCheckpointStats pending = new PendingCheckpointStats(
+ 123123123L,
+ 10123L,
+ CheckpointProperties.forStandardCheckpoint(),
+ 1337,
+ taskStats,
+ mock(CheckpointStatsTracker.PendingCheckpointStatsCallback.class));
+
+ PendingCheckpointStats copy = CommonTestUtils.createCopySerializable(pending);
+
+ assertEquals(pending.getCheckpointId(), copy.getCheckpointId());
+ assertEquals(pending.getTriggerTimestamp(), copy.getTriggerTimestamp());
+ assertEquals(pending.getProperties(), copy.getProperties());
+ assertEquals(pending.getNumberOfSubtasks(), copy.getNumberOfSubtasks());
+ assertEquals(pending.getNumberOfAcknowledgedSubtasks(), copy.getNumberOfAcknowledgedSubtasks());
+ assertEquals(pending.getEndToEndDuration(), copy.getEndToEndDuration());
+ assertEquals(pending.getStateSize(), copy.getStateSize());
+ assertEquals(pending.getLatestAcknowledgedSubtaskStats(), copy.getLatestAcknowledgedSubtaskStats());
+ assertEquals(pending.getStatus(), copy.getStatus());
+ }
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java
index 75c40c5..8514f33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SubtaskStateStatsTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.core.testutils.CommonTestUtils;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@@ -54,4 +55,39 @@ public class SubtaskStateStatsTest {
// Trigger timestamp < ack timestamp
assertEquals(0, stats.getEndToEndDuration(ackTimestamp + 1));
}
+
+ /**
+ * Tests that the snapshot is actually serializable.
+ */
+ @Test
+ public void testIsJavaSerializable() throws Exception {
+ SubtaskStateStats stats = new SubtaskStateStats(
+ 0,
+ Integer.MAX_VALUE + 1L,
+ Integer.MAX_VALUE + 2L,
+ Integer.MAX_VALUE + 3L,
+ Integer.MAX_VALUE + 4L,
+ Integer.MAX_VALUE + 5L,
+ Integer.MAX_VALUE + 6L);
+
+ SubtaskStateStats copy = CommonTestUtils.createCopySerializable(stats);
+
+ assertEquals(0, copy.getSubtaskIndex());
+ assertEquals(Integer.MAX_VALUE + 1L, copy.getAckTimestamp());
+ assertEquals(Integer.MAX_VALUE + 2L, copy.getStateSize());
+ assertEquals(Integer.MAX_VALUE + 3L, copy.getSyncCheckpointDuration());
+ assertEquals(Integer.MAX_VALUE + 4L, copy.getAsyncCheckpointDuration());
+ assertEquals(Integer.MAX_VALUE + 5L, copy.getAlignmentBuffered());
+ assertEquals(Integer.MAX_VALUE + 6L, copy.getAlignmentDuration());
+
+ // Check duration helper
+ long ackTimestamp = copy.getAckTimestamp();
+ long triggerTimestamp = ackTimestamp - 10123;
+ assertEquals(10123, copy.getEndToEndDuration(triggerTimestamp));
+
+ // Trigger timestamp < ack timestamp
+ assertEquals(0, copy.getEndToEndDuration(ackTimestamp + 1));
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java
index 94edd9e..dd46fe2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TaskStateStatsTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.checkpoint;
+import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.Test;
@@ -31,6 +32,8 @@ import static org.junit.Assert.assertTrue;
public class TaskStateStatsTest {
+ private final ThreadLocalRandom rand = ThreadLocalRandom.current();
+
/**
* Tests that subtask stats are correctly collected.
*/
@@ -48,8 +51,6 @@ public class TaskStateStatsTest {
assertEquals(-1, taskStats.getLatestAckTimestamp());
assertArrayEquals(subtasks, taskStats.getSubtaskStats());
- ThreadLocalRandom rand = ThreadLocalRandom.current();
-
long stateSize = 0;
long alignmentBuffered = 0;
@@ -90,4 +91,45 @@ public class TaskStateStatsTest {
assertEquals(subtasks.length, summary.getAlignmentBufferedStats().getCount());
assertEquals(subtasks.length, summary.getAlignmentDurationStats().getCount());
}
+
+ @Test
+ public void testIsJavaSerializable() throws Exception {
+ JobVertexID jobVertexId = new JobVertexID();
+ SubtaskStateStats[] subtasks = new SubtaskStateStats[7];
+
+ TaskStateStats taskStats = new TaskStateStats(jobVertexId, subtasks.length);
+
+ long stateSize = 0;
+ long alignmentBuffered = 0;
+
+ for (int i = 0; i < subtasks.length; i++) {
+ subtasks[i] = new SubtaskStateStats(
+ i,
+ rand.nextInt(128),
+ rand.nextInt(128),
+ rand.nextInt(128),
+ rand.nextInt(128),
+ rand.nextInt(128),
+ rand.nextInt(128));
+
+ stateSize += subtasks[i].getStateSize();
+ alignmentBuffered += subtasks[i].getAlignmentBuffered();
+
+ taskStats.reportSubtaskStats(subtasks[i]);
+ }
+
+ TaskStateStats copy = CommonTestUtils.createCopySerializable(taskStats);
+
+ assertEquals(stateSize, copy.getStateSize());
+ assertEquals(alignmentBuffered, copy.getAlignmentBuffered());
+
+ TaskStateStats.TaskStateStatsSummary summary = copy.getSummaryStats();
+ assertEquals(subtasks.length, summary.getStateSizeStats().getCount());
+ assertEquals(subtasks.length, summary.getAckTimestampStats().getCount());
+ assertEquals(subtasks.length, summary.getSyncCheckpointDurationStats().getCount());
+ assertEquals(subtasks.length, summary.getAsyncCheckpointDurationStats().getCount());
+ assertEquals(subtasks.length, summary.getAlignmentBufferedStats().getCount());
+ assertEquals(subtasks.length, summary.getAlignmentDurationStats().getCount());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 6d7427a..9b1064d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -172,11 +172,8 @@ public class ArchivedExecutionGraphTest {
// -------------------------------------------------------------------------------------------------------------
// CheckpointStats
// -------------------------------------------------------------------------------------------------------------
- CheckpointStatsTracker runtimeStats = runtimeGraph.getCheckpointStatsTracker();
- CheckpointStatsTracker archivedStats = archivedGraph.getCheckpointStatsTracker();
-
- CheckpointStatsSnapshot runtimeSnapshot = runtimeStats.createSnapshot();
- CheckpointStatsSnapshot archivedSnapshot = archivedStats.createSnapshot();
+ CheckpointStatsSnapshot runtimeSnapshot = runtimeGraph.getCheckpointStatsSnapshot();
+ CheckpointStatsSnapshot archivedSnapshot = archivedGraph.getCheckpointStatsSnapshot();
assertEquals(runtimeSnapshot.getSummaryStats().getEndToEndDurationStats().getAverage(), archivedSnapshot.getSummaryStats().getEndToEndDurationStats().getAverage());
assertEquals(runtimeSnapshot.getSummaryStats().getEndToEndDurationStats().getMinimum(), archivedSnapshot.getSummaryStats().getEndToEndDurationStats().getMinimum());
http://git-wip-us.apache.org/repos/asf/flink/blob/3e2e49fd/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
new file mode 100644
index 0000000..667dbca
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+public class JobSnapshottingSettingsTest {
+
+ /**
+ * Tests that the settings are actually serializable.
+ */
+ @Test
+ public void testIsJavaSerializable() throws Exception {
+ JobSnapshottingSettings settings = new JobSnapshottingSettings(
+ Arrays.asList(new JobVertexID(), new JobVertexID()),
+ Arrays.asList(new JobVertexID(), new JobVertexID()),
+ Arrays.asList(new JobVertexID(), new JobVertexID()),
+ 1231231,
+ 1231,
+ 112,
+ 12,
+ ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+ false);
+
+ JobSnapshottingSettings copy = CommonTestUtils.createCopySerializable(settings);
+ assertEquals(settings.getVerticesToAcknowledge(), copy.getVerticesToAcknowledge());
+ assertEquals(settings.getVerticesToConfirm(), copy.getVerticesToConfirm());
+ assertEquals(settings.getVerticesToTrigger(), copy.getVerticesToTrigger());
+ assertEquals(settings.getCheckpointInterval(), copy.getCheckpointInterval());
+ assertEquals(settings.getCheckpointTimeout(), copy.getCheckpointTimeout());
+ assertEquals(settings.getMinPauseBetweenCheckpoints(), copy.getMinPauseBetweenCheckpoints());
+ assertEquals(settings.getMaxConcurrentCheckpoints(), copy.getMaxConcurrentCheckpoints());
+ assertEquals(settings.getExternalizedCheckpointSettings().externalizeCheckpoints(), copy.getExternalizedCheckpointSettings().externalizeCheckpoints());
+ assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(), copy.getExternalizedCheckpointSettings().deleteOnCancellation());
+ assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce());
+ }
+}
|