flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [5/5] flink git commit: [FLINK-7710] [flip6] Add CheckpointStatisticsHandler for the new REST endpoint
Date Mon, 02 Oct 2017 20:29:46 GMT
[FLINK-7710] [flip6] Add CheckpointStatisticsHandler for the new REST endpoint

This commit also makes the CheckpointStatsHistory object serializable by removing the
CheckpointStatsHistoryIterable and replacing it with a static ArrayList.

This closes #4750.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac82becd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac82becd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac82becd

Branch: refs/heads/master
Commit: ac82becd21b7766c18d16abfc7e08334c644507e
Parents: b41f5a6
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Fri Sep 29 15:09:06 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Oct 2 19:59:44 2017 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointStatsHistory.java      | 125 +--
 .../checkpoint/CheckpointStatsSnapshot.java     |   1 +
 .../dispatcher/DispatcherRestEndpoint.java      |  11 +
 .../CheckpointStatisticsHandler.java            | 181 +++++
 .../checkpoints/CheckpointStatsHandler.java     |  95 +--
 .../rest/messages/CheckpointStatistics.java     | 763 +++++++++++++++++++
 .../messages/CheckpointStatisticsHeaders.java   |  68 ++
 .../messages/CheckpointStatisticsTest.java      | 104 +++
 8 files changed, 1200 insertions(+), 148 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac82becd/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
index ce14c2d..9db302c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsHistory.java
@@ -19,13 +19,13 @@
 package org.apache.flink.runtime.checkpoint;
 
 import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.NoSuchElementException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -38,7 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * the maximum number of elements. At this point, the elements wrap around
  * and the least recently added entry is overwritten.
  *
- * <p>Access happens via an checkpointsIterable over the statistics and a map that
+ * <p>Access happens via checkpointsHistory over the statistics and a map that
  * exposes the checkpoint by their ID. Both of these are only guaranteed
  * to reflect the latest state after a call to {@link #createSnapshot()}.
  *
@@ -49,8 +49,8 @@ public class CheckpointStatsHistory implements Serializable {
 
 	private static final long serialVersionUID = 7090320677606528415L;
 
-	/** Iterable over all available stats. Only updated on {@link #createSnapshot()}. */
-	private final Iterable<AbstractCheckpointStats> checkpointsIterable;
+	/** List over all available stats. Only updated on {@link #createSnapshot()}. */
+	private final List<AbstractCheckpointStats> checkpointsHistory;
 
 	/** Map of all available stats keyed by their ID. Only updated on {@link #createSnapshot()}. */
 	private final Map<Long, AbstractCheckpointStats> checkpointsById;
@@ -61,7 +61,7 @@ public class CheckpointStatsHistory implements Serializable {
 	/** Flag indicating whether this the history is read-only. */
 	private final boolean readOnly;
 
-	/** Array of checkpointsArray. Writes go aginst this array. */
+	/** Array of checkpointsArray. Writes go against this array. */
 	private transient AbstractCheckpointStats[] checkpointsArray;
 
 	/** Next position in {@link #checkpointsArray} to write to. */
@@ -107,14 +107,14 @@ public class CheckpointStatsHistory implements Serializable {
 	 *
 	 * @param readOnly Flag indicating whether the history is read-only.
 	 * @param maxSize Maximum history size.
-	 * @param checkpointsIterable Checkpoints iterable.
+	 * @param checkpointsHistory Checkpoints iterable.
 	 * @param checkpointsById Checkpoints by ID.
 	 */
 	private CheckpointStatsHistory(
 			boolean readOnly,
 			int maxSize,
 			AbstractCheckpointStats[] checkpointArray,
-			Iterable<AbstractCheckpointStats> checkpointsIterable,
+			List<AbstractCheckpointStats> checkpointsHistory,
 			Map<Long, AbstractCheckpointStats> checkpointsById,
 			CompletedCheckpointStats latestCompletedCheckpoint,
 			FailedCheckpointStats latestFailedCheckpoint,
@@ -124,15 +124,15 @@ public class CheckpointStatsHistory implements Serializable {
 		checkArgument(maxSize >= 0, "Negative maximum size");
 		this.maxSize = maxSize;
 		this.checkpointsArray = checkpointArray;
-		this.checkpointsIterable = checkNotNull(checkpointsIterable);
+		this.checkpointsHistory = checkNotNull(checkpointsHistory);
 		this.checkpointsById = checkNotNull(checkpointsById);
 		this.latestCompletedCheckpoint = latestCompletedCheckpoint;
 		this.latestFailedCheckpoint = latestFailedCheckpoint;
 		this.latestSavepoint = latestSavepoint;
 	}
 
-	public Iterable<AbstractCheckpointStats> getCheckpoints() {
-		return checkpointsIterable;
+	public List<AbstractCheckpointStats> getCheckpoints() {
+		return checkpointsHistory;
 	}
 
 	public AbstractCheckpointStats getCheckpointById(long checkpointId) {
@@ -164,18 +164,25 @@ public class CheckpointStatsHistory implements Serializable {
 			throw new UnsupportedOperationException("Can't create a snapshot of a read-only history.");
 		}
 
-		Iterable<AbstractCheckpointStats> checkpointsIterable;
+		List<AbstractCheckpointStats> checkpointsHistory;
 		Map<Long, AbstractCheckpointStats> checkpointsById;
 
 		checkpointsById = new HashMap<>(checkpointsArray.length);
 
 		if (maxSize == 0) {
-			checkpointsIterable = Collections.emptyList();
+			checkpointsHistory = Collections.emptyList();
 		} else {
-			// Create snapshot iterator (copies the array)
-			checkpointsIterable = new CheckpointsStatsHistoryIterable(checkpointsArray, nextPos);
+			AbstractCheckpointStats[] newCheckpointsArray = new AbstractCheckpointStats[checkpointsArray.length];
+
+			System.arraycopy(checkpointsArray, nextPos, newCheckpointsArray, 0, checkpointsArray.length - nextPos);
+			System.arraycopy(checkpointsArray, 0, newCheckpointsArray, checkpointsArray.length - nextPos, nextPos);
+
+			checkpointsHistory = Arrays.asList(newCheckpointsArray);
 
-			for (AbstractCheckpointStats checkpoint : checkpointsIterable) {
+			// reverse the order such that we start with the youngest checkpoint
+			Collections.reverse(checkpointsHistory);
+
+			for (AbstractCheckpointStats checkpoint : checkpointsHistory) {
 				checkpointsById.put(checkpoint.getCheckpointId(), checkpoint);
 			}
 		}
@@ -196,7 +203,7 @@ public class CheckpointStatsHistory implements Serializable {
 			true,
 			maxSize,
 			null,
-			checkpointsIterable,
+			checkpointsHistory,
 			checkpointsById,
 			latestCompletedCheckpoint,
 			latestFailedCheckpoint,
@@ -301,88 +308,4 @@ public class CheckpointStatsHistory implements Serializable {
 
 		return false;
 	}
-
-	/**
-	 * Iterable over the current checkpoint history.
-	 *
-	 * <p>The iteration order is in reverse insertion order.
-	 */
-	private static class CheckpointsStatsHistoryIterable implements Iterable<AbstractCheckpointStats>, Serializable {
-
-		private static final long serialVersionUID = 726376482426055490L;
-
-		/** Copy of the checkpointsArray array at the point when this iterable was created. */
-		private final AbstractCheckpointStats[] checkpointsArray;
-
-		/** The starting position from which to iterate over the array. */
-		private final int startPos;
-
-		/**
-		 * Creates the iterable by creating a copy of the checkpoints array.
-		 *
-		 * @param checkpointsArray Checkpoints to iterate over. This array is copied.
-		 * @param nextPos The next write position for the array
-		 */
-		CheckpointsStatsHistoryIterable(AbstractCheckpointStats[] checkpointsArray, int nextPos) {
-			// Copy the array
-			this.checkpointsArray = Arrays.copyOf(checkpointsArray, checkpointsArray.length);
-
-			// Start from nextPos, because that's were the oldest element is
-			this.startPos = nextPos == checkpointsArray.length ? checkpointsArray.length - 1 : nextPos - 1;
-		}
-
-		@Override
-		public Iterator<AbstractCheckpointStats> iterator() {
-			return new CheckpointsSnapshotIterator();
-		}
-
-		/**
-		 * Iterator over the checkpoints array.
-		 */
-		private class CheckpointsSnapshotIterator implements Iterator<AbstractCheckpointStats> {
-
-			/** The current position. */
-			private int currentPos;
-
-			/** The remaining number of elements to iterate over. */
-			private int remaining;
-
-			/**
-			 * Creates the iterator.
-			 */
-			CheckpointsSnapshotIterator() {
-				this.currentPos = startPos;
-				this.remaining = checkpointsArray.length;
-			}
-
-			@Override
-			public boolean hasNext() {
-				return remaining > 0;
-			}
-
-			@Override
-			public AbstractCheckpointStats next() {
-				if (hasNext()) {
-					AbstractCheckpointStats stats = checkpointsArray[currentPos--];
-
-					// Wrap around if needed
-					if (currentPos == -1) {
-						currentPos = checkpointsArray.length - 1;
-					}
-
-					remaining--;
-
-					return stats;
-				} else {
-					throw new NoSuchElementException();
-				}
-			}
-
-			@Override
-			public void remove() {
-				throw new UnsupportedOperationException();
-			}
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ac82becd/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java
index e0bfed7..7d787c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsSnapshot.java
@@ -96,6 +96,7 @@ public class CheckpointStatsSnapshot implements Serializable {
 	 *
 	 * @return Latest restored checkpoint or <code>null</code>.
 	 */
+	@Nullable
 	public RestoredCheckpointStats getLatestRestoredCheckpoint() {
 		return latestRestoredCheckpoint;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ac82becd/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 7d7e32c..8471078 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
 import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
@@ -41,6 +42,7 @@ import org.apache.flink.runtime.rest.handler.legacy.messages.ClusterConfiguratio
 import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.messages.StatusOverviewWithVersion;
 import org.apache.flink.runtime.rest.messages.CheckpointConfigHeaders;
+import org.apache.flink.runtime.rest.messages.CheckpointStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.ClusterConfigurationInfoHeaders;
 import org.apache.flink.runtime.rest.messages.ClusterOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
@@ -158,6 +160,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 			executionGraphCache,
 			executor);
 
+		CheckpointStatisticsHandler checkpointStatisticsHandler = new CheckpointStatisticsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			CheckpointStatisticsHeaders.getInstance(),
+			executionGraphCache,
+			executor);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent;
@@ -180,6 +190,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint {
 		handlers.add(Tuple2.of(JobTerminationHeaders.getInstance(), jobTerminationHandler));
 		handlers.add(Tuple2.of(JobConfigHeaders.getInstance(), jobConfigHandler));
 		handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
+		handlers.add(Tuple2.of(CheckpointStatisticsHeaders.getInstance(), checkpointStatisticsHandler));
 
 		// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
 		optWebContent.ifPresent(

http://git-wip-us.apache.org/repos/asf/flink/blob/ac82becd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java
new file mode 100644
index 0000000..21ded78
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticsHandler.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.checkpoints;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.CheckpointStatistics;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler which serves the checkpoint statistics.
+ */
+public class CheckpointStatisticsHandler extends AbstractExecutionGraphHandler<CheckpointStatistics> {
+
+	public CheckpointStatisticsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			MessageHeaders<EmptyRequestBody, CheckpointStatistics, JobMessageParameters> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor) {
+		super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor);
+	}
+
+	@Override
+	protected CheckpointStatistics handleRequest(AccessExecutionGraph executionGraph) throws RestHandlerException {
+
+		final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot();
+
+		if (checkpointStatsSnapshot == null) {
+			throw new RestHandlerException("Checkpointing has not been enabled.", HttpResponseStatus.NOT_FOUND);
+		} else {
+			final CheckpointStatsCounts checkpointStatsCounts = checkpointStatsSnapshot.getCounts();
+
+			final CheckpointStatistics.Counts counts = new CheckpointStatistics.Counts(
+				checkpointStatsCounts.getNumberOfRestoredCheckpoints(),
+				checkpointStatsCounts.getTotalNumberOfCheckpoints(),
+				checkpointStatsCounts.getNumberOfInProgressCheckpoints(),
+				checkpointStatsCounts.getNumberOfCompletedCheckpoints(),
+				checkpointStatsCounts.getNumberOfFailedCheckpoints());
+
+			final CompletedCheckpointStatsSummary checkpointStatsSummary = checkpointStatsSnapshot.getSummaryStats();
+			final MinMaxAvgStats stateSize = checkpointStatsSummary.getStateSizeStats();
+			final MinMaxAvgStats duration = checkpointStatsSummary.getEndToEndDurationStats();
+			final MinMaxAvgStats alignment = checkpointStatsSummary.getAlignmentBufferedStats();
+
+			final CheckpointStatistics.Summary summary = new CheckpointStatistics.Summary(
+				new CheckpointStatistics.MinMaxAvgStatistics(
+					stateSize.getMinimum(),
+					stateSize.getMaximum(),
+					stateSize.getAverage()),
+				new CheckpointStatistics.MinMaxAvgStatistics(
+					duration.getMinimum(),
+					duration.getMaximum(),
+					duration.getAverage()),
+				new CheckpointStatistics.MinMaxAvgStatistics(
+					alignment.getMinimum(),
+					alignment.getMaximum(),
+					alignment.getAverage()));
+
+			final CheckpointStatsHistory checkpointStatsHistory = checkpointStatsSnapshot.getHistory();
+
+			final CheckpointStatistics.CompletedCheckpointStatistics completed = (CheckpointStatistics.CompletedCheckpointStatistics) generateCheckpointStatistics(checkpointStatsHistory.getLatestCompletedCheckpoint());
+			final CheckpointStatistics.CompletedCheckpointStatistics savepoint = (CheckpointStatistics.CompletedCheckpointStatistics) generateCheckpointStatistics(checkpointStatsHistory.getLatestSavepoint());
+			final CheckpointStatistics.FailedCheckpointStatistics failed = (CheckpointStatistics.FailedCheckpointStatistics) generateCheckpointStatistics(checkpointStatsHistory.getLatestFailedCheckpoint());
+
+			final RestoredCheckpointStats restoredCheckpointStats = checkpointStatsSnapshot.getLatestRestoredCheckpoint();
+
+			final CheckpointStatistics.RestoredCheckpointStatistics restored;
+
+			if (restoredCheckpointStats == null) {
+				restored = null;
+			} else {
+				restored = new CheckpointStatistics.RestoredCheckpointStatistics(
+					restoredCheckpointStats.getCheckpointId(),
+					restoredCheckpointStats.getRestoreTimestamp(),
+					restoredCheckpointStats.getProperties().isSavepoint(),
+					restoredCheckpointStats.getExternalPath());
+			}
+
+			final CheckpointStatistics.LatestCheckpoints latestCheckpoints = new CheckpointStatistics.LatestCheckpoints(
+				completed,
+				savepoint,
+				failed,
+				restored);
+
+			final List<CheckpointStatistics.BaseCheckpointStatistics> history = new ArrayList<>(16);
+
+			for (AbstractCheckpointStats abstractCheckpointStats : checkpointStatsSnapshot.getHistory().getCheckpoints()) {
+				history.add(generateCheckpointStatistics(abstractCheckpointStats));
+			}
+
+			return new CheckpointStatistics(
+				counts,
+				summary,
+				latestCheckpoints,
+				history);
+		}
+	}
+
+	private static CheckpointStatistics.BaseCheckpointStatistics generateCheckpointStatistics(AbstractCheckpointStats checkpointStats) {
+		if (checkpointStats != null) {
+			if (checkpointStats instanceof CompletedCheckpointStats) {
+				final CompletedCheckpointStats completedCheckpointStats = ((CompletedCheckpointStats) checkpointStats);
+
+				return new CheckpointStatistics.CompletedCheckpointStatistics(
+					completedCheckpointStats.getCheckpointId(),
+					completedCheckpointStats.getStatus(),
+					completedCheckpointStats.getProperties().isSavepoint(),
+					completedCheckpointStats.getTriggerTimestamp(),
+					completedCheckpointStats.getLatestAckTimestamp(),
+					completedCheckpointStats.getStateSize(),
+					completedCheckpointStats.getEndToEndDuration(),
+					completedCheckpointStats.getAlignmentBuffered(),
+					completedCheckpointStats.getNumberOfSubtasks(),
+					completedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+					completedCheckpointStats.getExternalPath(),
+					completedCheckpointStats.isDiscarded());
+			} else if (checkpointStats instanceof FailedCheckpointStats) {
+				final FailedCheckpointStats failedCheckpointStats = ((FailedCheckpointStats) checkpointStats);
+
+				return new CheckpointStatistics.FailedCheckpointStatistics(
+					failedCheckpointStats.getCheckpointId(),
+					failedCheckpointStats.getStatus(),
+					failedCheckpointStats.getProperties().isSavepoint(),
+					failedCheckpointStats.getTriggerTimestamp(),
+					failedCheckpointStats.getLatestAckTimestamp(),
+					failedCheckpointStats.getStateSize(),
+					failedCheckpointStats.getEndToEndDuration(),
+					failedCheckpointStats.getAlignmentBuffered(),
+					failedCheckpointStats.getNumberOfSubtasks(),
+					failedCheckpointStats.getNumberOfAcknowledgedSubtasks(),
+					failedCheckpointStats.getFailureTimestamp(),
+					failedCheckpointStats.getFailureMessage());
+			} else {
+				throw new IllegalArgumentException("Given checkpoint stats object of type " + checkpointStats.getClass().getName() + " cannot be converted.");
+			}
+		} else {
+			return null;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ac82becd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
index bbfcd8a..5b35c7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.rest.messages.CheckpointStatistics;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.FlinkException;
@@ -128,37 +129,37 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 	}
 
 	private static void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException {
-		gen.writeObjectFieldStart("counts");
-		gen.writeNumberField("restored", counts.getNumberOfRestoredCheckpoints());
-		gen.writeNumberField("total", counts.getTotalNumberOfCheckpoints());
-		gen.writeNumberField("in_progress", counts.getNumberOfInProgressCheckpoints());
-		gen.writeNumberField("completed", counts.getNumberOfCompletedCheckpoints());
-		gen.writeNumberField("failed", counts.getNumberOfFailedCheckpoints());
+		gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_COUNTS);
+		gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_RESTORED_CHECKPOINTS, counts.getNumberOfRestoredCheckpoints());
+		gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_TOTAL_CHECKPOINTS, counts.getTotalNumberOfCheckpoints());
+		gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_IN_PROGRESS_CHECKPOINTS, counts.getNumberOfInProgressCheckpoints());
+		gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_COMPLETED_CHECKPOINTS, counts.getNumberOfCompletedCheckpoints());
+		gen.writeNumberField(CheckpointStatistics.Counts.FIELD_NAME_FAILED_CHECKPOINTS, counts.getNumberOfFailedCheckpoints());
 		gen.writeEndObject();
 	}
 
 	private static void writeSummary(
 		JsonGenerator gen,
 		CompletedCheckpointStatsSummary summary) throws IOException {
-		gen.writeObjectFieldStart("summary");
-		gen.writeObjectFieldStart("state_size");
+		gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_SUMMARY);
+		gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_STATE_SIZE);
 		writeMinMaxAvg(gen, summary.getStateSizeStats());
 		gen.writeEndObject();
 
-		gen.writeObjectFieldStart("end_to_end_duration");
+		gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_DURATION);
 		writeMinMaxAvg(gen, summary.getEndToEndDurationStats());
 		gen.writeEndObject();
 
-		gen.writeObjectFieldStart("alignment_buffered");
+		gen.writeObjectFieldStart(CheckpointStatistics.Summary.FIELD_NAME_ALIGNMENT_BUFFERED);
 		writeMinMaxAvg(gen, summary.getAlignmentBufferedStats());
 		gen.writeEndObject();
 		gen.writeEndObject();
 	}
 
 	static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
-		gen.writeNumberField("min", minMaxAvg.getMinimum());
-		gen.writeNumberField("max", minMaxAvg.getMaximum());
-		gen.writeNumberField("avg", minMaxAvg.getAverage());
+		gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_MINIMUM, minMaxAvg.getMinimum());
+		gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_MAXIMUM, minMaxAvg.getMaximum());
+		gen.writeNumberField(CheckpointStatistics.MinMaxAvgStatistics.FIELD_NAME_AVERAGE, minMaxAvg.getAverage());
 	}
 
 	private static void writeLatestCheckpoints(
@@ -168,15 +169,15 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 		@Nullable FailedCheckpointStats failed,
 		@Nullable RestoredCheckpointStats restored) throws IOException {
 
-		gen.writeObjectFieldStart("latest");
+		gen.writeObjectFieldStart(CheckpointStatistics.FIELD_NAME_LATEST_CHECKPOINTS);
 		// Completed checkpoint
 		if (completed != null) {
-			gen.writeObjectFieldStart("completed");
+			gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_COMPLETED);
 			writeCheckpoint(gen, completed);
 
 			String externalPath = completed.getExternalPath();
 			if (externalPath != null) {
-				gen.writeStringField("external_path", completed.getExternalPath());
+				gen.writeStringField(CheckpointStatistics.CompletedCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH, completed.getExternalPath());
 			}
 
 			gen.writeEndObject();
@@ -184,39 +185,39 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 
 		// Completed savepoint
 		if (savepoint != null) {
-			gen.writeObjectFieldStart("savepoint");
+			gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_SAVEPOINT);
 			writeCheckpoint(gen, savepoint);
 
 			String externalPath = savepoint.getExternalPath();
 			if (externalPath != null) {
-				gen.writeStringField("external_path", savepoint.getExternalPath());
+				gen.writeStringField(CheckpointStatistics.CompletedCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH, savepoint.getExternalPath());
 			}
 			gen.writeEndObject();
 		}
 
 		// Failed checkpoint
 		if (failed != null) {
-			gen.writeObjectFieldStart("failed");
+			gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_FAILED);
 			writeCheckpoint(gen, failed);
 
-			gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
+			gen.writeNumberField(CheckpointStatistics.FailedCheckpointStatistics.FIELD_NAME_FAILURE_TIMESTAMP, failed.getFailureTimestamp());
 			String failureMsg = failed.getFailureMessage();
 			if (failureMsg != null) {
-				gen.writeStringField("failure_message", failureMsg);
+				gen.writeStringField(CheckpointStatistics.FailedCheckpointStatistics.FIELD_NAME_FAILURE_MESSAGE, failureMsg);
 			}
 			gen.writeEndObject();
 		}
 
 		// Restored checkpoint
 		if (restored != null) {
-			gen.writeObjectFieldStart("restored");
-			gen.writeNumberField("id", restored.getCheckpointId());
-			gen.writeNumberField("restore_timestamp", restored.getRestoreTimestamp());
-			gen.writeBooleanField("is_savepoint", restored.getProperties().isSavepoint());
+			gen.writeObjectFieldStart(CheckpointStatistics.LatestCheckpoints.FIELD_NAME_RESTORED);
+			gen.writeNumberField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_ID, restored.getCheckpointId());
+			gen.writeNumberField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_RESTORE_TIMESTAMP, restored.getRestoreTimestamp());
+			gen.writeBooleanField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, restored.getProperties().isSavepoint());
 
 			String externalPath = restored.getExternalPath();
 			if (externalPath != null) {
-				gen.writeStringField("external_path", externalPath);
+				gen.writeStringField(CheckpointStatistics.RestoredCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH, externalPath);
 			}
 			gen.writeEndObject();
 		}
@@ -224,29 +225,29 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 	}
 
 	private static void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException {
-		gen.writeNumberField("id", checkpoint.getCheckpointId());
-		gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
-		gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
-		gen.writeNumberField("state_size", checkpoint.getStateSize());
-		gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
-		gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
+		gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId());
+		gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp());
+		gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp());
+		gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize());
+		gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration());
+		gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered());
 
 	}
 
 	private static void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException {
-		gen.writeArrayFieldStart("history");
+		gen.writeArrayFieldStart(CheckpointStatistics.FIELD_NAME_HISTORY);
 		for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
 			gen.writeStartObject();
-			gen.writeNumberField("id", checkpoint.getCheckpointId());
-			gen.writeStringField("status", checkpoint.getStatus().toString());
-			gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint());
-			gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
-			gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
-			gen.writeNumberField("state_size", checkpoint.getStateSize());
-			gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
-			gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
-			gen.writeNumberField("num_subtasks", checkpoint.getNumberOfSubtasks());
-			gen.writeNumberField("num_acknowledged_subtasks", checkpoint.getNumberOfAcknowledgedSubtasks());
+			gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ID, checkpoint.getCheckpointId());
+			gen.writeStringField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATUS, checkpoint.getStatus().toString());
+			gen.writeBooleanField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_IS_SAVEPOINT, checkpoint.getProperties().isSavepoint());
+			gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_TRIGGER_TIMESTAMP, checkpoint.getTriggerTimestamp());
+			gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_LATEST_ACK_TIMESTAMP, checkpoint.getLatestAckTimestamp());
+			gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_STATE_SIZE, checkpoint.getStateSize());
+			gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_DURATION, checkpoint.getEndToEndDuration());
+			gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_ALIGNMENT_BUFFERED, checkpoint.getAlignmentBuffered());
+			gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_NUM_SUBTASKS, checkpoint.getNumberOfSubtasks());
+			gen.writeNumberField(CheckpointStatistics.BaseCheckpointStatistics.FIELD_NAME_NUM_ACK_SUBTASKS, checkpoint.getNumberOfAcknowledgedSubtasks());
 
 			if (checkpoint.getStatus().isCompleted()) {
 				// --- Completed ---
@@ -254,20 +255,20 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 
 				String externalPath = completed.getExternalPath();
 				if (externalPath != null) {
-					gen.writeStringField("external_path", externalPath);
+					gen.writeStringField(CheckpointStatistics.CompletedCheckpointStatistics.FIELD_NAME_EXTERNAL_PATH, externalPath);
 				}
 
-				gen.writeBooleanField("discarded", completed.isDiscarded());
+				gen.writeBooleanField(CheckpointStatistics.CompletedCheckpointStatistics.FIELD_NAME_DISCARDED, completed.isDiscarded());
 			}
 			else if (checkpoint.getStatus().isFailed()) {
 				// --- Failed ---
 				FailedCheckpointStats failed = (FailedCheckpointStats) checkpoint;
 
-				gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
+				gen.writeNumberField(CheckpointStatistics.FailedCheckpointStatistics.FIELD_NAME_FAILURE_TIMESTAMP, failed.getFailureTimestamp());
 
 				String failureMsg = failed.getFailureMessage();
 				if (failureMsg != null) {
-					gen.writeStringField("failure_message", failureMsg);
+					gen.writeStringField(CheckpointStatistics.FailedCheckpointStatistics.FIELD_NAME_FAILURE_MESSAGE, failureMsg);
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ac82becd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java
new file mode 100644
index 0000000..ade8c7a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatistics.java
@@ -0,0 +1,763 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Response of the {@link CheckpointStatisticsHandler}.
+ */
+public class CheckpointStatistics implements ResponseBody {
+
+	public static final String FIELD_NAME_COUNTS = "counts";
+
+	public static final String FIELD_NAME_SUMMARY = "summary";
+
+	public static final String FIELD_NAME_LATEST_CHECKPOINTS = "latest";
+
+	public static final String FIELD_NAME_HISTORY = "history";
+
+	@JsonProperty(FIELD_NAME_COUNTS)
+	private final Counts counts;
+
+	@JsonProperty(FIELD_NAME_SUMMARY)
+	private final Summary summary;
+
+	@JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS)
+	private final LatestCheckpoints latestCheckpoints;
+
+	@JsonProperty(FIELD_NAME_HISTORY)
+	private final List<BaseCheckpointStatistics> history;
+
+	@JsonCreator
+	public CheckpointStatistics(
+			@JsonProperty(FIELD_NAME_COUNTS) Counts counts,
+			@JsonProperty(FIELD_NAME_SUMMARY) Summary summary,
+			@JsonProperty(FIELD_NAME_LATEST_CHECKPOINTS) LatestCheckpoints latestCheckpoints,
+			@JsonProperty(FIELD_NAME_HISTORY) List<BaseCheckpointStatistics> history) {
+		this.counts = Preconditions.checkNotNull(counts);
+		this.summary = Preconditions.checkNotNull(summary);
+		this.latestCheckpoints = Preconditions.checkNotNull(latestCheckpoints);
+		this.history = Preconditions.checkNotNull(history);
+	}
+
+	public Counts getCounts() {
+		return counts;
+	}
+
+	public Summary getSummary() {
+		return summary;
+	}
+
+	public LatestCheckpoints getLatestCheckpoints() {
+		return latestCheckpoints;
+	}
+
+	public List<BaseCheckpointStatistics> getHistory() {
+		return history;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		CheckpointStatistics that = (CheckpointStatistics) o;
+		return Objects.equals(counts, that.counts) &&
+			Objects.equals(summary, that.summary) &&
+			Objects.equals(latestCheckpoints, that.latestCheckpoints) &&
+			Objects.equals(history, that.history);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(counts, summary, latestCheckpoints, history);
+	}
+
+	// ------------------------------------------------------------------
+	// Inner classes
+	// ------------------------------------------------------------------
+
+	/**
+	 * Checkpoint counts.
+	 */
+	public static final class Counts {
+
+		public static final String FIELD_NAME_RESTORED_CHECKPOINTS = "restored";
+
+		public static final String FIELD_NAME_TOTAL_CHECKPOINTS = "total";
+
+		public static final String FIELD_NAME_IN_PROGRESS_CHECKPOINTS = "in_progress";
+
+		public static final String FIELD_NAME_COMPLETED_CHECKPOINTS = "completed";
+
+		public static final String FIELD_NAME_FAILED_CHECKPOINTS = "failed";
+
+		@JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS)
+		private final long numberRestoredCheckpoints;
+
+		@JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS)
+		private final long totalNumberCheckpoints;
+
+		@JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS)
+		private final int numberInProgressCheckpoints;
+
+		@JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS)
+		private final long numberCompletedCheckpoints;
+
+		@JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS)
+		private final long numberFailedCheckpoints;
+
+		@JsonCreator
+		public Counts(
+				@JsonProperty(FIELD_NAME_RESTORED_CHECKPOINTS) long numberRestoredCheckpoints,
+				@JsonProperty(FIELD_NAME_TOTAL_CHECKPOINTS) long totalNumberCheckpoints,
+				@JsonProperty(FIELD_NAME_IN_PROGRESS_CHECKPOINTS) int numberInProgressCheckpoints,
+				@JsonProperty(FIELD_NAME_COMPLETED_CHECKPOINTS) long numberCompletedCheckpoints,
+				@JsonProperty(FIELD_NAME_FAILED_CHECKPOINTS) long numberFailedCheckpoints) {
+			this.numberRestoredCheckpoints = numberRestoredCheckpoints;
+			this.totalNumberCheckpoints = totalNumberCheckpoints;
+			this.numberInProgressCheckpoints = numberInProgressCheckpoints;
+			this.numberCompletedCheckpoints = numberCompletedCheckpoints;
+			this.numberFailedCheckpoints = numberFailedCheckpoints;
+		}
+
+		public long getNumberRestoredCheckpoints() {
+			return numberRestoredCheckpoints;
+		}
+
+		public long getTotalNumberCheckpoints() {
+			return totalNumberCheckpoints;
+		}
+
+		public int getNumberInProgressCheckpoints() {
+			return numberInProgressCheckpoints;
+		}
+
+		public long getNumberCompletedCheckpoints() {
+			return numberCompletedCheckpoints;
+		}
+
+		public long getNumberFailedCheckpoints() {
+			return numberFailedCheckpoints;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			Counts counts = (Counts) o;
+			return numberRestoredCheckpoints == counts.numberRestoredCheckpoints &&
+				totalNumberCheckpoints == counts.totalNumberCheckpoints &&
+				numberInProgressCheckpoints == counts.numberInProgressCheckpoints &&
+				numberCompletedCheckpoints == counts.numberCompletedCheckpoints &&
+				numberFailedCheckpoints == counts.numberFailedCheckpoints;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(numberRestoredCheckpoints, totalNumberCheckpoints, numberInProgressCheckpoints, numberCompletedCheckpoints, numberFailedCheckpoints);
+		}
+	}
+
+	/**
+	 * Checkpoint summary.
+	 */
+	public static final class Summary {
+
+		public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+		public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+		public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
+
+		@JsonProperty(FIELD_NAME_STATE_SIZE)
+		private final MinMaxAvgStatistics stateSize;
+
+		@JsonProperty(FIELD_NAME_DURATION)
+		private final MinMaxAvgStatistics duration;
+
+		@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+		private final MinMaxAvgStatistics alignmentBuffered;
+
+		@JsonCreator
+		public Summary(
+				@JsonProperty(FIELD_NAME_STATE_SIZE) MinMaxAvgStatistics stateSize,
+				@JsonProperty(FIELD_NAME_DURATION) MinMaxAvgStatistics duration,
+				@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) MinMaxAvgStatistics alignmentBuffered) {
+			this.stateSize = stateSize;
+			this.duration = duration;
+			this.alignmentBuffered = alignmentBuffered;
+		}
+
+		public MinMaxAvgStatistics getStateSize() {
+			return stateSize;
+		}
+
+		public MinMaxAvgStatistics getDuration() {
+			return duration;
+		}
+
+		public MinMaxAvgStatistics getAlignmentBuffered() {
+			return alignmentBuffered;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			Summary summary = (Summary) o;
+			return Objects.equals(stateSize, summary.stateSize) &&
+				Objects.equals(duration, summary.duration) &&
+				Objects.equals(alignmentBuffered, summary.alignmentBuffered);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(stateSize, duration, alignmentBuffered);
+		}
+	}
+
+	/**
+	 * Minimum, maximum and average statistics.
+	 */
+	public static final class MinMaxAvgStatistics {
+
+		public static final String FIELD_NAME_MINIMUM = "min";
+
+		public static final String FIELD_NAME_MAXIMUM = "max";
+
+		public static final String FIELD_NAME_AVERAGE = "avg";
+
+		@JsonProperty(FIELD_NAME_MINIMUM)
+		private final long minimum;
+
+		@JsonProperty(FIELD_NAME_MAXIMUM)
+		private final long maximum;
+
+		@JsonProperty(FIELD_NAME_AVERAGE)
+		private final long average;
+
+		@JsonCreator
+		public MinMaxAvgStatistics(
+				@JsonProperty(FIELD_NAME_MINIMUM) long minimum,
+				@JsonProperty(FIELD_NAME_MAXIMUM) long maximum,
+				@JsonProperty(FIELD_NAME_AVERAGE) long average) {
+			this.minimum = minimum;
+			this.maximum = maximum;
+			this.average = average;
+		}
+
+		public long getMinimum() {
+			return minimum;
+		}
+
+		public long getMaximum() {
+			return maximum;
+		}
+
+		public long getAverage() {
+			return average;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			MinMaxAvgStatistics that = (MinMaxAvgStatistics) o;
+			return minimum == that.minimum &&
+				maximum == that.maximum &&
+				average == that.average;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(minimum, maximum, average);
+		}
+	}
+
+	/**
+	 * Statistics about the latest checkpoints.
+	 */
+	public static final class LatestCheckpoints {
+
+		public static final String FIELD_NAME_COMPLETED = "completed";
+
+		public static final String FIELD_NAME_SAVEPOINT = "savepoint";
+
+		public static final String FIELD_NAME_FAILED = "failed";
+
+		public static final String FIELD_NAME_RESTORED = "restored";
+
+		@JsonProperty(FIELD_NAME_COMPLETED)
+		@Nullable
+		private final CompletedCheckpointStatistics completedCheckpointStatistics;
+
+		@JsonProperty(FIELD_NAME_SAVEPOINT)
+		@Nullable
+		private final CompletedCheckpointStatistics savepointStatistics;
+
+		@JsonProperty(FIELD_NAME_FAILED)
+		@Nullable
+		private final FailedCheckpointStatistics failedCheckpointStatistics;
+
+		@JsonProperty(FIELD_NAME_RESTORED)
+		@Nullable
+		private final RestoredCheckpointStatistics restoredCheckpointStatistics;
+
+		@JsonCreator
+		public LatestCheckpoints(
+				@JsonProperty(FIELD_NAME_COMPLETED) @Nullable CompletedCheckpointStatistics completedCheckpointStatistics,
+				@JsonProperty(FIELD_NAME_SAVEPOINT) @Nullable CompletedCheckpointStatistics savepointStatistics,
+				@JsonProperty(FIELD_NAME_FAILED) @Nullable FailedCheckpointStatistics failedCheckpointStatistics,
+				@JsonProperty(FIELD_NAME_RESTORED) @Nullable RestoredCheckpointStatistics restoredCheckpointStatistics) {
+			this.completedCheckpointStatistics = completedCheckpointStatistics;
+			this.savepointStatistics = savepointStatistics;
+			this.failedCheckpointStatistics = failedCheckpointStatistics;
+			this.restoredCheckpointStatistics = restoredCheckpointStatistics;
+		}
+
+		@Nullable
+		public CompletedCheckpointStatistics getCompletedCheckpointStatistics() {
+			return completedCheckpointStatistics;
+		}
+
+		@Nullable
+		public CompletedCheckpointStatistics getSavepointStatistics() {
+			return savepointStatistics;
+		}
+
+		@Nullable
+		public FailedCheckpointStatistics getFailedCheckpointStatistics() {
+			return failedCheckpointStatistics;
+		}
+
+		@Nullable
+		public RestoredCheckpointStatistics getRestoredCheckpointStatistics() {
+			return restoredCheckpointStatistics;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			LatestCheckpoints that = (LatestCheckpoints) o;
+			return Objects.equals(completedCheckpointStatistics, that.completedCheckpointStatistics) &&
+				Objects.equals(savepointStatistics, that.savepointStatistics) &&
+				Objects.equals(failedCheckpointStatistics, that.failedCheckpointStatistics) &&
+				Objects.equals(restoredCheckpointStatistics, that.restoredCheckpointStatistics);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(completedCheckpointStatistics, savepointStatistics, failedCheckpointStatistics, restoredCheckpointStatistics);
+		}
+	}
+
+	/**
+	 * Statistics for a checkpoint.
+	 */
+	@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
+	@JsonSubTypes({
+		@JsonSubTypes.Type(value = CompletedCheckpointStatistics.class, name = "completed"),
+		@JsonSubTypes.Type(value = FailedCheckpointStatistics.class, name = "failed")})
+	public static class BaseCheckpointStatistics {
+
+		public static final String FIELD_NAME_ID = "id";
+
+		public static final String FIELD_NAME_STATUS = "status";
+
+		public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
+
+		public static final String FIELD_NAME_TRIGGER_TIMESTAMP = "trigger_timestamp";
+
+		public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
+
+		public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+		public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+		public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
+
+		public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
+
+		public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
+
+		@JsonProperty(FIELD_NAME_ID)
+		private final long id;
+
+		@JsonProperty(FIELD_NAME_STATUS)
+		private final CheckpointStatsStatus status;
+
+		@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
+		private final boolean savepoint;
+
+		@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP)
+		private final long triggerTimestamp;
+
+		@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
+		private final long latestAckTimestamp;
+
+		@JsonProperty(FIELD_NAME_STATE_SIZE)
+		private final long stateSize;
+
+		@JsonProperty(FIELD_NAME_DURATION)
+		private final long duration;
+
+		@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+		private final long alignmentBuffered;
+
+		@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
+		private final int numSubtasks;
+
+		@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
+		private final int numAckSubtasks;
+
+		@JsonCreator
+		protected BaseCheckpointStatistics(
+				@JsonProperty(FIELD_NAME_ID) long id,
+				@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
+				@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
+				@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
+				@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
+				@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+				@JsonProperty(FIELD_NAME_DURATION) long duration,
+				@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
+				@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+				@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) {
+			this.id = id;
+			this.status = Preconditions.checkNotNull(status);
+			this.savepoint = savepoint;
+			this.triggerTimestamp = triggerTimestamp;
+			this.latestAckTimestamp = latestAckTimestamp;
+			this.stateSize = stateSize;
+			this.duration = duration;
+			this.alignmentBuffered = alignmentBuffered;
+			this.numSubtasks = numSubtasks;
+			this.numAckSubtasks = numAckSubtasks;
+		}
+
+		public long getId() {
+			return id;
+		}
+
+		public CheckpointStatsStatus getStatus() {
+			return status;
+		}
+
+		public boolean isSavepoint() {
+			return savepoint;
+		}
+
+		public long getTriggerTimestamp() {
+			return triggerTimestamp;
+		}
+
+		public long getLatestAckTimestamp() {
+			return latestAckTimestamp;
+		}
+
+		public long getStateSize() {
+			return stateSize;
+		}
+
+		public long getDuration() {
+			return duration;
+		}
+
+		public long getAlignmentBuffered() {
+			return alignmentBuffered;
+		}
+
+		public int getNumSubtasks() {
+			return numSubtasks;
+		}
+
+		public int getNumAckSubtasks() {
+			return numAckSubtasks;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			BaseCheckpointStatistics that = (BaseCheckpointStatistics) o;
+			return id == that.id &&
+				savepoint == that.savepoint &&
+				triggerTimestamp == that.triggerTimestamp &&
+				latestAckTimestamp == that.latestAckTimestamp &&
+				stateSize == that.stateSize &&
+				duration == that.duration &&
+				alignmentBuffered == that.alignmentBuffered &&
+				numSubtasks == that.numSubtasks &&
+				numAckSubtasks == that.numAckSubtasks &&
+				status == that.status;
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
+		}
+	}
+
+	/**
+	 * Statistics for a completed checkpoint.
+	 */
+	public static final class CompletedCheckpointStatistics extends BaseCheckpointStatistics {
+
+		public static final String FIELD_NAME_EXTERNAL_PATH = "external_path";
+
+		public static final String FIELD_NAME_DISCARDED = "discarded";
+
+		@JsonProperty(FIELD_NAME_EXTERNAL_PATH)
+		@Nullable
+		private final String externalPath;
+
+		@JsonProperty(FIELD_NAME_DISCARDED)
+		private final boolean discarded;
+
+		@JsonCreator
+		public CompletedCheckpointStatistics(
+				@JsonProperty(FIELD_NAME_ID) long id,
+				@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
+				@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
+				@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
+				@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
+				@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+				@JsonProperty(FIELD_NAME_DURATION) long duration,
+				@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
+				@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+				@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
+				@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath,
+				@JsonProperty(FIELD_NAME_DISCARDED) boolean discarded) {
+			super(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
+
+			this.externalPath = externalPath;
+			this.discarded = discarded;
+		}
+
+		@Nullable
+		public String getExternalPath() {
+			return externalPath;
+		}
+
+		public boolean isDiscarded() {
+			return discarded;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			if (!super.equals(o)) {
+				return false;
+			}
+			CompletedCheckpointStatistics that = (CompletedCheckpointStatistics) o;
+			return discarded == that.discarded &&
+				Objects.equals(externalPath, that.externalPath);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(super.hashCode(), externalPath, discarded);
+		}
+	}
+
+	/**
+	 * Statistics for a failed checkpoint.
+	 */
+	public static final class FailedCheckpointStatistics extends BaseCheckpointStatistics {
+
+		public static final String FIELD_NAME_FAILURE_TIMESTAMP = "failure_timestamp";
+
+		public static final String FIELD_NAME_FAILURE_MESSAGE = "failure_message";
+
+		@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP)
+		private final long failureTimestamp;
+
+		@JsonProperty(FIELD_NAME_FAILURE_MESSAGE)
+		@Nullable
+		private final String failureMessage;
+
+		@JsonCreator
+		public FailedCheckpointStatistics(
+				@JsonProperty(FIELD_NAME_ID) long id,
+				@JsonProperty(FIELD_NAME_STATUS) CheckpointStatsStatus status,
+				@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
+				@JsonProperty(FIELD_NAME_TRIGGER_TIMESTAMP) long triggerTimestamp,
+				@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
+				@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+				@JsonProperty(FIELD_NAME_DURATION) long duration,
+				@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
+				@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+				@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
+				@JsonProperty(FIELD_NAME_FAILURE_TIMESTAMP) long failureTimestamp,
+				@JsonProperty(FIELD_NAME_FAILURE_MESSAGE) @Nullable String failureMessage) {
+			super(id, status, savepoint, triggerTimestamp, latestAckTimestamp, stateSize, duration, alignmentBuffered, numSubtasks, numAckSubtasks);
+
+			this.failureTimestamp = failureTimestamp;
+			this.failureMessage = failureMessage;
+		}
+
+		public long getFailureTimestamp() {
+			return failureTimestamp;
+		}
+
+		@Nullable
+		public String getFailureMessage() {
+			return failureMessage;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			if (!super.equals(o)) {
+				return false;
+			}
+			FailedCheckpointStatistics that = (FailedCheckpointStatistics) o;
+			return failureTimestamp == that.failureTimestamp &&
+				Objects.equals(failureMessage, that.failureMessage);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(super.hashCode(), failureTimestamp, failureMessage);
+		}
+	}
+
+	/**
+	 * Statistics for a restored checkpoint.
+	 */
+	public static final class RestoredCheckpointStatistics {
+
+		public static final String FIELD_NAME_ID = "id";
+
+		public static final String FIELD_NAME_RESTORE_TIMESTAMP = "restore_timestamp";
+
+		public static final String FIELD_NAME_IS_SAVEPOINT = "is_savepoint";
+
+		public static final String FIELD_NAME_EXTERNAL_PATH = "external_path";
+
+		@JsonProperty(FIELD_NAME_ID)
+		private final long id;
+
+		@JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP)
+		private final long restoreTimestamp;
+
+		@JsonProperty(FIELD_NAME_IS_SAVEPOINT)
+		private final boolean savepoint;
+
+		@JsonProperty(FIELD_NAME_EXTERNAL_PATH)
+		@Nullable
+		private final String externalPath;
+
+		@JsonCreator
+		public RestoredCheckpointStatistics(
+				@JsonProperty(FIELD_NAME_ID) long id,
+				@JsonProperty(FIELD_NAME_RESTORE_TIMESTAMP) long restoreTimestamp,
+				@JsonProperty(FIELD_NAME_IS_SAVEPOINT) boolean savepoint,
+				@JsonProperty(FIELD_NAME_EXTERNAL_PATH) @Nullable String externalPath) {
+			this.id = id;
+			this.restoreTimestamp = restoreTimestamp;
+			this.savepoint = savepoint;
+			this.externalPath = externalPath;
+		}
+
+		public long getId() {
+			return id;
+		}
+
+		public long getRestoreTimestamp() {
+			return restoreTimestamp;
+		}
+
+		public boolean isSavepoint() {
+			return savepoint;
+		}
+
+		@Nullable
+		public String getExternalPath() {
+			return externalPath;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			RestoredCheckpointStatistics that = (RestoredCheckpointStatistics) o;
+			return id == that.id &&
+				restoreTimestamp == that.restoreTimestamp &&
+				savepoint == that.savepoint &&
+				Objects.equals(externalPath, that.externalPath);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(id, restoreTimestamp, savepoint, externalPath);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ac82becd/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java
new file mode 100644
index 0000000..b062d0d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CheckpointStatisticsHeaders.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticsHandler;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link CheckpointStatisticsHandler}.
+ */
+public class CheckpointStatisticsHeaders implements MessageHeaders<EmptyRequestBody, CheckpointStatistics, JobMessageParameters> {
+
+	private static final CheckpointStatisticsHeaders INSTANCE = new CheckpointStatisticsHeaders();
+
+	public static final String URL = "/jobs/:jobid/checkpoints";
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<CheckpointStatistics> getResponseClass() {
+		return CheckpointStatistics.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public JobMessageParameters getUnresolvedMessageParameters() {
+		return new JobMessageParameters();
+	}
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	public static CheckpointStatisticsHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ac82becd/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointStatisticsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointStatisticsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointStatisticsTest.java
new file mode 100644
index 0000000..8e8a50f
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointStatisticsTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.rest.messages.CheckpointStatistics;
+
+import java.util.Arrays;
+
+/**
+ * Tests for {@link CheckpointStatistics}.
+ */
+public class CheckpointStatisticsTest extends RestResponseMarshallingTestBase<CheckpointStatistics> {
+	@Override
+	protected Class<CheckpointStatistics> getTestResponseClass() {
+		return CheckpointStatistics.class;
+	}
+
+	@Override
+	protected CheckpointStatistics getTestResponseInstance() throws Exception {
+
+		final CheckpointStatistics.Counts counts = new CheckpointStatistics.Counts(1, 2, 3, 4, 5);
+		final CheckpointStatistics.Summary summary = new CheckpointStatistics.Summary(
+			new CheckpointStatistics.MinMaxAvgStatistics(1L, 1L, 1L),
+			new CheckpointStatistics.MinMaxAvgStatistics(2L, 2L, 2L),
+			new CheckpointStatistics.MinMaxAvgStatistics(3L, 3L, 3L));
+
+		final CheckpointStatistics.CompletedCheckpointStatistics completed = new CheckpointStatistics.CompletedCheckpointStatistics(
+			1L,
+			CheckpointStatsStatus.COMPLETED,
+			false,
+			42L,
+			41L,
+			1337L,
+			1L,
+			0L,
+			10,
+			10,
+			null,
+			false);
+
+		final CheckpointStatistics.CompletedCheckpointStatistics savepoint = new CheckpointStatistics.CompletedCheckpointStatistics(
+			2L,
+			CheckpointStatsStatus.COMPLETED,
+			true,
+			11L,
+			10L,
+			43L,
+			1L,
+			0L,
+			9,
+			9,
+			"externalPath",
+			false);
+
+		final CheckpointStatistics.FailedCheckpointStatistics failed = new CheckpointStatistics.FailedCheckpointStatistics(
+			3L,
+			CheckpointStatsStatus.FAILED,
+			false,
+			5L,
+			10L,
+			4L,
+			2L,
+			0L,
+			11,
+			9,
+			100L,
+			"Test failure");
+
+		CheckpointStatistics.RestoredCheckpointStatistics restored = new CheckpointStatistics.RestoredCheckpointStatistics(
+			4L,
+			1445L,
+			true,
+			"foobar");
+
+		final CheckpointStatistics.LatestCheckpoints latestCheckpoints = new CheckpointStatistics.LatestCheckpoints(
+			completed,
+			savepoint,
+			failed,
+			restored);
+
+		return new CheckpointStatistics(
+			counts,
+			summary,
+			latestCheckpoints,
+			Arrays.asList(completed, savepoint, failed));
+	}
+}


Mime
View raw message