flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [2/4] flink git commit: [FLINK-5852] Move handler JSON generation code into static methods
Date Thu, 02 Mar 2017 10:40:36 GMT
[FLINK-5852] Move handler JSON generation code into static methods

This closes #3365.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a552d674
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a552d674
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a552d674

Branch: refs/heads/master
Commit: a552d6746636e26c634c86d6a11732ea9d2f239e
Parents: 999bace
Author: zentol <chesnay@apache.org>
Authored: Tue Feb 28 18:01:14 2017 +0100
Committer: zentol <chesnay@apache.org>
Committed: Thu Mar 2 11:39:04 2017 +0100

----------------------------------------------------------------------
 .../api/common/ArchivedExecutionConfig.java     |  13 ++
 .../handlers/CurrentJobsOverviewHandler.java    |   9 +-
 .../handlers/DashboardConfigHandler.java        |  50 +++---
 .../handlers/JobAccumulatorsHandler.java        |   9 +-
 .../webmonitor/handlers/JobConfigHandler.java   |   4 +
 .../webmonitor/handlers/JobDetailsHandler.java  |  44 ++----
 .../handlers/JobExceptionsHandler.java          |  11 +-
 .../handlers/JobVertexAccumulatorsHandler.java  |   9 +-
 .../handlers/JobVertexDetailsHandler.java       |  50 +++---
 .../handlers/JobVertexTaskManagersHandler.java  |  55 +++----
 ...taskExecutionAttemptAccumulatorsHandler.java |   7 +-
 .../SubtaskExecutionAttemptDetailsHandler.java  |  55 +++----
 .../SubtasksAllAccumulatorsHandler.java         |   5 +
 .../handlers/SubtasksTimesHandler.java          |   5 +
 .../checkpoints/CheckpointConfigHandler.java    |   6 +-
 .../CheckpointStatsDetailsHandler.java          |   4 +-
 .../CheckpointStatsDetailsSubtasksHandler.java  |  23 ++-
 .../checkpoints/CheckpointStatsHandler.java     |  16 +-
 .../webmonitor/utils/MutableIOMetrics.java      | 106 +++++++++++++
 .../CurrentJobsOverviewHandlerTest.java         |  42 +++++
 .../handlers/DashboardConfigHandlerTest.java    |  22 +++
 .../handlers/JobAccumulatorsHandlerTest.java    |  20 +++
 .../handlers/JobConfigHandlerTest.java          |  31 ++++
 .../handlers/JobDetailsHandlerTest.java         |  95 +++++++++++
 .../handlers/JobExceptionsHandlerTest.java      |  35 ++++
 .../JobVertexAccumulatorsHandlerTest.java       |  20 +++
 .../handlers/JobVertexDetailsHandlerTest.java   |  44 ++++++
 .../JobVertexTaskManagersHandlerTest.java       |  66 ++++++++
 ...ExecutionAttemptAccumulatorsHandlerTest.java |  18 +++
 ...btaskExecutionAttemptDetailsHandlerTest.java |  29 ++++
 .../SubtasksAllAccumulatorsHandlerTest.java     |  32 ++++
 .../handlers/SubtasksTimesHandlerTest.java      |  37 +++++
 .../utils/ArchivedExecutionBuilder.java         | 146 +++++++++++++++++
 .../utils/ArchivedExecutionConfigBuilder.java   |  67 ++++++++
 .../utils/ArchivedExecutionGraphBuilder.java    | 135 ++++++++++++++++
 .../ArchivedExecutionJobVertexBuilder.java      |  80 ++++++++++
 .../utils/ArchivedExecutionVertexBuilder.java   |  69 ++++++++
 .../utils/ArchivedJobGenerationUtils.java       | 158 +++++++++++++++++++
 .../executiongraph/ArchivedExecution.java       |  15 ++
 .../ArchivedExecutionJobVertex.java             |  15 ++
 .../executiongraph/ArchivedExecutionVertex.java |   9 ++
 .../flink/runtime/executiongraph/IOMetrics.java |  36 +++--
 .../ArchivedExecutionGraphTest.java             |   6 +-
 43 files changed, 1512 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
index f267e91..700d65f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
@@ -54,6 +54,19 @@ public class ArchivedExecutionConfig implements Serializable {
 		}
 	}
 
+	public ArchivedExecutionConfig(
+			String executionMode,
+			String restartStrategyDescription,
+			int parallelism,
+			boolean objectReuseEnabled,
+			Map<String, String> globalJobParameters) {
+		this.executionMode = executionMode;
+		this.restartStrategyDescription = restartStrategyDescription;
+		this.parallelism = parallelism;
+		this.objectReuseEnabled = objectReuseEnabled;
+		this.globalJobParameters = globalJobParameters;
+	}
+
 	public String getExecutionMode() {
 		return executionMode;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index 8486a9c..00cf138 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -28,6 +28,7 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -89,20 +90,20 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 				if (includeRunningJobs && includeFinishedJobs) {
 					gen.writeArrayFieldStart("running");
 					for (JobDetails detail : result.getRunningJobs()) {
-						generateSingleJobDetails(detail, gen, now);
+						writeJobDetailOverviewAsJson(detail, gen, now);
 					}
 					gen.writeEndArray();
 	
 					gen.writeArrayFieldStart("finished");
 					for (JobDetails detail : result.getFinishedJobs()) {
-						generateSingleJobDetails(detail, gen, now);
+						writeJobDetailOverviewAsJson(detail, gen, now);
 					}
 					gen.writeEndArray();
 				}
 				else {
 					gen.writeArrayFieldStart("jobs");
 					for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
-						generateSingleJobDetails(detail, gen, now);
+						writeJobDetailOverviewAsJson(detail, gen, now);
 					}
 					gen.writeEndArray();
 				}
@@ -120,7 +121,7 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 		}
 	}
 
-	private static void generateSingleJobDetails(JobDetails details, JsonGenerator gen, long now) throws Exception {
+	public static void writeJobDetailOverviewAsJson(JobDetails details, JsonGenerator gen, long now) throws IOException {
 		gen.writeStartObject();
 
 		gen.writeStringField("jid", details.getJobId().toString());

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index 49f4c26..6ad024f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.TimeZone;
@@ -38,29 +39,8 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
 	private final String configString;
 	
 	public DashboardConfigHandler(long refreshInterval) {
-		TimeZone timeZome = TimeZone.getDefault();
-		String timeZoneName = timeZome.getDisplayName();
-		long timeZoneOffset= timeZome.getRawOffset();
-
 		try {
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
-	
-			gen.writeStartObject();
-			gen.writeNumberField("refresh-interval", refreshInterval);
-			gen.writeNumberField("timezone-offset", timeZoneOffset);
-			gen.writeStringField("timezone-name", timeZoneName);
-			gen.writeStringField("flink-version", EnvironmentInformation.getVersion());
-
-			EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
-			if (revision != null) {
-				gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate);
-			}
-
-			gen.writeEndObject();
-	
-			gen.close();
-			this.configString = writer.toString();
+			this.configString = createConfigJson(refreshInterval);
 		}
 		catch (Exception e) {
 			// should never happen
@@ -77,4 +57,30 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
 	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
 		return this.configString;
 	}
+
+	public static String createConfigJson(long refreshInterval) throws IOException {
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+	
+		TimeZone timeZone = TimeZone.getDefault();
+		String timeZoneName = timeZone.getDisplayName();
+		long timeZoneOffset = timeZone.getRawOffset();
+
+		gen.writeStartObject();
+		gen.writeNumberField("refresh-interval", refreshInterval);
+		gen.writeNumberField("timezone-offset", timeZoneOffset);
+		gen.writeStringField("timezone-name", timeZoneName);
+		gen.writeStringField("flink-version", EnvironmentInformation.getVersion());
+
+		EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
+		if (revision != null) {
+			gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate);
+		}
+
+		gen.writeEndObject();
+
+		gen.close();
+
+		return writer.toString();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
index 7664153..dfc654e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -44,11 +45,15 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
 
 	@Override
 	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified();
-		
+		return createJobAccumulatorsJson(graph);
+	}
+
+	public static String createJobAccumulatorsJson(AccessExecutionGraph graph) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
+		StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified();
+
 		gen.writeStartObject();
 
 		gen.writeArrayFieldStart("job-accumulators");

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 459ca2a..7d72235 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -44,7 +45,10 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 
 	@Override
 	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+		return createJobConfigJson(graph);
+	}
 
+	public static String createJobConfigJson(AccessExecutionGraph graph) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 7780e66..6d1f82f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -25,13 +25,13 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
+import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -64,6 +64,10 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 
 	@Override
 	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+		return createJobDetailsJson(graph, fetcher);
+	}
+
+	public static String createJobDetailsJson(AccessExecutionGraph graph, @Nullable MetricFetcher fetcher) throws IOException {
 		final StringWriter writer = new StringWriter();
 		final JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
@@ -153,37 +157,17 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 			}
 			gen.writeEndObject();
 			
-			long numBytesIn = 0;
-			long numBytesOut = 0;
-			long numRecordsIn = 0;
-			long numRecordsOut = 0;
+			MutableIOMetrics counts = new MutableIOMetrics();
 
 			for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
-				IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
-
-				if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
-					numBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
-					numBytesOut += ioMetrics.getNumBytesOut();
-					numRecordsIn += ioMetrics.getNumRecordsIn();
-					numRecordsOut += ioMetrics.getNumRecordsOut();
-				} else { // execAttempt is still running, use MetricQueryService instead
-					fetcher.update();
-					MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(graph.getJobID().toString(), ejv.getJobVertexId().toString(), vertex.getParallelSubtaskIndex());
-					if (metrics != null) {
-						numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
-						numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
-						numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
-						numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
-					}
-				}
+				counts.addIOMetrics(
+					vertex.getCurrentExecutionAttempt(),
+					fetcher,
+					graph.getJobID().toString(),
+					ejv.getJobVertexId().toString());
 			}
 
-			gen.writeObjectFieldStart("metrics");
-			gen.writeNumberField("read-bytes", numBytesIn);
-			gen.writeNumberField("write-bytes", numBytesOut);
-			gen.writeNumberField("read-records", numRecordsIn);
-			gen.writeNumberField("write-records", numRecordsOut);
-			gen.writeEndObject();
+			counts.writeIOMetricsAsJson(gen);
 			
 			gen.writeEndObject();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index 3720dac..0cce61f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.util.ExceptionUtils;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -35,7 +36,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 
 	private static final String JOB_EXCEPTIONS_REST_PATH = "/jobs/:jobid/exceptions";
 
-	private static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
+	static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
 	
 	public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
 		super(executionGraphHolder);
@@ -48,6 +49,10 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 
 	@Override
 	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+		return createJobExceptionsJson(graph);
+	}
+
+	public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
@@ -55,7 +60,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 		
 		// most important is the root failure cause
 		String rootException = graph.getFailureCauseAsString();
-		if (rootException != null) {
+		if (rootException != null && !rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
 			gen.writeStringField("root-exception", rootException);
 		}
 
@@ -67,7 +72,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 		
 		for (AccessExecutionVertex task : graph.getAllExecutionVertices()) {
 			String t = task.getFailureCauseAsString();
-			if (!t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+			if (t != null && !t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
 				if (numExceptionsSoFar >= MAX_NUMBER_EXCEPTION_TO_REPORT) {
 					truncated = true;
 					break;

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index ccfcbba..ca0488b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -43,11 +44,15 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
 
 	@Override
 	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
-		StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
-		
+		return createVertexAccumulatorsJson(jobVertex);
+	}
+
+	public static String createVertexAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
+		StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
+
 		gen.writeStartObject();
 		gen.writeStringField("id", jobVertex.getJobVertexId().toString());
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 0a07896..6e7e47c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -23,13 +23,13 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
+import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -55,6 +55,13 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 
 	@Override
 	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+		return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
+	}
+
+	public static String createVertexDetailsJson(
+			AccessExecutionJobVertex jobVertex,
+			String jobID,
+			@Nullable MetricFetcher fetcher) throws IOException {
 		final long now = System.currentTimeMillis();
 		
 		StringWriter writer = new StringWriter();
@@ -91,35 +98,16 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 			gen.writeNumberField("end-time", endTime);
 			gen.writeNumberField("duration", duration);
 
-			IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
-
-			long numBytesIn = 0;
-			long numBytesOut = 0;
-			long numRecordsIn = 0;
-			long numRecordsOut = 0;
-
-			if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
-				numBytesIn = ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
-				numBytesOut = ioMetrics.getNumBytesOut();
-				numRecordsIn = ioMetrics.getNumRecordsIn();
-				numRecordsOut = ioMetrics.getNumRecordsOut();
-			} else { // execAttempt is still running, use MetricQueryService instead
-				fetcher.update();
-				MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), jobVertex.getJobVertexId().toString(), vertex.getParallelSubtaskIndex());
-				if (metrics != null) {
-					numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
-					numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
-					numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
-					numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
-				}
-			}
+			MutableIOMetrics counts = new MutableIOMetrics();
 
-			gen.writeObjectFieldStart("metrics");
-			gen.writeNumberField("read-bytes", numBytesIn);
-			gen.writeNumberField("write-bytes", numBytesOut);
-			gen.writeNumberField("read-records", numRecordsIn);
-			gen.writeNumberField("write-records", numRecordsOut);
-			gen.writeEndObject();
+			counts.addIOMetrics(
+				vertex.getCurrentExecutionAttempt(),
+				fetcher,
+				jobID,
+				jobVertex.getJobVertexId().toString()
+			);
+
+			counts.writeIOMetricsAsJson(gen);
 			
 			gen.writeEndObject();
 			

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index b3dabea..4fa54bd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -23,19 +23,18 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
+import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
 /**
  * A request handler that provides the details of a job vertex, including id, name, and the
@@ -59,6 +58,16 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 
 	@Override
 	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+		return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
+	}
+
+	public static String createVertexDetailsByTaskManagerJson(
+			AccessExecutionJobVertex jobVertex,
+			String jobID,
+			@Nullable MetricFetcher fetcher) throws IOException {
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+
 		// Build a map that groups tasks by TaskManager
 		Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
 
@@ -79,8 +88,6 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 		// Build JSON response
 		final long now = System.currentTimeMillis();
 
-		StringWriter writer = new StringWriter();
-		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
 		gen.writeStartObject();
 
@@ -89,7 +96,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 		gen.writeNumberField("now", now);
 
 		gen.writeArrayFieldStart("taskmanagers");
-		for (Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) {
+		for (Map.Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) {
 			String host = entry.getKey();
 			List<AccessExecutionVertex> taskVertices = entry.getValue();
 
@@ -99,10 +106,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 			long endTime = 0;
 			boolean allFinished = true;
 
-			long numBytesIn = 0;
-			long numBytesOut = 0;
-			long numRecordsIn = 0;
-			long numRecordsOut = 0;
+			MutableIOMetrics counts = new MutableIOMetrics();
 
 			for (AccessExecutionVertex vertex : taskVertices) {
 				final ExecutionState state = vertex.getExecutionState();
@@ -117,23 +121,11 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 				allFinished &= state.isTerminal();
 				endTime = Math.max(endTime, vertex.getStateTimestamp(state));
 
-				IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
-
-				if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
-					numBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
-					numBytesOut += ioMetrics.getNumBytesOut();
-					numRecordsIn += ioMetrics.getNumRecordsIn();
-					numRecordsOut += ioMetrics.getNumRecordsOut();
-				} else { // execAttempt is still running, use MetricQueryService instead
-					fetcher.update();
-					MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), vertex.getParallelSubtaskIndex());
-					if (metrics != null) {
-						numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
-						numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
-						numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
-						numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
-					}
-				}
+				counts.addIOMetrics(
+					vertex.getCurrentExecutionAttempt(),
+					fetcher,
+					jobID,
+					jobVertex.getJobVertexId().toString());
 			}
 
 			long duration;
@@ -164,12 +156,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 			gen.writeNumberField("end-time", endTime);
 			gen.writeNumberField("duration", duration);
 
-			gen.writeObjectFieldStart("metrics");
-			gen.writeNumberField("read-bytes", numBytesIn);
-			gen.writeNumberField("write-bytes", numBytesOut);
-			gen.writeNumberField("read-records", numRecordsIn);
-			gen.writeNumberField("write-records", numRecordsOut);
-			gen.writeEndObject();
+			counts.writeIOMetricsAsJson(gen);
 
 			gen.writeObjectFieldStart("status-counts");
 			for (ExecutionState state : ExecutionState.values()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index ba3a5ee..a63016c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -45,10 +46,14 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 
 	@Override
 	public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
-		final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
+		return createAttemptAccumulatorsJson(execAttempt);
+	}
 		
+	public static String createAttemptAccumulatorsJson(AccessExecution execAttempt) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+		
+		final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
 
 		gen.writeStartObject();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index b753b6e..5af6af9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -22,13 +22,13 @@ import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
+import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -53,6 +53,17 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 
 	@Override
 	public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
+		return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher);
+	}
+
+	public static String createAttemptDetailsJson(
+			AccessExecution execAttempt,
+			String jobID,
+			String vertexID,
+			@Nullable MetricFetcher fetcher) throws IOException {
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+
 		final ExecutionState status = execAttempt.getState();
 		final long now = System.currentTimeMillis();
 
@@ -66,9 +77,6 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 		long endTime = status.isTerminal() ? execAttempt.getStateTimestamp(status) : -1;
 		long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
 
-		StringWriter writer = new StringWriter();
-		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
-
 		gen.writeStartObject();
 		gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex());
 		gen.writeStringField("status", status.name());
@@ -78,35 +86,16 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 		gen.writeNumberField("end-time", endTime);
 		gen.writeNumberField("duration", duration);
 
-		IOMetrics ioMetrics = execAttempt.getIOMetrics();
+		MutableIOMetrics counts = new MutableIOMetrics();
 
-		long numBytesIn = 0;
-		long numBytesOut = 0;
-		long numRecordsIn = 0;
-		long numRecordsOut = 0;
+		counts.addIOMetrics(
+			execAttempt,
+			fetcher,
+			jobID,
+			vertexID
+		);
 		
-		if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
-			numBytesIn = ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
-			numBytesOut = ioMetrics.getNumBytesOut();
-			numRecordsIn = ioMetrics.getNumRecordsIn();
-			numRecordsOut = ioMetrics.getNumRecordsOut();
-		} else { // execAttempt is still running, use MetricQueryService instead
-			fetcher.update();
-			MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), execAttempt.getParallelSubtaskIndex());
-			if (metrics != null) {
-				numBytesIn = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
-				numBytesOut = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
-				numRecordsIn = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
-				numRecordsOut = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
-			}
-		}
-		
-		gen.writeObjectFieldStart("metrics");
-		gen.writeNumberField("read-bytes", numBytesIn);
-		gen.writeNumberField("write-bytes", numBytesOut);
-		gen.writeNumberField("read-records", numRecordsIn);
-		gen.writeNumberField("write-records", numRecordsOut);
-		gen.writeEndObject();
+		counts.writeIOMetricsAsJson(gen);
 
 		gen.writeEndObject();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 222d474..10a8773 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -47,6 +48,10 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
 
 	@Override
 	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+		return createSubtasksAccumulatorsJson(jobVertex);
+	}
+
+	public static String createSubtasksAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index e2e35e3..08bd722 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -48,6 +49,10 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
 
 	@Override
 	public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+		return createSubtaskTimesJson(jobVertex);
+	}
+
+	public static String createSubtaskTimesJson(AccessExecutionJobVertex jobVertex) throws IOException {
 		final long now = System.currentTimeMillis();
 
 		StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index de40a4a..9976298 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -47,8 +48,11 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
 
 	@Override
 	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		StringWriter writer = new StringWriter();
+		return createCheckpointConfigJson(graph);
+	}
 
+	private static String createCheckpointConfigJson(AccessExecutionGraph graph) throws IOException {
+		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 		JobSnapshottingSettings settings = graph.getJobSnapshottingSettings();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index e651824..4bbb8f6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -76,10 +76,10 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
 			}
 		}
 
-		return writeResponse(checkpoint);
+		return createCheckpointDetailsJson(checkpoint);
 	}
 
-	private String writeResponse(AbstractCheckpointStats checkpoint) throws IOException {
+	public static String createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 		gen.writeStartObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index 15dd911..b28ecef 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -36,6 +36,7 @@ import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler.writeMinMaxAvg;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -95,19 +96,19 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
 			}
 		}
 
-		return writeResponse(checkpoint, vertexId);
-	}
-
-	private String writeResponse(AbstractCheckpointStats checkpoint, JobVertexID vertexId) throws IOException {
-		StringWriter writer = new StringWriter();
-		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
-		gen.writeStartObject();
-
 		TaskStateStats taskStats = checkpoint.getTaskStateStats(vertexId);
 		if (taskStats == null) {
 			return "{}";
 		}
+		
+		return createSubtaskCheckpointDetailsJson(checkpoint, taskStats);
+	}
 
+	private static String createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, TaskStateStats taskStats) throws IOException {
+		StringWriter writer = new StringWriter();
+		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+
+		gen.writeStartObject();
 		// Overview
 		gen.writeNumberField("id", checkpoint.getCheckpointId());
 		gen.writeStringField("status", checkpoint.getStatus().toString());
@@ -188,10 +189,4 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
 		return writer.toString();
 	}
 
-	private void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
-		gen.writeNumberField("min", minMaxAvg.getMinimum());
-		gen.writeNumberField("max", minMaxAvg.getMaximum());
-		gen.writeNumberField("avg", minMaxAvg.getAverage());
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index 6413806..585ab26 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -56,6 +56,10 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 
 	@Override
 	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+		return createCheckpointStatsJson(graph);
+	}
+
+	private static String createCheckpointStatsJson(AccessExecutionGraph graph) throws IOException {
 		StringWriter writer = new StringWriter();
 		JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
@@ -91,7 +95,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 		return writer.toString();
 	}
 
-	private void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException {
+	private static void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException {
 		gen.writeObjectFieldStart("counts");
 		gen.writeNumberField("restored", counts.getNumberOfRestoredCheckpoints());
 		gen.writeNumberField("total", counts.getTotalNumberOfCheckpoints());
@@ -101,7 +105,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 		gen.writeEndObject();
 	}
 
-	private void writeSummary(
+	private static void writeSummary(
 		JsonGenerator gen,
 		CompletedCheckpointStatsSummary summary) throws IOException {
 		gen.writeObjectFieldStart("summary");
@@ -119,13 +123,13 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 		gen.writeEndObject();
 	}
 
-	private void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
+	static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
 		gen.writeNumberField("min", minMaxAvg.getMinimum());
 		gen.writeNumberField("max", minMaxAvg.getMaximum());
 		gen.writeNumberField("avg", minMaxAvg.getAverage());
 	}
 
-	private void writeLatestCheckpoints(
+	private static void writeLatestCheckpoints(
 		JsonGenerator gen,
 		@Nullable CompletedCheckpointStats completed,
 		@Nullable CompletedCheckpointStats savepoint,
@@ -187,7 +191,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 		gen.writeEndObject();
 	}
 
-	private void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException {
+	private static void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException {
 		gen.writeNumberField("id", checkpoint.getCheckpointId());
 		gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
 		gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
@@ -197,7 +201,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 
 	}
 
-	private void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException {
+	private static void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException {
 		gen.writeArrayFieldStart("history");
 		for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
 			gen.writeStartObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
new file mode 100644
index 0000000..32cda7f
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
+import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+/**
+ * This class is a mutable version of the {@link IOMetrics} class that allows adding up IO-related metrics.
+ * 
+ * For finished jobs these metrics are stored in the {@link ExecutionGraph} as another {@link IOMetrics}.
+ * For running jobs these metrics are retrieved using the {@link MetricFetcher}.
+ * 
+ * This class provides a common interface to handle both cases, reducing complexity in various handlers (like
+ * the {@link JobVertexDetailsHandler}).
+ */
+public class MutableIOMetrics extends IOMetrics {
+
+	private static final long serialVersionUID = -5460777634971381737L;
+
+	public MutableIOMetrics() {
+		super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
+	}
+
+	/**
+	 * Adds the IO metrics for the given attempt to this object. If the {@link AccessExecution} is in
+	 * a terminal state the contained {@link IOMetrics} object is added. Otherwise the given {@link MetricFetcher} is
+	 * used to retrieve the required metrics.
+	 * 
+	 * @param attempt Attempt whose IO metrics should be added
+	 * @param fetcher MetricFetcher to retrieve metrics for running jobs
+	 * @param jobID JobID to which the attempt belongs
+	 * @param taskID TaskID to which the attempt belongs
+	 */
+	public void addIOMetrics(AccessExecution attempt, @Nullable MetricFetcher fetcher, String jobID, String taskID) {
+		if (attempt.getState().isTerminal()) {
+			IOMetrics ioMetrics = attempt.getIOMetrics();
+			if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
+				this.numBytesInLocal += ioMetrics.getNumBytesInLocal();
+				this.numBytesInRemote += ioMetrics.getNumBytesInRemote();
+				this.numBytesOut += ioMetrics.getNumBytesOut();
+				this.numRecordsIn += ioMetrics.getNumRecordsIn();
+				this.numRecordsOut += ioMetrics.getNumRecordsOut();
+			}
+		} else { // execAttempt is still running, use MetricQueryService instead
+			if (fetcher != null) {
+				fetcher.update();
+				MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex());
+				if (metrics != null) {
+					this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"));
+					this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+					this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+					this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+					this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
+				}
+			}
+		}
+	}
+
+	/**
+	 * Writes the IO metrics contained in this object to the given {@link JsonGenerator}.
+	 * 
+	 * The JSON structure written is as follows:
+	 * "metrics": {
+	 *     "read-bytes": 1,
+	 *     "write-bytes": 2,
+	 *     "read-records": 3,
+	 *     "write-records": 4
+	 * }
+	 * 
+	 * @param gen JsonGenerator to which the metrics should be written
+	 * @throws IOException
+	 */
+	public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException {
+		gen.writeObjectFieldStart("metrics");
+		gen.writeNumberField("read-bytes",this.numBytesInLocal + this.numBytesInRemote);
+		gen.writeNumberField("write-bytes", this.numBytesOut);
+		gen.writeNumberField("read-records", this.numRecordsIn);
+		gen.writeNumberField("write-records", this.numRecordsOut);
+		gen.writeEndObject();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
index 3207fec..caf6d8e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
@@ -17,10 +17,18 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.StringWriter;
 import java.util.concurrent.TimeUnit;
 
 public class CurrentJobsOverviewHandlerTest {
@@ -41,4 +49,38 @@ public class CurrentJobsOverviewHandlerTest {
 		Assert.assertEquals(1, pathsCompleted.length);
 		Assert.assertEquals("/joboverview/completed", pathsCompleted[0]);
 	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob);
+		StringWriter writer = new StringWriter();
+		try (JsonGenerator gen = ArchivedJobGenerationUtils.jacksonFactory.createGenerator(writer)) {
+			CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(expectedDetails, gen, 0);
+		}
+		String answer = writer.toString();
+
+		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(answer);
+
+		Assert.assertEquals(expectedDetails.getJobId().toString(), result.get("jid").asText());
+		Assert.assertEquals(expectedDetails.getJobName(), result.get("name").asText());
+		Assert.assertEquals(expectedDetails.getStatus().name(), result.get("state").asText());
+
+		Assert.assertEquals(expectedDetails.getStartTime(), result.get("start-time").asLong());
+		Assert.assertEquals(expectedDetails.getEndTime(), result.get("end-time").asLong());
+		Assert.assertEquals(expectedDetails.getEndTime() - expectedDetails.getStartTime(), result.get("duration").asLong());
+		Assert.assertEquals(expectedDetails.getLastUpdateTime(), result.get("last-modification").asLong());
+
+		JsonNode tasks = result.get("tasks");
+		Assert.assertEquals(expectedDetails.getNumTasks(), tasks.get("total").asInt());
+		int[] tasksPerState = expectedDetails.getNumVerticesPerExecutionState();
+		Assert.assertEquals(
+			tasksPerState[ExecutionState.CREATED.ordinal()] + tasksPerState[ExecutionState.SCHEDULED.ordinal()] + tasksPerState[ExecutionState.DEPLOYING.ordinal()],
+			tasks.get("pending").asInt());
+		Assert.assertEquals(tasksPerState[ExecutionState.RUNNING.ordinal()], tasks.get("running").asInt());
+		Assert.assertEquals(tasksPerState[ExecutionState.FINISHED.ordinal()], tasks.get("finished").asInt());
+		Assert.assertEquals(tasksPerState[ExecutionState.CANCELING.ordinal()], tasks.get("canceling").asInt());
+		Assert.assertEquals(tasksPerState[ExecutionState.CANCELED.ordinal()], tasks.get("canceled").asInt());
+		Assert.assertEquals(tasksPerState[ExecutionState.FAILED.ordinal()], tasks.get("failed").asInt());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
index aa2d552..9784a06 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
@@ -17,9 +17,14 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.TimeZone;
+
 public class DashboardConfigHandlerTest {
 	@Test
 	public void testGetPaths() {
@@ -28,4 +33,21 @@ public class DashboardConfigHandlerTest {
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/config", paths[0]);
 	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		long refreshInterval = 12345;
+		TimeZone timeZone = TimeZone.getDefault();
+		EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
+
+		String json = DashboardConfigHandler.createConfigJson(refreshInterval);
+
+		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
+
+		Assert.assertEquals(refreshInterval, result.get("refresh-interval").asLong());
+		Assert.assertEquals(timeZone.getDisplayName(), result.get("timezone-name").asText());
+		Assert.assertEquals(timeZone.getRawOffset(), result.get("timezone-offset").asLong());
+		Assert.assertEquals(EnvironmentInformation.getVersion(), result.get("flink-version").asText());
+		Assert.assertEquals(revision.commitId + " @ " + revision.commitDate, result.get("flink-revision").asText());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
index 96c7dd5..34748b7 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
@@ -17,6 +17,10 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +32,20 @@ public class JobAccumulatorsHandlerTest {
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]);
 	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		String json = JobAccumulatorsHandler.createJobAccumulatorsJson(originalJob);
+
+		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
+
+		ArrayNode accs = (ArrayNode) result.get("job-accumulators");
+		Assert.assertEquals(0, accs.size());
+
+		Assert.assertTrue(originalJob.getAccumulatorResultsStringified().length > 0);
+		ArchivedJobGenerationUtils.compareStringifiedAccumulators(
+			originalJob.getAccumulatorResultsStringified(),
+			(ArrayNode) result.get("user-task-accumulators"));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
index 47ea6bf..f304efe 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
@@ -17,9 +17,15 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Map;
+
 public class JobConfigHandlerTest {
 	@Test
 	public void testGetPaths() {
@@ -28,4 +34,29 @@ public class JobConfigHandlerTest {
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/config", paths[0]);
 	}
+
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		String answer = JobConfigHandler.createJobConfigJson(originalJob);
+
+		JsonNode job = ArchivedJobGenerationUtils.mapper.readTree(answer);
+
+		Assert.assertEquals(originalJob.getJobID().toString(), job.get("jid").asText());
+		Assert.assertEquals(originalJob.getJobName(), job.get("name").asText());
+
+		ArchivedExecutionConfig originalConfig = originalJob.getArchivedExecutionConfig();
+		JsonNode config = job.get("execution-config");
+
+		Assert.assertEquals(originalConfig.getExecutionMode(), config.get("execution-mode").asText());
+		Assert.assertEquals(originalConfig.getRestartStrategyDescription(), config.get("restart-strategy").asText());
+		Assert.assertEquals(originalConfig.getParallelism(), config.get("job-parallelism").asInt());
+		Assert.assertEquals(originalConfig.getObjectReuseEnabled(), config.get("object-reuse-mode").asBoolean());
+
+		Map<String, String> originalUserConfig = originalConfig.getGlobalJobParameters();
+		JsonNode userConfig = config.get("user-config");
+
+		for (Map.Entry<String, String> originalEntry : originalUserConfig.entrySet()) {
+			Assert.assertEquals(originalEntry.getValue(), userConfig.get(originalEntry.getKey()).asText());
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
index b56bd64..3f80d12 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
@@ -17,7 +17,16 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.collect.Lists;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -33,4 +42,90 @@ public class JobDetailsHandlerTest {
 		Assert.assertTrue(pathsList.contains("/jobs/:jobid"));
 		Assert.assertTrue(pathsList.contains("/jobs/:jobid/vertices"));
 	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		String json = JobDetailsHandler.createJobDetailsJson(originalJob, null);
+
+		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
+
+		Assert.assertEquals(originalJob.getJobID().toString(), result.get("jid").asText());
+		Assert.assertEquals(originalJob.getJobName(), result.get("name").asText());
+		Assert.assertEquals(originalJob.isStoppable(), result.get("isStoppable").asBoolean());
+		Assert.assertEquals(originalJob.getState().name(), result.get("state").asText());
+
+		Assert.assertEquals(originalJob.getStatusTimestamp(JobStatus.CREATED), result.get("start-time").asLong());
+		Assert.assertEquals(originalJob.getStatusTimestamp(originalJob.getState()), result.get("end-time").asLong());
+		Assert.assertEquals(
+			originalJob.getStatusTimestamp(originalJob.getState()) - originalJob.getStatusTimestamp(JobStatus.CREATED),
+			result.get("duration").asLong()
+		);
+
+		JsonNode timestamps = result.get("timestamps");
+		for (JobStatus status : JobStatus.values()) {
+			Assert.assertEquals(originalJob.getStatusTimestamp(status), timestamps.get(status.name()).asLong());
+		}
+
+		ArrayNode tasks = (ArrayNode) result.get("vertices");
+		int x = 0;
+		for (AccessExecutionJobVertex expectedTask : originalJob.getVerticesTopologically()) {
+			JsonNode task = tasks.get(x);
+
+			Assert.assertEquals(expectedTask.getJobVertexId().toString(), task.get("id").asText());
+			Assert.assertEquals(expectedTask.getName(), task.get("name").asText());
+			Assert.assertEquals(expectedTask.getParallelism(), task.get("parallelism").asInt());
+			Assert.assertEquals(expectedTask.getAggregateState().name(), task.get("status").asText());
+
+			Assert.assertEquals(3, task.get("start-time").asLong());
+			Assert.assertEquals(5, task.get("end-time").asLong());
+			Assert.assertEquals(2, task.get("duration").asLong());
+
+			JsonNode subtasksPerState = task.get("tasks");
+			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CREATED.name()).asInt());
+			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.SCHEDULED.name()).asInt());
+			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.DEPLOYING.name()).asInt());
+			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.RUNNING.name()).asInt());
+			Assert.assertEquals(1, subtasksPerState.get(ExecutionState.FINISHED.name()).asInt());
+			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CANCELING.name()).asInt());
+			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CANCELED.name()).asInt());
+			Assert.assertEquals(0, subtasksPerState.get(ExecutionState.FAILED.name()).asInt());
+
+			long expectedNumBytesIn = 0;
+			long expectedNumBytesOut = 0;
+			long expectedNumRecordsIn = 0;
+			long expectedNumRecordsOut = 0;
+
+			for (AccessExecutionVertex vertex : expectedTask.getTaskVertices()) {
+				IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
+
+				expectedNumBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
+				expectedNumBytesOut += ioMetrics.getNumBytesOut();
+				expectedNumRecordsIn += ioMetrics.getNumRecordsIn();
+				expectedNumRecordsOut += ioMetrics.getNumRecordsOut();
+			}
+
+			JsonNode metrics = task.get("metrics");
+
+			Assert.assertEquals(expectedNumBytesIn, metrics.get("read-bytes").asLong());
+			Assert.assertEquals(expectedNumBytesOut, metrics.get("write-bytes").asLong());
+			Assert.assertEquals(expectedNumRecordsIn, metrics.get("read-records").asLong());
+			Assert.assertEquals(expectedNumRecordsOut, metrics.get("write-records").asLong());
+
+			x++;
+		}
+		Assert.assertEquals(1, tasks.size());
+
+		JsonNode statusCounts = result.get("status-counts");
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.CREATED.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.SCHEDULED.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.DEPLOYING.name()).asInt());
+		Assert.assertEquals(1, statusCounts.get(ExecutionState.RUNNING.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.FINISHED.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELING.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELED.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.FAILED.name()).asInt());
+
+		Assert.assertEquals(ArchivedJobGenerationUtils.mapper.readTree(originalJob.getJsonPlan()), result.get("plan"));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
index 850971a..c86ce6a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
@@ -17,6 +17,13 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +35,32 @@ public class JobExceptionsHandlerTest {
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]);
 	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		String json = JobExceptionsHandler.createJobExceptionsJson(originalJob);
+
+		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
+
+		Assert.assertEquals(originalJob.getFailureCauseAsString(), result.get("root-exception").asText());
+
+		ArrayNode exceptions = (ArrayNode) result.get("all-exceptions");
+
+		int x = 0;
+		for (AccessExecutionVertex expectedSubtask : originalJob.getAllExecutionVertices()) {
+			if (!expectedSubtask.getFailureCauseAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+				JsonNode exception = exceptions.get(x);
+
+				Assert.assertEquals(expectedSubtask.getFailureCauseAsString(), exception.get("exception").asText());
+				Assert.assertEquals(expectedSubtask.getTaskNameWithSubtaskIndex(), exception.get("task").asText());
+
+				TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation();
+				String expectedLocationString = location.getFQDNHostname() + ':' + location.dataPort();
+				Assert.assertEquals(expectedLocationString, exception.get("location").asText());
+			}
+			x++;
+		}
+		Assert.assertEquals(x > JobExceptionsHandler.MAX_NUMBER_EXCEPTION_TO_REPORT, result.get("truncated").asBoolean());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
index d513836..03c1896 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
@@ -17,6 +17,11 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +33,19 @@ public class JobVertexAccumulatorsHandlerTest {
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]);
 	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		String json = JobVertexAccumulatorsHandler.createVertexAccumulatorsJson(originalTask);
+
+		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
+
+		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+
+		ArrayNode accs = (ArrayNode) result.get("user-accumulators");
+		StringifiedAccumulatorResult[] expectedAccs = originalTask.getAggregatedUserAccumulatorsStringified();
+
+		ArchivedJobGenerationUtils.compareStringifiedAccumulators(expectedAccs, accs);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
index d20d736..e909c8c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
@@ -17,6 +17,13 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +35,41 @@ public class JobVertexDetailsHandlerTest {
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", paths[0]);
 	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		String json = JobVertexDetailsHandler.createVertexDetailsJson(
+			originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
+
+		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
+
+		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+		Assert.assertEquals(originalTask.getName(), result.get("name").asText());
+		Assert.assertEquals(originalTask.getParallelism(), result.get("parallelism").asInt());
+		Assert.assertTrue(result.get("now").asLong() > 0);
+
+		ArrayNode subtasks = (ArrayNode) result.get("subtasks");
+
+		Assert.assertEquals(originalTask.getTaskVertices().length, subtasks.size());
+		for (int x = 0; x < originalTask.getTaskVertices().length; x++) {
+			AccessExecutionVertex expectedSubtask = originalTask.getTaskVertices()[x];
+			JsonNode subtask = subtasks.get(x);
+
+			Assert.assertEquals(x, subtask.get("subtask").asInt());
+			Assert.assertEquals(expectedSubtask.getExecutionState().name(), subtask.get("status").asText());
+			Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(), subtask.get("attempt").asInt());
+
+			TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation();
+			String expectedLocationString = location.getHostname() + ":" + location.dataPort();
+			Assert.assertEquals(expectedLocationString, subtask.get("host").asText());
+			long start = expectedSubtask.getStateTimestamp(ExecutionState.DEPLOYING);
+			Assert.assertEquals(start, subtask.get("start-time").asLong());
+			long end = expectedSubtask.getStateTimestamp(ExecutionState.FINISHED);
+			Assert.assertEquals(end, subtask.get("end-time").asLong());
+			Assert.assertEquals(end - start, subtask.get("duration").asLong());
+
+			ArchivedJobGenerationUtils.compareIoMetrics(expectedSubtask.getCurrentExecutionAttempt().getIOMetrics(), subtask.get("metrics"));
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
index e56a517..11e35e5 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
@@ -17,6 +17,14 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +36,62 @@ public class JobVertexTaskManagersHandlerTest {
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]);
 	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		AccessExecutionVertex originalSubtask = ArchivedJobGenerationUtils.getTestSubtask();
+		String json = JobVertexTaskManagersHandler.createVertexDetailsByTaskManagerJson(
+			originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
+
+		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
+
+		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+		Assert.assertEquals(originalTask.getName(), result.get("name").asText());
+		Assert.assertTrue(result.get("now").asLong() > 0);
+
+		ArrayNode taskmanagers = (ArrayNode) result.get("taskmanagers");
+
+		JsonNode taskManager = taskmanagers.get(0);
+
+		TaskManagerLocation location = originalSubtask.getCurrentAssignedResourceLocation();
+		String expectedLocationString = location.getHostname() + ':' + location.dataPort();
+		Assert.assertEquals(expectedLocationString, taskManager.get("host").asText());
+		Assert.assertEquals(ExecutionState.FINISHED.name(), taskManager.get("status").asText());
+
+		Assert.assertEquals(3, taskManager.get("start-time").asLong());
+		Assert.assertEquals(5, taskManager.get("end-time").asLong());
+		Assert.assertEquals(2, taskManager.get("duration").asLong());
+
+		JsonNode statusCounts = taskManager.get("status-counts");
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.CREATED.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.SCHEDULED.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.DEPLOYING.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.RUNNING.name()).asInt());
+		Assert.assertEquals(1, statusCounts.get(ExecutionState.FINISHED.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELING.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELED.name()).asInt());
+		Assert.assertEquals(0, statusCounts.get(ExecutionState.FAILED.name()).asInt());
+
+		long expectedNumBytesIn = 0;
+		long expectedNumBytesOut = 0;
+		long expectedNumRecordsIn = 0;
+		long expectedNumRecordsOut = 0;
+
+		for (AccessExecutionVertex vertex : originalTask.getTaskVertices()) {
+			IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
+
+			expectedNumBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
+			expectedNumBytesOut += ioMetrics.getNumBytesOut();
+			expectedNumRecordsIn += ioMetrics.getNumRecordsIn();
+			expectedNumRecordsOut += ioMetrics.getNumRecordsOut();
+		}
+
+		JsonNode metrics = taskManager.get("metrics");
+
+		Assert.assertEquals(expectedNumBytesIn, metrics.get("read-bytes").asLong());
+		Assert.assertEquals(expectedNumBytesOut, metrics.get("write-bytes").asLong());
+		Assert.assertEquals(expectedNumRecordsIn, metrics.get("read-records").asLong());
+		Assert.assertEquals(expectedNumRecordsOut, metrics.get("write-records").asLong());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index 0b6038f..8d24bd0 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -17,6 +17,10 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +32,18 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest {
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]);
 	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+		String json = SubtaskExecutionAttemptAccumulatorsHandler.createAttemptAccumulatorsJson(originalAttempt);
+
+		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
+
+		Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt());
+		Assert.assertEquals(originalAttempt.getAttemptNumber(), result.get("attempt").asInt());
+		Assert.assertEquals(originalAttempt.getAttemptId().toString(), result.get("id").asText());
+
+		ArchivedJobGenerationUtils.compareStringifiedAccumulators(originalAttempt.getUserAccumulatorsStringified(), (ArrayNode) result.get("user-accumulators"));
+	}
 }


Mime
View raw message