flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-7707] [flip6] Add TaskCheckpointStatisticDetailsHandler for new REST endpoint
Date Wed, 11 Oct 2017 12:43:09 GMT
Repository: flink
Updated Branches:
  refs/heads/master 7b9967005 -> bc4638a3c


http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java
new file mode 100644
index 0000000..e481da5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/SubtaskCheckpointStatistics.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+import java.util.Objects;
+
+/**
+ * Checkpoint statistics for a subtask.
+ *
+ * <p>Instances are serialized polymorphically: the {@code @class} JSON property carries the
+ * subtype name ({@code completed} or {@code pending}) which selects the concrete subclass.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "@class")
+@JsonSubTypes({
+	@JsonSubTypes.Type(value = SubtaskCheckpointStatistics.CompletedSubtaskCheckpointStatistics.class, name = "completed"),
+	@JsonSubTypes.Type(value = SubtaskCheckpointStatistics.PendingSubtaskCheckpointStatistics.class, name = "pending")})
+public class SubtaskCheckpointStatistics {
+
+	public static final String FIELD_NAME_INDEX = "index";
+
+	public static final String FIELD_NAME_CHECKPOINT_STATUS = "status";
+
+	@JsonProperty(FIELD_NAME_INDEX)
+	private final int index;
+
+	@JsonProperty(FIELD_NAME_CHECKPOINT_STATUS)
+	private final String checkpointStatus;
+
+	/**
+	 * Creates the common part of the subtask statistics. Only the nested subclasses call this
+	 * constructor, each passing its own fixed status string.
+	 *
+	 * @param index index of the subtask
+	 * @param checkpointStatus status string reported for the subtask
+	 */
+	@JsonCreator
+	private SubtaskCheckpointStatistics(
+			@JsonProperty(FIELD_NAME_INDEX) int index,
+			@JsonProperty(FIELD_NAME_CHECKPOINT_STATUS) String checkpointStatus) {
+		this.index = index;
+		this.checkpointStatus = checkpointStatus;
+	}
+
+	public int getIndex() {
+		return index;
+	}
+
+	public String getCheckpointStatus() {
+		return checkpointStatus;
+	}
+
+	/**
+	 * Two instances are equal if they are of exactly the same class and agree on index and status.
+	 */
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		SubtaskCheckpointStatistics that = (SubtaskCheckpointStatistics) o;
+		return index == that.index &&
+			Objects.equals(checkpointStatus, that.checkpointStatus);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(index, checkpointStatus);
+	}
+
+	// ---------------------------------------------------------------------------------
+	// Static inner classes
+	// ---------------------------------------------------------------------------------
+
+	/**
+	 * Checkpoint statistics for a completed subtask checkpoint.
+	 *
+	 * <p>The status of such an instance is always {@code "completed"}.
+	 */
+	public static final class CompletedSubtaskCheckpointStatistics extends SubtaskCheckpointStatistics {
+
+		public static final String FIELD_NAME_ACK_TIMESTAMP = "ack_timestamp";
+
+		public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+		public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+		public static final String FIELD_NAME_CHECKPOINT_DURATION = "checkpoint";
+
+		public static final String FIELD_NAME_ALIGNMENT = "alignment";
+
+		@JsonProperty(FIELD_NAME_ACK_TIMESTAMP)
+		private final long ackTimestamp;
+
+		@JsonProperty(FIELD_NAME_DURATION)
+		private final long duration;
+
+		@JsonProperty(FIELD_NAME_STATE_SIZE)
+		private final long stateSize;
+
+		@JsonProperty(FIELD_NAME_CHECKPOINT_DURATION)
+		private final CheckpointDuration checkpointDuration;
+
+		@JsonProperty(FIELD_NAME_ALIGNMENT)
+		private final CheckpointAlignment alignment;
+
+		/**
+		 * Creates the statistics of a completed subtask checkpoint.
+		 *
+		 * @param index index of the subtask
+		 * @param ackTimestamp value of the {@code ack_timestamp} property
+		 * @param duration value of the {@code end_to_end_duration} property
+		 * @param stateSize value of the {@code state_size} property
+		 * @param checkpointDuration synchronous/asynchronous checkpoint durations
+		 * @param alignment alignment statistics
+		 */
+		@JsonCreator
+		public CompletedSubtaskCheckpointStatistics(
+				@JsonProperty(FIELD_NAME_INDEX) int index,
+				@JsonProperty(FIELD_NAME_ACK_TIMESTAMP) long ackTimestamp,
+				@JsonProperty(FIELD_NAME_DURATION) long duration,
+				@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+				@JsonProperty(FIELD_NAME_CHECKPOINT_DURATION) CheckpointDuration checkpointDuration,
+				@JsonProperty(FIELD_NAME_ALIGNMENT) CheckpointAlignment alignment) {
+			super(index, "completed");
+			this.ackTimestamp = ackTimestamp;
+			this.duration = duration;
+			this.stateSize = stateSize;
+			this.checkpointDuration = checkpointDuration;
+			this.alignment = alignment;
+		}
+
+		public long getAckTimestamp() {
+			return ackTimestamp;
+		}
+
+		public long getDuration() {
+			return duration;
+		}
+
+		public long getStateSize() {
+			return stateSize;
+		}
+
+		public CheckpointDuration getCheckpointDuration() {
+			return checkpointDuration;
+		}
+
+		public CheckpointAlignment getAlignment() {
+			return alignment;
+		}
+
+		/**
+		 * Two instances are equal if the inherited index and status as well as all fields
+		 * declared by this class are equal.
+		 */
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			// The inherited state (subtask index and status) is part of the identity as well;
+			// without this check, statistics of different subtasks would compare equal.
+			if (!super.equals(o)) {
+				return false;
+			}
+			CompletedSubtaskCheckpointStatistics that = (CompletedSubtaskCheckpointStatistics) o;
+			return ackTimestamp == that.ackTimestamp &&
+				duration == that.duration &&
+				stateSize == that.stateSize &&
+				Objects.equals(checkpointDuration, that.checkpointDuration) &&
+				Objects.equals(alignment, that.alignment);
+		}
+
+		@Override
+		public int hashCode() {
+			// Include the hash of the inherited fields to stay consistent with equals.
+			return Objects.hash(super.hashCode(), ackTimestamp, duration, stateSize, checkpointDuration, alignment);
+		}
+
+		/**
+		 * Duration of the checkpoint, split into a {@code sync} and an {@code async} value.
+		 */
+		public static final class CheckpointDuration {
+
+			public static final String FIELD_NAME_SYNC_DURATION = "sync";
+
+			public static final String FIELD_NAME_ASYNC_DURATION = "async";
+
+			@JsonProperty(FIELD_NAME_SYNC_DURATION)
+			private final long syncDuration;
+
+			@JsonProperty(FIELD_NAME_ASYNC_DURATION)
+			private final long asyncDuration;
+
+			@JsonCreator
+			public CheckpointDuration(
+					@JsonProperty(FIELD_NAME_SYNC_DURATION) long syncDuration,
+					@JsonProperty(FIELD_NAME_ASYNC_DURATION) long asyncDuration) {
+				this.syncDuration = syncDuration;
+				this.asyncDuration = asyncDuration;
+			}
+
+			public long getSyncDuration() {
+				return syncDuration;
+			}
+
+			public long getAsyncDuration() {
+				return asyncDuration;
+			}
+
+			@Override
+			public boolean equals(Object o) {
+				if (this == o) {
+					return true;
+				}
+				if (o == null || getClass() != o.getClass()) {
+					return false;
+				}
+				CheckpointDuration that = (CheckpointDuration) o;
+				return syncDuration == that.syncDuration &&
+					asyncDuration == that.asyncDuration;
+			}
+
+			@Override
+			public int hashCode() {
+				return Objects.hash(syncDuration, asyncDuration);
+			}
+		}
+
+		/**
+		 * Alignment statistics of the checkpoint, consisting of a {@code buffered} and a
+		 * {@code duration} value.
+		 */
+		public static final class CheckpointAlignment {
+
+			public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "buffered";
+
+			public static final String FIELD_NAME_ALIGNMENT_DURATION = "duration";
+
+			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+			private final long alignmentBuffered;
+
+			@JsonProperty(FIELD_NAME_ALIGNMENT_DURATION)
+			private final long alignmentDuration;
+
+			@JsonCreator
+			public CheckpointAlignment(
+					@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
+					@JsonProperty(FIELD_NAME_ALIGNMENT_DURATION) long alignmentDuration) {
+				this.alignmentBuffered = alignmentBuffered;
+				this.alignmentDuration = alignmentDuration;
+			}
+
+			public long getAlignmentBuffered() {
+				return alignmentBuffered;
+			}
+
+			public long getAlignmentDuration() {
+				return alignmentDuration;
+			}
+
+			@Override
+			public boolean equals(Object o) {
+				if (this == o) {
+					return true;
+				}
+				if (o == null || getClass() != o.getClass()) {
+					return false;
+				}
+				CheckpointAlignment that = (CheckpointAlignment) o;
+				return alignmentBuffered == that.alignmentBuffered &&
+					alignmentDuration == that.alignmentDuration;
+			}
+
+			@Override
+			public int hashCode() {
+				return Objects.hash(alignmentBuffered, alignmentDuration);
+			}
+		}
+	}
+
+	/**
+	 * Checkpoint statistics for a pending subtask checkpoint.
+	 *
+	 * <p>The status of such an instance is always {@code "pending_or_failed"}. Equality is
+	 * inherited from {@link SubtaskCheckpointStatistics}, as this class adds no fields.
+	 */
+	public static final class PendingSubtaskCheckpointStatistics extends SubtaskCheckpointStatistics {
+
+		@JsonCreator
+		public PendingSubtaskCheckpointStatistics(@JsonProperty(FIELD_NAME_INDEX) int index) {
+			super(index, "pending_or_failed");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointMessageParameters.java
new file mode 100644
index 0000000..ed7620a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointMessageParameters.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Message parameters for checkpoint messages that address a single task.
+ *
+ * <p>In addition to the path parameters inherited from {@link CheckpointMessageParameters}
+ * (job and checkpoint ID), a JobVertexID has to be specified.
+ */
+public class TaskCheckpointMessageParameters extends CheckpointMessageParameters {
+
+	/** Path parameter identifying the job vertex. */
+	protected final JobVertexIdPathParameter jobVertexIdPathParameter = new JobVertexIdPathParameter();
+
+	/**
+	 * Returns the job, checkpoint ID and job vertex ID path parameters, in that order.
+	 */
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		final Collection<MessagePathParameter<?>> pathParameters = Arrays.asList(
+			jobPathParameter,
+			checkpointIdPathParameter,
+			jobVertexIdPathParameter);
+
+		return pathParameters;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatistics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatistics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatistics.java
new file mode 100644
index 0000000..9836b26
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatistics.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Response body carrying the checkpoint statistics of a single task.
+ *
+ * <p>All values are fixed at construction time; the JSON property names are given by the
+ * {@code FIELD_NAME_*} constants.
+ */
+public class TaskCheckpointStatistics implements ResponseBody {
+
+	public static final String FIELD_NAME_ID = "id";
+
+	public static final String FIELD_NAME_CHECKPOINT_STATUS = "status";
+
+	public static final String FIELD_NAME_LATEST_ACK_TIMESTAMP = "latest_ack_timestamp";
+
+	public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+	public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+	public static final String FIELD_NAME_ALIGNMENT_BUFFERED = "alignment_buffered";
+
+	public static final String FIELD_NAME_NUM_SUBTASKS = "num_subtasks";
+
+	public static final String FIELD_NAME_NUM_ACK_SUBTASKS = "num_acknowledged_subtasks";
+
+	@JsonProperty(FIELD_NAME_ID)
+	private final long checkpointId;
+
+	@JsonProperty(FIELD_NAME_CHECKPOINT_STATUS)
+	private final CheckpointStatsStatus checkpointStatus;
+
+	@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP)
+	private final long latestAckTimestamp;
+
+	@JsonProperty(FIELD_NAME_STATE_SIZE)
+	private final long stateSize;
+
+	@JsonProperty(FIELD_NAME_DURATION)
+	private final long duration;
+
+	@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED)
+	private final long alignmentBuffered;
+
+	@JsonProperty(FIELD_NAME_NUM_SUBTASKS)
+	private final int numSubtasks;
+
+	@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS)
+	private final int numAckSubtasks;
+
+	/**
+	 * Creates the task checkpoint statistics.
+	 *
+	 * @param checkpointId ID of the checkpoint
+	 * @param checkpointStatus status of the checkpoint, must not be {@code null}
+	 * @param latestAckTimestamp value of the {@code latest_ack_timestamp} property
+	 * @param stateSize value of the {@code state_size} property
+	 * @param duration value of the {@code end_to_end_duration} property
+	 * @param alignmentBuffered value of the {@code alignment_buffered} property
+	 * @param numSubtasks value of the {@code num_subtasks} property
+	 * @param numAckSubtasks value of the {@code num_acknowledged_subtasks} property
+	 */
+	@JsonCreator
+	public TaskCheckpointStatistics(
+			@JsonProperty(FIELD_NAME_ID) long checkpointId,
+			@JsonProperty(FIELD_NAME_CHECKPOINT_STATUS) CheckpointStatsStatus checkpointStatus,
+			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
+			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+			@JsonProperty(FIELD_NAME_DURATION) long duration,
+			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
+			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks) {
+
+		// The status is the only reference-typed argument and therefore the only one validated.
+		this.checkpointStatus = Preconditions.checkNotNull(checkpointStatus);
+
+		this.checkpointId = checkpointId;
+		this.latestAckTimestamp = latestAckTimestamp;
+		this.stateSize = stateSize;
+		this.duration = duration;
+		this.alignmentBuffered = alignmentBuffered;
+		this.numSubtasks = numSubtasks;
+		this.numAckSubtasks = numAckSubtasks;
+	}
+
+	public long getLatestAckTimestamp() {
+		return latestAckTimestamp;
+	}
+
+	public long getStateSize() {
+		return stateSize;
+	}
+
+	public long getDuration() {
+		return duration;
+	}
+
+	public long getAlignmentBuffered() {
+		return alignmentBuffered;
+	}
+
+	public int getNumSubtasks() {
+		return numSubtasks;
+	}
+
+	public int getNumAckSubtasks() {
+		return numAckSubtasks;
+	}
+
+	public long getCheckpointId() {
+		return checkpointId;
+	}
+
+	public CheckpointStatsStatus getCheckpointStatus() {
+		return checkpointStatus;
+	}
+
+	/**
+	 * Two instances are equal if they are of exactly the same class and all fields match.
+	 */
+	@Override
+	public boolean equals(Object o) {
+		if (o == this) {
+			return true;
+		}
+		if (o == null || o.getClass() != getClass()) {
+			return false;
+		}
+
+		final TaskCheckpointStatistics other = (TaskCheckpointStatistics) o;
+
+		final boolean sameCheckpoint = checkpointId == other.checkpointId
+			&& checkpointStatus == other.checkpointStatus;
+		final boolean sameMeasurements = latestAckTimestamp == other.latestAckTimestamp
+			&& stateSize == other.stateSize
+			&& duration == other.duration
+			&& alignmentBuffered == other.alignmentBuffered;
+		final boolean sameSubtaskCounts = numSubtasks == other.numSubtasks
+			&& numAckSubtasks == other.numAckSubtasks;
+
+		return sameCheckpoint && sameMeasurements && sameSubtaskCounts;
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(
+			checkpointId,
+			checkpointStatus,
+			latestAckTimestamp,
+			stateSize,
+			duration,
+			alignmentBuffered,
+			numSubtasks,
+			numAckSubtasks);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsHeaders.java
new file mode 100644
index 0000000..3886b1f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsHeaders.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.checkpoints.TaskCheckpointStatisticDetailsHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers describing the REST call served by the
+ * {@link TaskCheckpointStatisticDetailsHandler}: a GET request without a body which is
+ * answered with {@link TaskCheckpointStatisticsWithSubtaskDetails}.
+ */
+public class TaskCheckpointStatisticsHeaders implements MessageHeaders<EmptyRequestBody, TaskCheckpointStatisticsWithSubtaskDetails, TaskCheckpointMessageParameters> {
+
+	private static final TaskCheckpointStatisticsHeaders INSTANCE = new TaskCheckpointStatisticsHeaders();
+
+	public static final String URL = "/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid";
+
+	/** Not instantiable from outside; use {@link #getInstance()}. */
+	private TaskCheckpointStatisticsHeaders() {
+	}
+
+	/**
+	 * Returns the shared instance of these headers.
+	 */
+	public static TaskCheckpointStatisticsHeaders getInstance() {
+		return INSTANCE;
+	}
+
+	// ------------------------------------------------------------------------
+	// Request description
+	// ------------------------------------------------------------------------
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	/**
+	 * Returns a fresh parameter object on every call.
+	 */
+	@Override
+	public TaskCheckpointMessageParameters getUnresolvedMessageParameters() {
+		return new TaskCheckpointMessageParameters();
+	}
+
+	// ------------------------------------------------------------------------
+	// Response description
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Class<TaskCheckpointStatisticsWithSubtaskDetails> getResponseClass() {
+		return TaskCheckpointStatisticsWithSubtaskDetails.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetails.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetails.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetails.java
new file mode 100644
index 0000000..ad2cab8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetails.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Task checkpoint statistics which also includes information about the sub task
+ * checkpoint statistics.
+ *
+ * <p>On top of the values inherited from {@link TaskCheckpointStatistics}, this response
+ * carries a {@link Summary} and a list of {@link SubtaskCheckpointStatistics}.
+ */
+public final class TaskCheckpointStatisticsWithSubtaskDetails extends TaskCheckpointStatistics {
+
+	public static final String FIELD_NAME_SUMMARY = "summary";
+
+	public static final String FIELD_NAME_SUBTASKS_CHECKPOINT_STATISTICS = "subtasks";
+
+	@JsonProperty(FIELD_NAME_SUMMARY)
+	private final Summary summary;
+
+	@JsonProperty(FIELD_NAME_SUBTASKS_CHECKPOINT_STATISTICS)
+	private final List<SubtaskCheckpointStatistics> subtaskCheckpointStatistics;
+
+	/**
+	 * Creates the task checkpoint statistics including the subtask details.
+	 *
+	 * <p>The first eight arguments are passed on unchanged to the
+	 * {@link TaskCheckpointStatistics} constructor.
+	 *
+	 * @param summary summary of the statistics, must not be {@code null}
+	 * @param subtaskCheckpointStatistics statistics of the subtasks, must not be {@code null}
+	 */
+	@JsonCreator
+	public TaskCheckpointStatisticsWithSubtaskDetails(
+			@JsonProperty(FIELD_NAME_ID) long checkpointId,
+			@JsonProperty(FIELD_NAME_CHECKPOINT_STATUS) CheckpointStatsStatus checkpointStatus,
+			@JsonProperty(FIELD_NAME_LATEST_ACK_TIMESTAMP) long latestAckTimestamp,
+			@JsonProperty(FIELD_NAME_STATE_SIZE) long stateSize,
+			@JsonProperty(FIELD_NAME_DURATION) long duration,
+			@JsonProperty(FIELD_NAME_ALIGNMENT_BUFFERED) long alignmentBuffered,
+			@JsonProperty(FIELD_NAME_NUM_SUBTASKS) int numSubtasks,
+			@JsonProperty(FIELD_NAME_NUM_ACK_SUBTASKS) int numAckSubtasks,
+			@JsonProperty(FIELD_NAME_SUMMARY) Summary summary,
+			@JsonProperty(FIELD_NAME_SUBTASKS_CHECKPOINT_STATISTICS) List<SubtaskCheckpointStatistics> subtaskCheckpointStatistics) {
+		super(
+			checkpointId,
+			checkpointStatus,
+			latestAckTimestamp,
+			stateSize,
+			duration,
+			alignmentBuffered,
+			numSubtasks,
+			numAckSubtasks);
+
+		this.summary = Preconditions.checkNotNull(summary);
+		this.subtaskCheckpointStatistics = Preconditions.checkNotNull(subtaskCheckpointStatistics);
+	}
+
+	public Summary getSummary() {
+		return summary;
+	}
+
+	public List<SubtaskCheckpointStatistics> getSubtaskCheckpointStatistics() {
+		return subtaskCheckpointStatistics;
+	}
+
+	/**
+	 * Two instances are equal if the inherited statistics, the summary and the subtask
+	 * statistics are equal.
+	 */
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		if (!super.equals(o)) {
+			return false;
+		}
+		TaskCheckpointStatisticsWithSubtaskDetails that = (TaskCheckpointStatisticsWithSubtaskDetails) o;
+		return Objects.equals(summary, that.summary) &&
+			Objects.equals(subtaskCheckpointStatistics, that.subtaskCheckpointStatistics);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(super.hashCode(), summary, subtaskCheckpointStatistics);
+	}
+
+	// -----------------------------------------------------------------------------------
+	// Static inner classes
+	// -----------------------------------------------------------------------------------
+
+	/**
+	 * Summary of the checkpoint statistics for a given task.
+	 *
+	 * <p>None of the components may be {@code null}.
+	 */
+	public static final class Summary {
+
+		public static final String FIELD_NAME_STATE_SIZE = "state_size";
+
+		public static final String FIELD_NAME_DURATION = "end_to_end_duration";
+
+		public static final String FIELD_NAME_CHECKPOINT_DURATION = "checkpoint_duration";
+
+		public static final String FIELD_NAME_ALIGNMENT = "alignment";
+
+		@JsonProperty(FIELD_NAME_STATE_SIZE)
+		private final MinMaxAvgStatistics stateSize;
+
+		@JsonProperty(FIELD_NAME_DURATION)
+		private final MinMaxAvgStatistics duration;
+
+		@JsonProperty(FIELD_NAME_CHECKPOINT_DURATION)
+		private final CheckpointDuration checkpointDuration;
+
+		@JsonProperty(FIELD_NAME_ALIGNMENT)
+		private final CheckpointAlignment checkpointAlignment;
+
+		@JsonCreator
+		public Summary(
+				@JsonProperty(FIELD_NAME_STATE_SIZE) MinMaxAvgStatistics stateSize,
+				@JsonProperty(FIELD_NAME_DURATION) MinMaxAvgStatistics duration,
+				@JsonProperty(FIELD_NAME_CHECKPOINT_DURATION) CheckpointDuration checkpointDuration,
+				@JsonProperty(FIELD_NAME_ALIGNMENT) CheckpointAlignment checkpointAlignment) {
+			this.stateSize = Preconditions.checkNotNull(stateSize);
+			this.duration = Preconditions.checkNotNull(duration);
+			this.checkpointDuration = Preconditions.checkNotNull(checkpointDuration);
+			this.checkpointAlignment = Preconditions.checkNotNull(checkpointAlignment);
+		}
+
+		public MinMaxAvgStatistics getStateSize() {
+			return stateSize;
+		}
+
+		public MinMaxAvgStatistics getDuration() {
+			return duration;
+		}
+
+		public CheckpointDuration getCheckpointDuration() {
+			return checkpointDuration;
+		}
+
+		public CheckpointAlignment getCheckpointAlignment() {
+			return checkpointAlignment;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			Summary summary = (Summary) o;
+			return Objects.equals(stateSize, summary.stateSize) &&
+				Objects.equals(duration, summary.duration) &&
+				Objects.equals(checkpointDuration, summary.checkpointDuration) &&
+				Objects.equals(checkpointAlignment, summary.checkpointAlignment);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(stateSize, duration, checkpointDuration, checkpointAlignment);
+		}
+	}
+
+	/**
+	 * Duration of a checkpoint split up into its synchronous and asynchronous part.
+	 *
+	 * <p>Neither part may be {@code null}.
+	 */
+	public static final class CheckpointDuration {
+
+		public static final String FIELD_NAME_SYNCHRONOUS_DURATION = "sync";
+
+		public static final String FIELD_NAME_ASYNCHRONOUS_DURATION = "async";
+
+		@JsonProperty(FIELD_NAME_SYNCHRONOUS_DURATION)
+		private final MinMaxAvgStatistics synchronousDuration;
+
+		@JsonProperty(FIELD_NAME_ASYNCHRONOUS_DURATION)
+		private final MinMaxAvgStatistics asynchronousDuration;
+
+		@JsonCreator
+		public CheckpointDuration(
+				@JsonProperty(FIELD_NAME_SYNCHRONOUS_DURATION) MinMaxAvgStatistics synchronousDuration,
+				@JsonProperty(FIELD_NAME_ASYNCHRONOUS_DURATION) MinMaxAvgStatistics asynchronousDuration) {
+			this.synchronousDuration = Preconditions.checkNotNull(synchronousDuration);
+			this.asynchronousDuration = Preconditions.checkNotNull(asynchronousDuration);
+		}
+
+		public MinMaxAvgStatistics getSynchronousDuration() {
+			return synchronousDuration;
+		}
+
+		public MinMaxAvgStatistics getAsynchronousDuration() {
+			return asynchronousDuration;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			CheckpointDuration that = (CheckpointDuration) o;
+			return Objects.equals(synchronousDuration, that.synchronousDuration) &&
+				Objects.equals(asynchronousDuration, that.asynchronousDuration);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(synchronousDuration, asynchronousDuration);
+		}
+	}
+
+	/**
+	 * Alignment information for a specific checkpoint at a given task.
+	 *
+	 * <p>Neither component may be {@code null}.
+	 */
+	public static final class CheckpointAlignment {
+
+		public static final String FIELD_NAME_BUFFERED_DATA = "buffered";
+
+		public static final String FIELD_NAME_DURATION = "duration";
+
+		@JsonProperty(FIELD_NAME_BUFFERED_DATA)
+		private final MinMaxAvgStatistics bufferedData;
+
+		@JsonProperty(FIELD_NAME_DURATION)
+		private final MinMaxAvgStatistics duration;
+
+		/**
+		 * Creates the alignment information.
+		 *
+		 * @param bufferedData statistics of the {@code buffered} property, must not be {@code null}
+		 * @param duration statistics of the {@code duration} property, must not be {@code null}
+		 */
+		@JsonCreator
+		public CheckpointAlignment(
+				@JsonProperty(FIELD_NAME_BUFFERED_DATA) MinMaxAvgStatistics bufferedData,
+				@JsonProperty(FIELD_NAME_DURATION) MinMaxAvgStatistics duration) {
+			// Fail fast on missing values, like the other statistics classes of this response do.
+			this.bufferedData = Preconditions.checkNotNull(bufferedData);
+			this.duration = Preconditions.checkNotNull(duration);
+		}
+
+		public MinMaxAvgStatistics getBufferedData() {
+			return bufferedData;
+		}
+
+		public MinMaxAvgStatistics getDuration() {
+			return duration;
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			CheckpointAlignment alignment = (CheckpointAlignment) o;
+			return Objects.equals(bufferedData, alignment.bufferedData) &&
+				Objects.equals(duration, alignment.duration);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(bufferedData, duration);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
index 73d9157..09fd310 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandlerTest.java
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.rest.handler.legacy;
 
 import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.rest.handler.legacy.messages.DashboardConfiguration;
 import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.rest.messages.DashboardConfiguration;
 import org.apache.flink.util.TestLogger;
 
 import com.fasterxml.jackson.databind.JsonNode;

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
deleted file mode 100644
index deffaae..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointConfigInfoTest.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.messages;
-
-import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
-
-/**
- * Tests for the {@link CheckpointConfigInfo}.
- */
-public class CheckpointConfigInfoTest extends RestResponseMarshallingTestBase<CheckpointConfigInfo> {
-	@Override
-	protected Class<CheckpointConfigInfo> getTestResponseClass() {
-		return CheckpointConfigInfo.class;
-	}
-
-	@Override
-	protected CheckpointConfigInfo getTestResponseInstance() {
-		final CheckpointConfigInfo.ExternalizedCheckpointInfo externalizedCheckpointInfo = new CheckpointConfigInfo.ExternalizedCheckpointInfo(true, false);
-
-		return new CheckpointConfigInfo(
-			CheckpointConfigInfo.ProcessingMode.AT_LEAST_ONCE,
-			1L,
-			2L,
-			3L,
-			4,
-			externalizedCheckpointInfo);
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java
deleted file mode 100644
index 8521d34..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/CheckpointingStatisticsTest.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.messages;
-
-import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
-import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Tests for {@link CheckpointingStatistics}.
- */
-public class CheckpointingStatisticsTest extends RestResponseMarshallingTestBase<CheckpointingStatistics> {
-	@Override
-	protected Class<CheckpointingStatistics> getTestResponseClass() {
-		return CheckpointingStatistics.class;
-	}
-
-	@Override
-	protected CheckpointingStatistics getTestResponseInstance() throws Exception {
-
-		final CheckpointingStatistics.Counts counts = new CheckpointingStatistics.Counts(1, 2, 3, 4, 5);
-		final CheckpointingStatistics.Summary summary = new CheckpointingStatistics.Summary(
-			new CheckpointingStatistics.MinMaxAvgStatistics(1L, 1L, 1L),
-			new CheckpointingStatistics.MinMaxAvgStatistics(2L, 2L, 2L),
-			new CheckpointingStatistics.MinMaxAvgStatistics(3L, 3L, 3L));
-
-		final Map<JobVertexID, CheckpointStatistics.TaskCheckpointStatistics> checkpointStatisticsPerTask = new HashMap<>(2);
-
-		checkpointStatisticsPerTask.put(
-			new JobVertexID(),
-			new CheckpointStatistics.TaskCheckpointStatistics(
-				1L,
-				2L,
-				3L,
-				4L,
-				5,
-				6));
-
-		checkpointStatisticsPerTask.put(
-			new JobVertexID(),
-			new CheckpointStatistics.TaskCheckpointStatistics(
-				2L,
-				3L,
-				4L,
-				5L,
-				6,
-				7));
-
-		final CheckpointStatistics.CompletedCheckpointStatistics completed = new CheckpointStatistics.CompletedCheckpointStatistics(
-			1L,
-			CheckpointStatsStatus.COMPLETED,
-			false,
-			42L,
-			41L,
-			1337L,
-			1L,
-			0L,
-			10,
-			10,
-			Collections.emptyMap(),
-			null,
-			false);
-
-		final CheckpointStatistics.CompletedCheckpointStatistics savepoint = new CheckpointStatistics.CompletedCheckpointStatistics(
-			2L,
-			CheckpointStatsStatus.COMPLETED,
-			true,
-			11L,
-			10L,
-			43L,
-			1L,
-			0L,
-			9,
-			9,
-			checkpointStatisticsPerTask,
-			"externalPath",
-			false);
-
-		final CheckpointStatistics.FailedCheckpointStatistics failed = new CheckpointStatistics.FailedCheckpointStatistics(
-			3L,
-			CheckpointStatsStatus.FAILED,
-			false,
-			5L,
-			10L,
-			4L,
-			2L,
-			0L,
-			11,
-			9,
-			Collections.emptyMap(),
-			100L,
-			"Test failure");
-
-		CheckpointingStatistics.RestoredCheckpointStatistics restored = new CheckpointingStatistics.RestoredCheckpointStatistics(
-			4L,
-			1445L,
-			true,
-			"foobar");
-
-		final CheckpointingStatistics.LatestCheckpoints latestCheckpoints = new CheckpointingStatistics.LatestCheckpoints(
-			completed,
-			savepoint,
-			failed,
-			restored);
-
-		return new CheckpointingStatistics(
-			counts,
-			summary,
-			latestCheckpoints,
-			Arrays.asList(completed, savepoint, failed));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoTest.java
deleted file mode 100644
index 8e7092b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/ClusterConfigurationInfoTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.messages;
-
-/**
- * Tests for the {@link ClusterConfigurationInfo}.
- */
-public class ClusterConfigurationInfoTest extends RestResponseMarshallingTestBase<ClusterConfigurationInfo> {
-
-	@Override
-	protected Class<ClusterConfigurationInfo> getTestResponseClass() {
-		return ClusterConfigurationInfo.class;
-	}
-
-	@Override
-	protected ClusterConfigurationInfo getTestResponseInstance() {
-		final ClusterConfigurationInfo expected = new ClusterConfigurationInfo(2);
-		expected.add(new ClusterConfigurationInfoEntry("key1", "value1"));
-		expected.add(new ClusterConfigurationInfoEntry("key2", "value2"));
-
-		return expected;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java
deleted file mode 100644
index bb1a6ec..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/DashboardConfigurationTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.messages;
-
-/**
- * Tests for the {@link DashboardConfiguration}.
- */
-public class DashboardConfigurationTest extends RestResponseMarshallingTestBase<DashboardConfiguration> {
-
-	@Override
-	protected Class<DashboardConfiguration> getTestResponseClass() {
-		return DashboardConfiguration.class;
-	}
-
-	@Override
-	protected DashboardConfiguration getTestResponseInstance() {
-		return new DashboardConfiguration(
-			1L,
-			"foobar",
-			42,
-			"version",
-			"revision");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobConfigInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobConfigInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobConfigInfoTest.java
deleted file mode 100644
index 2223d3d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobConfigInfoTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.messages;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.rest.messages.JobConfigInfo;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Tests that the {@link JobConfigInfo} can be marshalled and unmarshalled.
- */
-public class JobConfigInfoTest extends RestResponseMarshallingTestBase<JobConfigInfo> {
-
-	@Override
-	protected Class<JobConfigInfo> getTestResponseClass() {
-		return JobConfigInfo.class;
-	}
-
-	@Override
-	protected JobConfigInfo getTestResponseInstance() {
-		final Map<String, String> globalJobParameters = new HashMap<>(3);
-		globalJobParameters.put("foo", "bar");
-		globalJobParameters.put("bar", "foo");
-		globalJobParameters.put("hi", "ho");
-
-		final JobConfigInfo.ExecutionConfigInfo executionConfigInfo = new JobConfigInfo.ExecutionConfigInfo(
-			"foobar",
-			"always",
-			42,
-			false,
-			globalJobParameters);
-		return new JobConfigInfo(new JobID(), "testJob", executionConfigInfo);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java
index 1fe51d0..afaafde 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest.handler.legacy.messages;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
 
 /**
  * Tests that the {@link JobPlanInfo} can be marshalled and unmarshalled.

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestRequestMarshallingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestRequestMarshallingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestRequestMarshallingTestBase.java
deleted file mode 100644
index 2eb37cb..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestRequestMarshallingTestBase.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.messages;
-
-import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.util.TestLogger;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test base for verifying that marshalling / unmarshalling REST {@link RequestBody}s work properly.
- */
-public abstract class RestRequestMarshallingTestBase<R extends RequestBody> extends TestLogger {
-
-	/**
-	 * Returns the class of the test response.
-	 *
-	 * @return class of the test response type
-	 */
-	protected abstract Class<R> getTestRequestClass();
-
-	/**
-	 * Returns an instance of a response to be tested.
-	 *
-	 * @return instance of the expected test response
-	 */
-	protected abstract R getTestRequestInstance() throws Exception;
-
-	/**
-	 * Tests that we can marshal and unmarshal the response.
-	 */
-	@Test
-	public void testJsonMarshalling() throws Exception {
-		final R expected = getTestRequestInstance();
-
-		ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
-		JsonNode json = objectMapper.valueToTree(expected);
-
-		final R unmarshalled = objectMapper.treeToValue(json, getTestRequestClass());
-		Assert.assertEquals(expected, unmarshalled);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestResponseMarshallingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestResponseMarshallingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestResponseMarshallingTestBase.java
deleted file mode 100644
index db44b08..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/RestResponseMarshallingTestBase.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.messages;
-
-import org.apache.flink.runtime.rest.messages.ResponseBody;
-import org.apache.flink.runtime.rest.util.RestMapperUtils;
-import org.apache.flink.util.TestLogger;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test base for verifying that marshalling / unmarshalling REST {@link ResponseBody}s work properly.
- */
-public abstract class RestResponseMarshallingTestBase<R extends ResponseBody> extends TestLogger {
-
-	/**
-	 * Returns the class of the test response.
-	 *
-	 * @return class of the test response type
-	 */
-	protected abstract Class<R> getTestResponseClass();
-
-	/**
-	 * Returns an instance of a response to be tested.
-	 *
-	 * @return instance of the expected test response
-	 */
-	protected abstract R getTestResponseInstance() throws Exception;
-
-	/**
-	 * Tests that we can marshal and unmarshal the response.
-	 */
-	@Test
-	public void testJsonMarshalling() throws Exception {
-		final R expected = getTestResponseInstance();
-
-		ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
-		JsonNode json = objectMapper.valueToTree(expected);
-
-		final R unmarshalled = objectMapper.treeToValue(json, getTestResponseClass());
-		Assert.assertEquals(expected, unmarshalled);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java
deleted file mode 100644
index 6b01dbe..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/StatusOverviewWithVersionTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy.messages;
-
-/**
- * Tests for the {@link StatusOverviewWithVersion}.
- */
-public class StatusOverviewWithVersionTest extends RestResponseMarshallingTestBase<StatusOverviewWithVersion> {
-
-	@Override
-	protected Class<StatusOverviewWithVersion> getTestResponseClass() {
-		return StatusOverviewWithVersion.class;
-	}
-
-	@Override
-	protected StatusOverviewWithVersion getTestResponseInstance() {
-		return new StatusOverviewWithVersion(
-			1,
-			3,
-			3,
-			7,
-			4,
-			2,
-			0,
-			"version",
-			"commit");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
index add4e3b..7ad72fc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/BlobServerPortResponseTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.rest.messages;
 
-import org.apache.flink.runtime.rest.handler.legacy.messages.RestResponseMarshallingTestBase;
-
 /**
  * Tests for {@link BlobServerPortResponseBody}.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoTest.java
new file mode 100644
index 0000000..8ab43f2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/ClusterConfigurationInfoTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Tests for the {@link ClusterConfigurationInfo}.
+ */
+public class ClusterConfigurationInfoTest extends RestResponseMarshallingTestBase<ClusterConfigurationInfo> {
+
+	@Override
+	protected Class<ClusterConfigurationInfo> getTestResponseClass() {
+		return ClusterConfigurationInfo.class;
+	}
+
+	@Override
+	protected ClusterConfigurationInfo getTestResponseInstance() {
+		final ClusterConfigurationInfo expected = new ClusterConfigurationInfo(2);
+		expected.add(new ClusterConfigurationInfoEntry("key1", "value1"));
+		expected.add(new ClusterConfigurationInfoEntry("key2", "value2"));
+
+		return expected;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java
new file mode 100644
index 0000000..789310e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/DashboardConfigurationTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Tests for the {@link DashboardConfiguration}.
+ */
+public class DashboardConfigurationTest extends RestResponseMarshallingTestBase<DashboardConfiguration> {
+
+	@Override
+	protected Class<DashboardConfiguration> getTestResponseClass() {
+		return DashboardConfiguration.class;
+	}
+
+	@Override
+	protected DashboardConfiguration getTestResponseInstance() {
+		return new DashboardConfiguration(
+			1L,
+			"foobar",
+			42,
+			"version",
+			"revision");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobConfigInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobConfigInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobConfigInfoTest.java
new file mode 100644
index 0000000..88fadb7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobConfigInfoTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests that the {@link JobConfigInfo} can be marshalled and unmarshalled.
+ */
+public class JobConfigInfoTest extends RestResponseMarshallingTestBase<JobConfigInfo> {
+
+	@Override
+	protected Class<JobConfigInfo> getTestResponseClass() {
+		return JobConfigInfo.class;
+	}
+
+	@Override
+	protected JobConfigInfo getTestResponseInstance() {
+		final Map<String, String> globalJobParameters = new HashMap<>(3);
+		globalJobParameters.put("foo", "bar");
+		globalJobParameters.put("bar", "foo");
+		globalJobParameters.put("hi", "ho");
+
+		final JobConfigInfo.ExecutionConfigInfo executionConfigInfo = new JobConfigInfo.ExecutionConfigInfo(
+			"foobar",
+			"always",
+			42,
+			false,
+			globalJobParameters);
+		return new JobConfigInfo(new JobID(), "testJob", executionConfigInfo);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
index e69913c..7627d98 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitRequestBodyTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.rest.messages;
 
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.rest.handler.legacy.messages.RestRequestMarshallingTestBase;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitRequestBody;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java
index 9dc832a..d523716 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobSubmitResponseBodyTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rest.messages;
 
-import org.apache.flink.runtime.rest.handler.legacy.messages.RestResponseMarshallingTestBase;
 import org.apache.flink.runtime.rest.messages.job.JobSubmitResponseBody;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestRequestMarshallingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestRequestMarshallingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestRequestMarshallingTestBase.java
new file mode 100644
index 0000000..70a8f78
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestRequestMarshallingTestBase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test base for verifying that marshalling / unmarshalling REST {@link RequestBody}s work properly.
+ */
+public abstract class RestRequestMarshallingTestBase<R extends RequestBody> extends TestLogger {
+
+	/**
+	 * Returns the class of the test request.
+	 *
+	 * @return class of the test request type
+	 */
+	protected abstract Class<R> getTestRequestClass();
+
+	/**
+	 * Returns an instance of a request to be tested.
+	 *
+	 * @return instance of the expected test request
+	 */
+	protected abstract R getTestRequestInstance() throws Exception;
+
+	/**
+	 * Tests that we can marshal and unmarshal the request.
+	 */
+	@Test
+	public void testJsonMarshalling() throws Exception {
+		final R expected = getTestRequestInstance();
+
+		ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+		JsonNode json = objectMapper.valueToTree(expected);
+
+		final R unmarshalled = objectMapper.treeToValue(json, getTestRequestClass());
+		Assert.assertEquals(expected, unmarshalled);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java
new file mode 100644
index 0000000..82eb436
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/RestResponseMarshallingTestBase.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.util.RestMapperUtils;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test base for verifying that marshalling / unmarshalling REST {@link ResponseBody}s work properly.
+ */
+public abstract class RestResponseMarshallingTestBase<R extends ResponseBody> extends TestLogger {
+
+	/**
+	 * Returns the class of the test response.
+	 *
+	 * @return class of the test response type
+	 */
+	protected abstract Class<R> getTestResponseClass();
+
+	/**
+	 * Returns an instance of a response to be tested.
+	 *
+	 * @return instance of the expected test response
+	 */
+	protected abstract R getTestResponseInstance() throws Exception;
+
+	/**
+	 * Tests that we can marshal and unmarshal the response.
+	 */
+	@Test
+	public void testJsonMarshalling() throws Exception {
+		final R expected = getTestResponseInstance();
+
+		ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper();
+		JsonNode json = objectMapper.valueToTree(expected);
+
+		final R unmarshalled = objectMapper.treeToValue(json, getTestResponseClass());
+		Assert.assertEquals(expected, unmarshalled);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersionTest.java
new file mode 100644
index 0000000..b2376c5
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/StatusOverviewWithVersionTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+/**
+ * Tests for the {@link StatusOverviewWithVersion}.
+ */
+public class StatusOverviewWithVersionTest extends RestResponseMarshallingTestBase<StatusOverviewWithVersion> {
+
+	@Override
+	protected Class<StatusOverviewWithVersion> getTestResponseClass() {
+		return StatusOverviewWithVersion.class;
+	}
+
+	@Override
+	protected StatusOverviewWithVersion getTestResponseInstance() {
+		return new StatusOverviewWithVersion(
+			1,
+			3,
+			3,
+			7,
+			4,
+			2,
+			0,
+			"version",
+			"commit");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfoTest.java
new file mode 100644
index 0000000..5259165
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointConfigInfoTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+/**
+ * Tests for the {@link CheckpointConfigInfo}.
+ */
+public class CheckpointConfigInfoTest extends RestResponseMarshallingTestBase<CheckpointConfigInfo> {
+	@Override
+	protected Class<CheckpointConfigInfo> getTestResponseClass() {
+		return CheckpointConfigInfo.class;
+	}
+
+	@Override
+	protected CheckpointConfigInfo getTestResponseInstance() {
+		final CheckpointConfigInfo.ExternalizedCheckpointInfo externalizedCheckpointInfo = new CheckpointConfigInfo.ExternalizedCheckpointInfo(true, false);
+
+		return new CheckpointConfigInfo(
+			CheckpointConfigInfo.ProcessingMode.AT_LEAST_ONCE,
+			1L,
+			2L,
+			3L,
+			4,
+			externalizedCheckpointInfo);
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
new file mode 100644
index 0000000..562418e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/CheckpointingStatisticsTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for {@link CheckpointingStatistics}.
+ */
+public class CheckpointingStatisticsTest extends RestResponseMarshallingTestBase<CheckpointingStatistics> {
+	@Override
+	protected Class<CheckpointingStatistics> getTestResponseClass() {
+		return CheckpointingStatistics.class;
+	}
+
+	@Override
+	protected CheckpointingStatistics getTestResponseInstance() throws Exception {
+
+		final CheckpointingStatistics.Counts counts = new CheckpointingStatistics.Counts(1, 2, 3, 4, 5);
+		final CheckpointingStatistics.Summary summary = new CheckpointingStatistics.Summary(
+			new MinMaxAvgStatistics(1L, 1L, 1L),
+			new MinMaxAvgStatistics(2L, 2L, 2L),
+			new MinMaxAvgStatistics(3L, 3L, 3L));
+
+		final Map<JobVertexID, TaskCheckpointStatistics> checkpointStatisticsPerTask = new HashMap<>(2);
+
+		checkpointStatisticsPerTask.put(
+			new JobVertexID(),
+			new TaskCheckpointStatistics(
+				1L,
+				CheckpointStatsStatus.COMPLETED,
+				1L,
+				2L,
+				3L,
+				4L,
+				5,
+				6));
+
+		checkpointStatisticsPerTask.put(
+			new JobVertexID(),
+			new TaskCheckpointStatistics(
+				1L,
+				CheckpointStatsStatus.COMPLETED,
+				2L,
+				3L,
+				4L,
+				5L,
+				6,
+				7));
+
+		final CheckpointStatistics.CompletedCheckpointStatistics completed = new CheckpointStatistics.CompletedCheckpointStatistics(
+			1L,
+			CheckpointStatsStatus.COMPLETED,
+			false,
+			42L,
+			41L,
+			1337L,
+			1L,
+			0L,
+			10,
+			10,
+			Collections.emptyMap(),
+			null,
+			false);
+
+		final CheckpointStatistics.CompletedCheckpointStatistics savepoint = new CheckpointStatistics.CompletedCheckpointStatistics(
+			2L,
+			CheckpointStatsStatus.COMPLETED,
+			true,
+			11L,
+			10L,
+			43L,
+			1L,
+			0L,
+			9,
+			9,
+			checkpointStatisticsPerTask,
+			"externalPath",
+			false);
+
+		final CheckpointStatistics.FailedCheckpointStatistics failed = new CheckpointStatistics.FailedCheckpointStatistics(
+			3L,
+			CheckpointStatsStatus.FAILED,
+			false,
+			5L,
+			10L,
+			4L,
+			2L,
+			0L,
+			11,
+			9,
+			Collections.emptyMap(),
+			100L,
+			"Test failure");
+
+		CheckpointingStatistics.RestoredCheckpointStatistics restored = new CheckpointingStatistics.RestoredCheckpointStatistics(
+			4L,
+			1445L,
+			true,
+			"foobar");
+
+		final CheckpointingStatistics.LatestCheckpoints latestCheckpoints = new CheckpointingStatistics.LatestCheckpoints(
+			completed,
+			savepoint,
+			failed,
+			restored);
+
+		return new CheckpointingStatistics(
+			counts,
+			summary,
+			latestCheckpoints,
+			Arrays.asList(completed, savepoint, failed));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsTest.java
new file mode 100644
index 0000000..f51d5b1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+/**
+ * Tests the (un)marshalling of {@link TaskCheckpointStatistics}.
+ */
+public class TaskCheckpointStatisticsTest extends RestResponseMarshallingTestBase<TaskCheckpointStatistics> {
+
+	@Override
+	protected Class<TaskCheckpointStatistics> getTestResponseClass() {
+		return TaskCheckpointStatistics.class;
+	}
+
+	@Override
+	protected TaskCheckpointStatistics getTestResponseInstance() throws Exception {
+		return new TaskCheckpointStatistics(
+			1L,
+			CheckpointStatsStatus.FAILED,
+			42L,
+			1L,
+			23L,
+			1337L,
+			9,
+			8);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java
new file mode 100644
index 0000000..29e92ee
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/checkpoints/TaskCheckpointStatisticsWithSubtaskDetailsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests (un)marshalling of {@link TaskCheckpointStatisticsWithSubtaskDetails}.
+ */
+public class TaskCheckpointStatisticsWithSubtaskDetailsTest extends RestResponseMarshallingTestBase<TaskCheckpointStatisticsWithSubtaskDetails> {
+
+	@Override
+	protected Class<TaskCheckpointStatisticsWithSubtaskDetails> getTestResponseClass() {
+		return TaskCheckpointStatisticsWithSubtaskDetails.class;
+	}
+
+	@Override
+	protected TaskCheckpointStatisticsWithSubtaskDetails getTestResponseInstance() throws Exception {
+		final TaskCheckpointStatisticsWithSubtaskDetails.Summary summary = new TaskCheckpointStatisticsWithSubtaskDetails.Summary(
+			new MinMaxAvgStatistics(1L, 2L, 3L),
+			new MinMaxAvgStatistics(1L, 2L, 3L),
+			new TaskCheckpointStatisticsWithSubtaskDetails.CheckpointDuration(
+				new MinMaxAvgStatistics(1L, 2L, 3L),
+				new MinMaxAvgStatistics(1L, 2L, 3L)),
+			new TaskCheckpointStatisticsWithSubtaskDetails.CheckpointAlignment(
+				new MinMaxAvgStatistics(1L, 2L, 3L),
+			new MinMaxAvgStatistics(1L, 2L, 3L)));
+
+		List<SubtaskCheckpointStatistics> subtaskCheckpointStatistics = new ArrayList<>(2);
+
+		subtaskCheckpointStatistics.add(new SubtaskCheckpointStatistics.PendingSubtaskCheckpointStatistics(0));
+		subtaskCheckpointStatistics.add(new SubtaskCheckpointStatistics.CompletedSubtaskCheckpointStatistics(
+			1,
+			4L,
+			13L,
+			1337L,
+			new SubtaskCheckpointStatistics.CompletedSubtaskCheckpointStatistics.CheckpointDuration(1L, 2L),
+			new SubtaskCheckpointStatistics.CompletedSubtaskCheckpointStatistics.CheckpointAlignment(2L, 3L)));
+
+		return new TaskCheckpointStatisticsWithSubtaskDetails(
+			4L,
+			CheckpointStatsStatus.COMPLETED,
+			4L,
+			1337L,
+			1L,
+			2L,
+			8,
+			9,
+			summary,
+			subtaskCheckpointStatistics);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc4638a3/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBodyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBodyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBodyTest.java
index 5f7f918..9ae153a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBodyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerResponseBodyTest.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.rest.messages.job.savepoints;
 
-import org.apache.flink.runtime.rest.handler.legacy.messages.RestResponseMarshallingTestBase;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
 
 /**
  * Tests for the {@link SavepointTriggerResponseBody}.


Mime
View raw message