flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-8368] Migrate org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler to new a REST handler that registered in WebMonitorEndpoint
Date Sun, 14 Jan 2018 13:43:45 GMT
Repository: flink
Updated Branches:
  refs/heads/master 598d96c10 -> 3920e9a47


[FLINK-8368] Migrate org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler to new a REST handler that registered in WebMonitorEndpoint

[FLINK-8368] Add attempts path info that is missing in SubtaskExecutionAttemptDetailsHeaders

This closes #5270.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8b2c0fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8b2c0fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8b2c0fe

Branch: refs/heads/master
Commit: d8b2c0febfdb1c15e4251247a87c1c606ea2a284
Parents: 598d96c
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Sun Jan 14 10:05:59 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Sun Jan 14 10:06:04 2018 +0100

----------------------------------------------------------------------
 .../handler/job/AbstractJobVertexHandler.java   | 101 ++++++++++
 .../job/AbstractSubtaskAttemptHandler.java      | 109 +++++++++++
 .../handler/job/AbstractSubtaskHandler.java     | 103 +++++++++++
 .../rest/handler/job/JobDetailsHandler.java     |   3 +-
 .../job/JobVertexAccumulatorsHandler.java       |  37 ++--
 .../SubtaskExecutionAttemptDetailsHandler.java  | 131 +++++++++++++
 .../rest/handler/job/SubtasksTimesHandler.java  |   9 +-
 .../legacy/AbstractJobVertexRequestHandler.java |   2 +-
 .../rest/messages/job/JobDetailsInfo.java       |  96 +---------
 .../job/SubtaskAttemptMessageParameters.java    |  41 +++++
 .../job/SubtaskAttemptPathParameter.java        |  53 ++++++
 .../SubtaskExecutionAttemptDetailsHeaders.java  |  78 ++++++++
 .../job/SubtaskExecutionAttemptDetailsInfo.java | 122 +++++++++++++
 .../messages/job/SubtaskMessageParameters.java  |  39 ++++
 .../messages/job/metrics/IOMetricsInfo.java     | 114 ++++++++++++
 .../SubtaskMetricsMessageParameters.java        |  14 +-
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  13 ++
 ...btaskExecutionAttemptDetailsHandlerTest.java | 182 +++++++++++++++++++
 .../rest/messages/job/JobDetailsInfoTest.java   |   3 +-
 .../SubtaskExecutionAttemptDetailsInfoTest.java |  63 +++++++
 20 files changed, 1177 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java
new file mode 100644
index 0000000..f0f11bd
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractJobVertexHandler.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for request handlers whose response depends on a specific job vertex (defined
+ * via the "{@link JobVertexIdPathParameter#KEY}" parameter) in a specific job,
+ * defined via (defined via the "{@link JobIDPathParameter#KEY}" parameter).
+ *
+ * @param <R> the response type
+ * @param <M> the message parameters type
+ */
+public abstract class AbstractJobVertexHandler<R extends ResponseBody, M extends JobVertexMessageParameters> extends AbstractExecutionGraphHandler<R, M> {
+
+	/**
+	 * Instantiates a new Abstract job vertex handler.
+	 *
+	 * @param localRestAddress    the local rest address
+	 * @param leaderRetriever     the leader retriever
+	 * @param timeout             the timeout
+	 * @param responseHeaders     the response headers
+	 * @param messageHeaders      the message headers
+	 * @param executionGraphCache the execution graph cache
+	 * @param executor            the executor
+	 */
+	protected AbstractJobVertexHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor) {
+
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
+	}
+
+	@Override
+	protected R handleRequest(
+			HandlerRequest<EmptyRequestBody, M> request,
+			AccessExecutionGraph executionGraph) throws RestHandlerException {
+
+		final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
+		final AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
+
+		if (jobVertex == null) {
+			throw new RestHandlerException("No vertex with ID '" + jobVertexID + "' exists.", HttpResponseStatus.NOT_FOUND);
+		}
+
+		return handleRequest(request, jobVertex);
+	}
+
+	/**
+	 * Called for each request after the corresponding {@link AccessExecutionJobVertex} has been retrieved from the
+	 * {@link AccessExecutionGraph}.
+	 *
+	 * @param request   the request
+	 * @param jobVertex the execution job vertex
+	 * @return the response
+	 * @throws RestHandlerException if the handler could not process the request
+	 */
+	protected abstract R handleRequest(HandlerRequest<EmptyRequestBody, M> request, AccessExecutionJobVertex jobVertex) throws RestHandlerException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java
new file mode 100644
index 0000000..388ba8b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskAttemptHandler.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptPathParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for request handlers whose response depends on a specific attempt (defined
+ * via the "{@link SubtaskAttemptPathParameter#KEY}" of a specific subtask (defined
+ * via the "{@link SubtaskIndexPathParameter#KEY}" in a specific job vertex, (defined
+ * via the "{@link JobVertexIdPathParameter#KEY}" parameter) in a specific job,
+ * defined via (defined via the "{@link JobIDPathParameter#KEY}" parameter).
+ *
+ * @param <R> the response type
+ * @param <M> the message parameters type
+ */
+public abstract class AbstractSubtaskAttemptHandler<R extends ResponseBody, M extends SubtaskAttemptMessageParameters> extends AbstractSubtaskHandler<R, M>{
+	/**
+	 * Instantiates a new Abstract job vertex handler.
+	 *
+	 * @param localRestAddress    the local rest address
+	 * @param leaderRetriever     the leader retriever
+	 * @param timeout             the timeout
+	 * @param responseHeaders     the response headers
+	 * @param messageHeaders      the message headers
+	 * @param executionGraphCache the execution graph cache
+	 * @param executor            the executor
+	 */
+	protected AbstractSubtaskAttemptHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor) {
+
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
+	}
+
+	@Override
+	protected R handleRequest(HandlerRequest<EmptyRequestBody, M> request, AccessExecutionVertex executionVertex) throws RestHandlerException {
+		final Integer attemptNumber = request.getPathParameter(SubtaskAttemptPathParameter.class);
+
+		final AccessExecution currentAttempt = executionVertex.getCurrentExecutionAttempt();
+		if (attemptNumber == currentAttempt.getAttemptNumber()) {
+			return handleRequest(request, currentAttempt);
+		} else if (attemptNumber >= 0 && attemptNumber < currentAttempt.getAttemptNumber()) {
+			final AccessExecution execution = executionVertex.getPriorExecutionAttempt(attemptNumber);
+
+			if (execution != null) {
+				return handleRequest(request, execution);
+			} else {
+				throw new RestHandlerException("Attempt " + attemptNumber + " not found in subtask " +
+					executionVertex.getTaskNameWithSubtaskIndex(), HttpResponseStatus.NOT_FOUND);
+			}
+		} else {
+			throw new RestHandlerException("Invalid attempt num " + attemptNumber, HttpResponseStatus.NOT_FOUND);
+		}
+	}
+
+	/**
+	 * Called for each request after the corresponding {@link AccessExecution} has been retrieved from the
+	 * {@link AccessExecutionVertex}.
+	 *
+	 * @param request   the request
+	 * @param execution the execution
+	 * @return the response
+	 * @throws RestHandlerException the rest handler exception
+	 */
+	protected abstract R handleRequest(HandlerRequest<EmptyRequestBody, M> request, AccessExecution execution) throws RestHandlerException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskHandler.java
new file mode 100644
index 0000000..8260604
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractSubtaskHandler.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+import org.apache.flink.runtime.rest.messages.job.SubtaskMessageParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+
+/**
+ * Base class for request handlers whose response depends on a specific subtask (defined
+ * via the "{@link SubtaskIndexPathParameter#KEY}" in a specific job vertex, (defined
+ * via the "{@link JobVertexIdPathParameter#KEY}" parameter) in a specific job,
+ * defined via (defined via the "{@link JobIDPathParameter#KEY}" parameter).
+ *
+ *
+ * @param <R> the response type
+ * @param <M> the message parameters type
+ */
+public abstract class AbstractSubtaskHandler<R extends ResponseBody, M extends SubtaskMessageParameters> extends AbstractJobVertexHandler<R, M> {
+
+	/**
+	 * Instantiates a new Abstract job vertex handler.
+	 *
+	 * @param localRestAddress    the local rest address
+	 * @param leaderRetriever     the leader retriever
+	 * @param timeout             the timeout
+	 * @param responseHeaders     the response headers
+	 * @param messageHeaders      the message headers
+	 * @param executionGraphCache the execution graph cache
+	 * @param executor            the executor
+	 */
+	protected AbstractSubtaskHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout, Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor) {
+
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
+	}
+
+	@Override
+	protected R handleRequest(
+			HandlerRequest<EmptyRequestBody, M> request,
+			AccessExecutionJobVertex jobVertex) throws RestHandlerException {
+
+		final Integer subtaskIndex = request.getPathParameter(SubtaskIndexPathParameter.class);
+		final AccessExecutionVertex[] executionVertices = jobVertex.getTaskVertices();
+
+		if (subtaskIndex >= executionVertices.length || subtaskIndex < 0) {
+			throw new RestHandlerException("Invalid subtask index for vertex " + jobVertex.getJobVertexId(), HttpResponseStatus.NOT_FOUND);
+		}
+
+		return handleRequest(request, executionVertices[subtaskIndex]);
+	}
+
+	/**
+	 * Called for each request after the corresponding {@link AccessExecutionVertex} has been retrieved from the
+	 * {@link AccessExecutionJobVertex}.
+	 *
+	 * @param request         the request
+	 * @param executionVertex the execution vertex
+	 * @return the response
+	 * @throws RestHandlerException the rest handler exception
+	 */
+	protected abstract R handleRequest(HandlerRequest<EmptyRequestBody, M> request, AccessExecutionVertex executionVertex) throws RestHandlerException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 647763a..82f24d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
@@ -185,7 +186,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI
 				ejv.getJobVertexId().toString());
 		}
 
-		final JobDetailsInfo.JobVertexMetrics jobVertexMetrics = new JobDetailsInfo.JobVertexMetrics(
+		final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo(
 			counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
 			counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
 			counts.getNumBytesOut(),

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
index 55c465c..52e5632 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
@@ -20,22 +20,17 @@ package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsInfo;
-import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -44,7 +39,7 @@ import java.util.concurrent.Executor;
 /**
  * Request handler for the job vertex accumulators.
  */
-public class JobVertexAccumulatorsHandler extends AbstractExecutionGraphHandler<JobVertexAccumulatorsInfo, JobVertexMessageParameters> {
+public class JobVertexAccumulatorsHandler extends AbstractJobVertexHandler<JobVertexAccumulatorsInfo, JobVertexMessageParameters> {
 
 	public JobVertexAccumulatorsHandler(
 			CompletableFuture<String> localRestAddress,
@@ -65,25 +60,21 @@ public class JobVertexAccumulatorsHandler extends AbstractExecutionGraphHandler<
 	}
 
 	@Override
-	protected JobVertexAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException {
-		JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
-		AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
-
-		if (null != jobVertex) {
-			StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
-			ArrayList<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>(accs.length);
+	protected JobVertexAccumulatorsInfo handleRequest(
+			HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request,
+			AccessExecutionJobVertex jobVertex) throws RestHandlerException {
 
-			for (StringifiedAccumulatorResult acc : accs) {
-				userAccumulatorList.add(
-					new JobVertexAccumulatorsInfo.UserAccumulator(
-						acc.getName(),
-						acc.getType(),
-						acc.getValue()));
-			}
+		StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
+		ArrayList<JobVertexAccumulatorsInfo.UserAccumulator> userAccumulatorList = new ArrayList<>(accs.length);
 
-			return new JobVertexAccumulatorsInfo(jobVertex.getJobVertexId().toString(), userAccumulatorList);
-		} else {
-			throw new RestHandlerException("There is no accumulator for vertex " + jobVertexID + '.', HttpResponseStatus.NOT_FOUND);
+		for (StringifiedAccumulatorResult acc : accs) {
+			userAccumulatorList.add(
+				new JobVertexAccumulatorsInfo.UserAccumulator(
+					acc.getName(),
+					acc.getType(),
+					acc.getValue()));
 		}
+
+		return new JobVertexAccumulatorsInfo(jobVertex.getJobVertexId().toString(), userAccumulatorList);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
new file mode 100644
index 0000000..1669914
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler of specific sub task execution attempt.
+ */
+public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters> {
+
+	private final MetricFetcher<?> metricFetcher;
+
+	/**
+	 * Instantiates a new Abstract job vertex handler.
+	 *
+	 * @param localRestAddress    the local rest address
+	 * @param leaderRetriever     the leader retriever
+	 * @param timeout             the timeout
+	 * @param responseHeaders     the response headers
+	 * @param messageHeaders      the message headers
+	 * @param executionGraphCache the execution graph cache
+	 * @param executor            the executor
+	 */
+	public SubtaskExecutionAttemptDetailsHandler(
+			CompletableFuture<String> localRestAddress,
+			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+			Time timeout,
+			Map<String, String> responseHeaders,
+			MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters> messageHeaders,
+			ExecutionGraphCache executionGraphCache,
+			Executor executor,
+			MetricFetcher<?> metricFetcher) {
+
+		super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor);
+
+		this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
+	}
+
+	@Override
+	protected SubtaskExecutionAttemptDetailsInfo handleRequest(
+			HandlerRequest<EmptyRequestBody, SubtaskAttemptMessageParameters> request,
+			AccessExecution execution) throws RestHandlerException {
+
+		final ExecutionState status = execution.getState();
+		final long now = System.currentTimeMillis();
+
+		final TaskManagerLocation location = execution.getAssignedResourceLocation();
+		final String locationString = location == null ? "(unassigned)" : location.getHostname();
+
+		long startTime = execution.getStateTimestamp(ExecutionState.DEPLOYING);
+		if (startTime == 0) {
+			startTime = -1;
+		}
+		final long endTime = status.isTerminal() ? execution.getStateTimestamp(status) : -1;
+		final long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
+
+		final MutableIOMetrics counts = new MutableIOMetrics();
+
+		final JobID jobID = request.getPathParameter(JobIDPathParameter.class);
+		final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
+
+		counts.addIOMetrics(
+			execution,
+			metricFetcher,
+			jobID.toString(),
+			jobVertexID.toString()
+		);
+
+		final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo(
+			counts.getNumBytesInLocal() + counts.getNumBytesInRemote(),
+			counts.isNumBytesInLocalComplete() && counts.isNumBytesInRemoteComplete(),
+			counts.getNumBytesOut(),
+			counts.isNumBytesOutComplete(),
+			counts.getNumRecordsIn(),
+			counts.isNumRecordsInComplete(),
+			counts.getNumRecordsOut(),
+			counts.isNumRecordsOutComplete());
+
+		return new SubtaskExecutionAttemptDetailsInfo(
+			execution.getParallelSubtaskIndex(),
+			status,
+			execution.getAttemptNumber(),
+			locationString,
+			startTime,
+			endTime,
+			duration,
+			ioMetricsInfo
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
index bc72e51..29c0f93 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
@@ -20,14 +20,11 @@ package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
-import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.SubtasksTimesInfo;
@@ -45,7 +42,7 @@ import java.util.concurrent.Executor;
 /**
  * Request handler for the subtasks times info.
  */
-public class SubtasksTimesHandler extends AbstractExecutionGraphHandler<SubtasksTimesInfo, JobVertexMessageParameters>  {
+public class SubtasksTimesHandler extends AbstractJobVertexHandler<SubtasksTimesInfo, JobVertexMessageParameters>  {
 	public SubtasksTimesHandler(
 			CompletableFuture<String> localRestAddress,
 			GatewayRetriever<? extends RestfulGateway> leaderRetriever,
@@ -65,9 +62,7 @@ public class SubtasksTimesHandler extends AbstractExecutionGraphHandler<Subtasks
 	}
 
 	@Override
-	protected SubtasksTimesInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionGraph executionGraph) {
-		JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class);
-		AccessExecutionJobVertex jobVertex = executionGraph.getJobVertex(jobVertexID);
+	protected SubtasksTimesInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionJobVertex jobVertex) {
 
 		final String id = jobVertex.getJobVertexId().toString();
 		final String name = jobVertex.getName();

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java
index 70606e4..92bf51e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java
@@ -28,7 +28,7 @@ import java.util.concurrent.Executor;
 
 /**
  * Base class for request handlers whose response depends on a specific job vertex (defined
- * via the "vertexid" parameter) in a specific job, defined via (defined voa the "jobid" parameter).
+ * via the "vertexid" parameter) in a specific job, defined via (defined via the "jobid" parameter).
  */
 public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionGraphRequestHandler {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
index 551913f..2c74389 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
 import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
 import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
@@ -217,7 +218,7 @@ public class JobDetailsInfo implements ResponseBody {
 		private final Map<ExecutionState, Integer> tasksPerState;
 
 		@JsonProperty(FIELD_NAME_JOB_VERTEX_METRICS)
-		private final JobVertexMetrics jobVertexMetrics;
+		private final IOMetricsInfo jobVertexMetrics;
 
 		@JsonCreator
 		public JobVertexDetailsInfo(
@@ -229,7 +230,7 @@ public class JobDetailsInfo implements ResponseBody {
 				@JsonProperty(FIELD_NAME_JOB_VERTEX_END_TIME) long endTime,
 				@JsonProperty(FIELD_NAME_JOB_VERTEX_DURATION) long duration,
 				@JsonProperty(FIELD_NAME_TASKS_PER_STATE) Map<ExecutionState, Integer> tasksPerState,
-				@JsonProperty(FIELD_NAME_JOB_VERTEX_METRICS) JobVertexMetrics jobVertexMetrics) {
+				@JsonProperty(FIELD_NAME_JOB_VERTEX_METRICS) IOMetricsInfo jobVertexMetrics) {
 			this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
 			this.name = Preconditions.checkNotNull(name);
 			this.parallelism = parallelism;
@@ -273,7 +274,7 @@ public class JobDetailsInfo implements ResponseBody {
 			return tasksPerState;
 		}
 
-		public JobVertexMetrics getJobVertexMetrics() {
+		public IOMetricsInfo getJobVertexMetrics() {
 			return jobVertexMetrics;
 		}
 
@@ -303,93 +304,4 @@ public class JobDetailsInfo implements ResponseBody {
 		}
 	}
 
-	/**
-	 * Metrics of a job vertex.
-	 */
-	public static final class JobVertexMetrics {
-
-		public static final String FIELD_NAME_BYTES_READ = "read-bytes";
-
-		public static final String FIELD_NAME_BYTES_READ_COMPLETE = "read-bytes-complete";
-
-		public static final String FIELD_NAME_BYTES_WRITTEN = "write-bytes";
-
-		public static final String FIELD_NAME_BYTES_WRITTEN_COMPLETE = "write-bytes-complete";
-
-		public static final String FIELD_NAME_RECORDS_READ = "read-records";
-
-		public static final String FIELD_NAME_RECORDS_READ_COMPLETE = "read-records-complete";
-
-		public static final String FIELD_NAME_RECORDS_WRITTEN = "write-records";
-
-		public static final String FIELD_NAME_RECORDS_WRITTEN_COMPLETE = "write-records-complete";
-
-		@JsonProperty(FIELD_NAME_BYTES_READ)
-		private final long bytesRead;
-
-		@JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE)
-		private final boolean bytesReadComplete;
-
-		@JsonProperty(FIELD_NAME_BYTES_WRITTEN)
-		private final long bytesWritten;
-
-		@JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE)
-		private final boolean bytesWrittenComplete;
-
-		@JsonProperty(FIELD_NAME_RECORDS_READ)
-		private final long recordsRead;
-
-		@JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE)
-		private final boolean recordsReadComplete;
-
-		@JsonProperty(FIELD_NAME_RECORDS_WRITTEN)
-		private final long recordsWritten;
-
-		@JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE)
-		private final boolean recordsWrittenComplete;
-
-		@JsonCreator
-		public JobVertexMetrics(
-				@JsonProperty(FIELD_NAME_BYTES_READ) long bytesRead,
-				@JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE) boolean bytesReadComplete,
-				@JsonProperty(FIELD_NAME_BYTES_WRITTEN) long bytesWritten,
-				@JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE) boolean bytesWrittenComplete,
-				@JsonProperty(FIELD_NAME_RECORDS_READ) long recordsRead,
-				@JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE) boolean recordsReadComplete,
-				@JsonProperty(FIELD_NAME_RECORDS_WRITTEN) long recordsWritten,
-				@JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete) {
-			this.bytesRead = bytesRead;
-			this.bytesReadComplete = bytesReadComplete;
-			this.bytesWritten = bytesWritten;
-			this.bytesWrittenComplete = bytesWrittenComplete;
-			this.recordsRead = recordsRead;
-			this.recordsReadComplete = recordsReadComplete;
-			this.recordsWritten = recordsWritten;
-			this.recordsWrittenComplete = recordsWrittenComplete;
-		}
-
-		@Override
-		public boolean equals(Object o) {
-			if (this == o) {
-				return true;
-			}
-			if (o == null || getClass() != o.getClass()) {
-				return false;
-			}
-			JobVertexMetrics that = (JobVertexMetrics) o;
-			return bytesRead == that.bytesRead &&
-				bytesReadComplete == that.bytesReadComplete &&
-				bytesWritten == that.bytesWritten &&
-				bytesWrittenComplete == that.bytesWrittenComplete &&
-				recordsRead == that.recordsRead &&
-				recordsReadComplete == that.recordsReadComplete &&
-				recordsWritten == that.recordsWritten &&
-				recordsWrittenComplete == that.recordsWrittenComplete;
-		}
-
-		@Override
-		public int hashCode() {
-			return Objects.hash(bytesRead, bytesReadComplete, bytesWritten, bytesWrittenComplete, recordsRead, recordsReadComplete, recordsWritten, recordsWrittenComplete);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskAttemptMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskAttemptMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskAttemptMessageParameters.java
new file mode 100644
index 0000000..a144264
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskAttemptMessageParameters.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * The type Subtask attempt message parameters.
+ */
+public class SubtaskAttemptMessageParameters extends SubtaskMessageParameters {
+
+	protected final SubtaskAttemptPathParameter subtaskAttemptPathParameter = new SubtaskAttemptPathParameter();
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Arrays.asList(
+			jobPathParameter,
+			jobVertexIdPathParameter,
+			subtaskIndexPathParameter,
+			subtaskAttemptPathParameter);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskAttemptPathParameter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskAttemptPathParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskAttemptPathParameter.java
new file mode 100644
index 0000000..220a121
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskAttemptPathParameter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+/**
+ * Path parameter identifying subtask attempt.
+ */
+public class SubtaskAttemptPathParameter extends MessagePathParameter<Integer> {
+	/**
+	 * The constant subtask attempt KEY.
+	 */
+	public static final String KEY = "attempt";
+
+	/**
+	 * Instantiates a new Subtask attempt path parameter.
+	 */
+	protected SubtaskAttemptPathParameter() {
+		super(KEY);
+	}
+
+	@Override
+	protected Integer convertFromString(String value) throws ConversionException {
+		try {
+			return Integer.parseInt(value);
+		} catch (NumberFormatException e) {
+			throw new ConversionException("Invalid attempt num " + value);
+		}
+	}
+
+	@Override
+	protected String convertToString(Integer value) {
+		return value.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java
new file mode 100644
index 0000000..aa65007
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsHeaders.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link SubtaskExecutionAttemptDetailsHandler}.
+ */
+public class SubtaskExecutionAttemptDetailsHeaders implements MessageHeaders<EmptyRequestBody, SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters> {
+
+	private static final SubtaskExecutionAttemptDetailsHeaders INSTANCE = new SubtaskExecutionAttemptDetailsHeaders();
+
+	public static final String URL = String.format(
+		"/jobs/:%s/vertices/:%s/subtasks/:%s/attempts/%s",
+		JobIDPathParameter.KEY,
+		JobVertexIdPathParameter.KEY,
+		SubtaskIndexPathParameter.KEY,
+		SubtaskAttemptPathParameter.KEY);
+
+	@Override
+	public HttpMethodWrapper getHttpMethod() {
+		return HttpMethodWrapper.GET;
+	}
+
+	@Override
+	public String getTargetRestEndpointURL() {
+		return URL;
+	}
+
+	@Override
+	public Class<EmptyRequestBody> getRequestClass() {
+		return EmptyRequestBody.class;
+	}
+
+	@Override
+	public Class<SubtaskExecutionAttemptDetailsInfo> getResponseClass() {
+		return SubtaskExecutionAttemptDetailsInfo.class;
+	}
+
+	@Override
+	public HttpResponseStatus getResponseStatusCode() {
+		return HttpResponseStatus.OK;
+	}
+
+	@Override
+	public SubtaskAttemptMessageParameters getUnresolvedMessageParameters() {
+		return new SubtaskAttemptMessageParameters();
+	}
+
+	public static SubtaskExecutionAttemptDetailsHeaders getInstance() {
+		return INSTANCE;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
new file mode 100644
index 0000000..a34dc10
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * The sub task execution attempt response.
+ */
+public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody {
+
+	public static final String FIELD_NAME_SUBTASK_INDEX = "subtask";
+
+	public static final String FIELD_NAME_STATUS = "status";
+
+	public static final String FIELD_NAME_ATTEMPT = "attempt";
+
+	public static final String FIELD_NAME_HOST = "host";
+
+	public static final String FIELD_NAME_START_TIME = "start-time";
+
+	public static final String FIELD_NAME_END_TIME = "end-time";
+
+	public static final String FIELD_NAME_DURATION = "duration";
+
+	public static final String FIELD_NAME_METRICS = "metrics";
+
+	@JsonProperty(FIELD_NAME_SUBTASK_INDEX)
+	private final int subtaskIndex;
+
+	@JsonProperty(FIELD_NAME_STATUS)
+	private final ExecutionState status;
+
+	@JsonProperty(FIELD_NAME_ATTEMPT)
+	private final int attempt;
+
+	@JsonProperty(FIELD_NAME_HOST)
+	private final String host;
+
+	@JsonProperty(FIELD_NAME_START_TIME)
+	private final long startTime;
+
+	@JsonProperty(FIELD_NAME_END_TIME)
+	private final long endTime;
+
+	@JsonProperty(FIELD_NAME_DURATION)
+	private final long duration;
+
+	@JsonProperty(FIELD_NAME_METRICS)
+	private final IOMetricsInfo ioMetricsInfo;
+
+	@JsonCreator
+	public SubtaskExecutionAttemptDetailsInfo(
+			@JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex,
+			@JsonProperty(FIELD_NAME_STATUS) ExecutionState status,
+			@JsonProperty(FIELD_NAME_ATTEMPT) int attempt,
+			@JsonProperty(FIELD_NAME_HOST) String host,
+			@JsonProperty(FIELD_NAME_START_TIME) long startTime,
+			@JsonProperty(FIELD_NAME_END_TIME) long endTime,
+			@JsonProperty(FIELD_NAME_DURATION) long duration,
+			@JsonProperty(FIELD_NAME_METRICS) IOMetricsInfo ioMetricsInfo) {
+
+		this.subtaskIndex = subtaskIndex;
+		this.status = Preconditions.checkNotNull(status);
+		this.attempt = attempt;
+		this.host = Preconditions.checkNotNull(host);
+		this.startTime = startTime;
+		this.endTime = endTime;
+		this.duration = duration;
+		this.ioMetricsInfo = Preconditions.checkNotNull(ioMetricsInfo);
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		SubtaskExecutionAttemptDetailsInfo that = (SubtaskExecutionAttemptDetailsInfo) o;
+
+		return subtaskIndex == that.subtaskIndex &&
+			status == that.status &&
+			attempt == that.attempt &&
+			Objects.equals(host, that.host) &&
+			startTime == that.startTime &&
+			endTime == that.endTime &&
+			duration == that.duration &&
+			Objects.equals(ioMetricsInfo, that.ioMetricsInfo);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(subtaskIndex, status, attempt, host, startTime, endTime, duration, ioMetricsInfo);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskMessageParameters.java
new file mode 100644
index 0000000..872ff17
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskMessageParameters.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Message parameters for subtask REST handlers.
+ */
+public class SubtaskMessageParameters extends JobVertexMessageParameters {
+
+	protected final SubtaskIndexPathParameter subtaskIndexPathParameter = new SubtaskIndexPathParameter();
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Arrays.asList(jobPathParameter, jobVertexIdPathParameter, subtaskIndexPathParameter);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
new file mode 100644
index 0000000..d84265d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/IOMetricsInfo.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.metrics;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * IO metrics information.
+ */
+public final class IOMetricsInfo {
+
+	public static final String FIELD_NAME_BYTES_READ = "read-bytes";
+
+	public static final String FIELD_NAME_BYTES_READ_COMPLETE = "read-bytes-complete";
+
+	public static final String FIELD_NAME_BYTES_WRITTEN = "write-bytes";
+
+	public static final String FIELD_NAME_BYTES_WRITTEN_COMPLETE = "write-bytes-complete";
+
+	public static final String FIELD_NAME_RECORDS_READ = "read-records";
+
+	public static final String FIELD_NAME_RECORDS_READ_COMPLETE = "read-records-complete";
+
+	public static final String FIELD_NAME_RECORDS_WRITTEN = "write-records";
+
+	public static final String FIELD_NAME_RECORDS_WRITTEN_COMPLETE = "write-records-complete";
+
+	@JsonProperty(FIELD_NAME_BYTES_READ)
+	private final long bytesRead;
+
+	@JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE)
+	private final boolean bytesReadComplete;
+
+	@JsonProperty(FIELD_NAME_BYTES_WRITTEN)
+	private final long bytesWritten;
+
+	@JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE)
+	private final boolean bytesWrittenComplete;
+
+	@JsonProperty(FIELD_NAME_RECORDS_READ)
+	private final long recordsRead;
+
+	@JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE)
+	private final boolean recordsReadComplete;
+
+	@JsonProperty(FIELD_NAME_RECORDS_WRITTEN)
+	private final long recordsWritten;
+
+	@JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE)
+	private final boolean recordsWrittenComplete;
+
+	@JsonCreator
+	public IOMetricsInfo(
+			@JsonProperty(FIELD_NAME_BYTES_READ) long bytesRead,
+			@JsonProperty(FIELD_NAME_BYTES_READ_COMPLETE) boolean bytesReadComplete,
+			@JsonProperty(FIELD_NAME_BYTES_WRITTEN) long bytesWritten,
+			@JsonProperty(FIELD_NAME_BYTES_WRITTEN_COMPLETE) boolean bytesWrittenComplete,
+			@JsonProperty(FIELD_NAME_RECORDS_READ) long recordsRead,
+			@JsonProperty(FIELD_NAME_RECORDS_READ_COMPLETE) boolean recordsReadComplete,
+			@JsonProperty(FIELD_NAME_RECORDS_WRITTEN) long recordsWritten,
+			@JsonProperty(FIELD_NAME_RECORDS_WRITTEN_COMPLETE) boolean recordsWrittenComplete) {
+		this.bytesRead = bytesRead;
+		this.bytesReadComplete = bytesReadComplete;
+		this.bytesWritten = bytesWritten;
+		this.bytesWrittenComplete = bytesWrittenComplete;
+		this.recordsRead = recordsRead;
+		this.recordsReadComplete = recordsReadComplete;
+		this.recordsWritten = recordsWritten;
+		this.recordsWrittenComplete = recordsWrittenComplete;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		IOMetricsInfo that = (IOMetricsInfo) o;
+		return bytesRead == that.bytesRead &&
+			bytesReadComplete == that.bytesReadComplete &&
+			bytesWritten == that.bytesWritten &&
+			bytesWrittenComplete == that.bytesWrittenComplete &&
+			recordsRead == that.recordsRead &&
+			recordsReadComplete == that.recordsReadComplete &&
+			recordsWritten == that.recordsWritten &&
+			recordsWrittenComplete == that.recordsWrittenComplete;
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(bytesRead, bytesReadComplete, bytesWritten, bytesWrittenComplete, recordsRead, recordsReadComplete, recordsWritten, recordsWrittenComplete);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java
index bdfa003..166766f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/metrics/SubtaskMetricsMessageParameters.java
@@ -19,12 +19,10 @@
 package org.apache.flink.runtime.rest.messages.job.metrics;
 
 import org.apache.flink.runtime.rest.handler.job.metrics.SubtaskMetricsHandler;
-import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
-import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
-import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+import org.apache.flink.runtime.rest.messages.job.SubtaskMessageParameters;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -33,20 +31,14 @@ import java.util.Collections;
 /**
  * {@link MessageParameters} for {@link SubtaskMetricsHandler}.
  */
-public class SubtaskMetricsMessageParameters extends MessageParameters {
-
-	private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter();
-
-	private final JobVertexIdPathParameter jobVertexIdPathParameter = new JobVertexIdPathParameter();
-
-	private final SubtaskIndexPathParameter subtaskIndexPathParameter = new SubtaskIndexPathParameter();
+public class SubtaskMetricsMessageParameters extends SubtaskMessageParameters {
 
 	private final MetricsFilterParameter metricsFilterParameter = new MetricsFilterParameter();
 
 	@Override
 	public Collection<MessagePathParameter<?>> getPathParameters() {
 		return Collections.unmodifiableCollection(Arrays.asList(
-			jobIDPathParameter,
+			jobPathParameter,
 			jobVertexIdPathParameter,
 			subtaskIndexPathParameter
 		));

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index f6bc669..faf1ae9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
+import org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptDetailsHandler;
 import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
 import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatisticDetailsHandler;
@@ -74,6 +75,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistic
 import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.JobExecutionResultHeaders;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
 import org.apache.flink.runtime.rest.messages.job.metrics.JobVertexMetricsHeaders;
@@ -365,6 +367,16 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 			timeout,
 			responseHeaders);
 
+		final SubtaskExecutionAttemptDetailsHandler subtaskExecutionAttemptDetailsHandler = new SubtaskExecutionAttemptDetailsHandler(
+			restAddressFuture,
+			leaderRetriever,
+			timeout,
+			responseHeaders,
+			null,
+			executionGraphCache,
+			executor,
+			metricFetcher);
+
 		final File tmpDir = restConfiguration.getTmpDir();
 
 		Optional<StaticFileServerHandler<T>> optWebContent;
@@ -406,6 +418,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
 		handlers.add(Tuple2.of(JobExecutionResultHeaders.getInstance(), jobExecutionResultHandler));
 		handlers.add(Tuple2.of(SavepointTriggerHeaders.getInstance(), savepointTriggerHandler));
 		handlers.add(Tuple2.of(SavepointStatusHeaders.getInstance(), savepointStatusHandler));
+		handlers.add(Tuple2.of(SubtaskExecutionAttemptDetailsHeaders.getInstance(), subtaskExecutionAttemptDetailsHandler));
 
 		// This handler MUST be added last, as it otherwise masks all subsequent GET handlers
 		optWebContent.ifPresent(

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
new file mode 100644
index 0000000..4f32087
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.DummyJobInformation;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
+import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptPathParameter;
+import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests of {@link SubtaskExecutionAttemptDetailsHandler}.
+ */
+public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger {
+
+	@Test
+	public void testHandleRequest() throws Exception {
+
+		// Prepare the execution graph.
+		final JobID jobID = new JobID();
+
+		final ExecutionGraph executionGraph = new ExecutionGraph(
+			new DummyJobInformation(jobID, "job name"),
+			mock(ScheduledExecutorService.class),
+			mock(Executor.class),
+			Time.milliseconds(100),
+			new NoRestartStrategy(),
+			new RestartAllStrategy.Factory(),
+			mock(SlotProvider.class),
+			ExecutionGraph.class.getClassLoader(),
+			VoidBlobWriter.getInstance()
+		);
+
+		final JobVertex jobVertex = new JobVertex("MockVertex");
+		jobVertex.setParallelism(128);
+		jobVertex.setInvokableClass(AbstractInvokable.class);
+
+		executionGraph.attachJobGraph(Collections.singletonList(jobVertex));
+
+		// The testing subtask.
+		final int subtaskIndex = 1;
+		final ExecutionState expectedState = ExecutionState.SCHEDULED;
+
+		// Change some fields so we can make it different from other sub tasks.
+		Execution execution = executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[subtaskIndex].getCurrentExecutionAttempt();
+		Whitebox.setInternalState(execution, "state", expectedState);
+
+		// Mock the metric fetcher.
+		final MetricFetcher metricFetcher = mock(MetricFetcher.class);
+		final MetricStore metricStore = mock(MetricStore.class);
+		final MetricStore.ComponentMetricStore componentMetricStore = mock(MetricStore.ComponentMetricStore.class);
+
+		final long bytesInLocal = 1;
+		final long bytesInRemote = 2;
+		final long bytesOut = 10;
+		final long recordsIn = 20;
+		final long recordsOut = 30;
+
+		when(componentMetricStore.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL)).thenReturn(Long.toString(bytesInLocal));
+		when(componentMetricStore.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE)).thenReturn(Long.toString(bytesInRemote));
+		when(componentMetricStore.getMetric(MetricNames.IO_NUM_BYTES_OUT)).thenReturn(Long.toString(bytesOut));
+		when(componentMetricStore.getMetric(MetricNames.IO_NUM_RECORDS_IN)).thenReturn(Long.toString(recordsIn));
+		when(componentMetricStore.getMetric(MetricNames.IO_NUM_RECORDS_OUT)).thenReturn(Long.toString(recordsOut));
+
+		when(metricStore.getSubtaskMetricStore(jobID.toString(), jobVertex.getID().toString(), subtaskIndex))
+			.thenReturn(componentMetricStore);
+		when(metricFetcher.getMetricStore()).thenReturn(metricStore);
+
+		// Instance the handler.
+		final RestHandlerConfiguration restHandlerConfiguration = RestHandlerConfiguration.fromConfiguration(new Configuration());
+
+		final SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(
+			CompletableFuture.completedFuture("127.0.0.1:9527"),
+			mock(GatewayRetriever.class),
+			Time.milliseconds(100),
+			restHandlerConfiguration.getResponseHeaders(),
+			null,
+			new ExecutionGraphCache(
+				restHandlerConfiguration.getTimeout(),
+				Time.milliseconds(restHandlerConfiguration.getRefreshInterval())),
+			mock(Executor.class),
+			metricFetcher
+		);
+
+		final int attempt = 0;
+
+		final HandlerRequest<EmptyRequestBody, SubtaskAttemptMessageParameters> request = new HandlerRequest<>(
+			EmptyRequestBody.getInstance(),
+			new SubtaskAttemptMessageParameters(),
+			new HashMap<String, String>() {{
+				put(JobIDPathParameter.KEY, jobID.toString());
+				put(JobVertexIdPathParameter.KEY, jobVertex.getID().toString());
+				put(SubtaskIndexPathParameter.KEY, Integer.toString(subtaskIndex));
+				put(SubtaskAttemptPathParameter.KEY, Integer.toString(attempt));
+			}},
+			Collections.emptyMap()
+		);
+
+		// Handle request.
+		final SubtaskExecutionAttemptDetailsInfo detailsInfo = handler.handleRequest(
+			request,
+			executionGraph.getJobVertex(jobVertex.getID()));
+
+		// Verify
+		final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo(
+			bytesInLocal + bytesInRemote,
+			true,
+			bytesOut,
+			true,
+			recordsIn,
+			true,
+			recordsOut,
+			true
+		);
+
+		final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo = new SubtaskExecutionAttemptDetailsInfo(
+			subtaskIndex,
+			expectedState,
+			attempt,
+			"(unassigned)",
+			-1,
+			-1,
+			-1,
+			ioMetricsInfo
+		);
+
+		assertEquals(expectedDetailsInfo, detailsInfo);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
index aec8674..8083611 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfoTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -79,7 +80,7 @@ public class JobDetailsInfoTest extends RestResponseMarshallingTestBase<JobDetai
 
 	private JobDetailsInfo.JobVertexDetailsInfo createJobVertexDetailsInfo(Random random) {
 		final Map<ExecutionState, Integer> tasksPerState = new HashMap<>(ExecutionState.values().length);
-		final JobDetailsInfo.JobVertexMetrics jobVertexMetrics = new JobDetailsInfo.JobVertexMetrics(
+		final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo(
 			random.nextLong(),
 			random.nextBoolean(),
 			random.nextLong(),

http://git-wip-us.apache.org/repos/asf/flink/blob/d8b2c0fe/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
new file mode 100644
index 0000000..ee1f484
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+
+import java.util.Random;
+
+/**
+ * Tests (un)marshalling of the {@link SubtaskExecutionAttemptDetailsInfo}.
+ */
+public class SubtaskExecutionAttemptDetailsInfoTest extends RestResponseMarshallingTestBase<SubtaskExecutionAttemptDetailsInfo> {
+
+	@Override
+	protected Class<SubtaskExecutionAttemptDetailsInfo> getTestResponseClass() {
+		return SubtaskExecutionAttemptDetailsInfo.class;
+	}
+
+	@Override
+	protected SubtaskExecutionAttemptDetailsInfo getTestResponseInstance() throws Exception {
+		final Random random = new Random();
+
+		final IOMetricsInfo ioMetricsInfo = new IOMetricsInfo(
+			Math.abs(random.nextLong()),
+			random.nextBoolean(),
+			Math.abs(random.nextLong()),
+			random.nextBoolean(),
+			Math.abs(random.nextLong()),
+			random.nextBoolean(),
+			Math.abs(random.nextLong()),
+			random.nextBoolean()
+		);
+
+		return new SubtaskExecutionAttemptDetailsInfo(
+			Math.abs(random.nextInt()),
+			ExecutionState.values()[random.nextInt(ExecutionState.values().length)],
+			Math.abs(random.nextInt()),
+			"localhost:" + random.nextInt(65536),
+			Math.abs(random.nextLong()),
+			Math.abs(random.nextLong()),
+			Math.abs(random.nextLong()),
+			ioMetricsInfo
+		);
+	}
+}


Mime
View raw message