flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [1/4] flink git commit: [FLINK-5852] Move handler JSON generation code into static methods
Date Thu, 02 Mar 2017 10:40:35 GMT
Repository: flink
Updated Branches:
  refs/heads/master 51b7ede28 -> a552d6746


http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
index e9c9f84..54f3f9c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -17,6 +17,12 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +34,27 @@ public class SubtaskExecutionAttemptDetailsHandlerTest {
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt",
paths[0]);
 	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob();
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+		String json = SubtaskExecutionAttemptDetailsHandler.createAttemptDetailsJson(
+			originalAttempt, originalJob.getJobID().toString(), originalTask.getJobVertexId().toString(),
null);
+
+		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
+
+		Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt());
+		Assert.assertEquals(originalAttempt.getState().name(), result.get("status").asText());
+		Assert.assertEquals(originalAttempt.getAttemptNumber(), result.get("attempt").asInt());
+		Assert.assertEquals(originalAttempt.getAssignedResourceLocation().getHostname(), result.get("host").asText());
+		long start = originalAttempt.getStateTimestamp(ExecutionState.DEPLOYING);
+		Assert.assertEquals(start, result.get("start-time").asLong());
+		long end = originalAttempt.getStateTimestamp(ExecutionState.FINISHED);
+		Assert.assertEquals(end, result.get("end-time").asLong());
+		Assert.assertEquals(end - start, result.get("duration").asLong());
+
+		ArchivedJobGenerationUtils.compareIoMetrics(originalAttempt.getIOMetrics(), result.get("metrics"));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
index 1efb260..954ebad 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
@@ -17,6 +17,11 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +33,31 @@ public class SubtasksAllAccumulatorsHandlerTest {
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", paths[0]);
 	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		String json = SubtasksAllAccumulatorsHandler.createSubtasksAccumulatorsJson(originalTask);
+
+		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
+
+		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+		Assert.assertEquals(originalTask.getParallelism(), result.get("parallelism").asInt());
+
+		ArrayNode subtasks = (ArrayNode) result.get("subtasks");
+
+		Assert.assertEquals(originalTask.getTaskVertices().length, subtasks.size());
+		for (int x = 0; x < originalTask.getTaskVertices().length; x++) {
+			JsonNode subtask = subtasks.get(x);
+			AccessExecutionVertex expectedSubtask = originalTask.getTaskVertices()[x];
+
+			Assert.assertEquals(x, subtask.get("subtask").asInt());
+			Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(), subtask.get("attempt").asInt());
+			Assert.assertEquals(expectedSubtask.getCurrentAssignedResourceLocation().getHostname(),
subtask.get("host").asText());
+
+			ArchivedJobGenerationUtils.compareStringifiedAccumulators(
+				expectedSubtask.getCurrentExecutionAttempt().getUserAccumulatorsStringified(),
+				(ArrayNode) subtask.get("user-accumulators"));
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
index 1c8d2bd..939f439 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
@@ -17,6 +17,12 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +34,35 @@ public class SubtasksTimesHandlerTest {
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]);
 	}
+
+	@Test
+	public void testJsonGeneration() throws Exception {
+		AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask();
+		AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt();
+		String json = SubtasksTimesHandler.createSubtaskTimesJson(originalTask);
+
+		JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json);
+
+		Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
+		Assert.assertEquals(originalTask.getName(), result.get("name").asText());
+		Assert.assertTrue(result.get("now").asLong() > 0L);
+
+		ArrayNode subtasks = (ArrayNode) result.get("subtasks");
+
+		JsonNode subtask = subtasks.get(0);
+		Assert.assertEquals(0, subtask.get("subtask").asInt());
+		Assert.assertEquals(originalAttempt.getAssignedResourceLocation().getHostname(), subtask.get("host").asText());
+		Assert.assertEquals(originalAttempt.getStateTimestamp(originalAttempt.getState()) - originalAttempt.getStateTimestamp(ExecutionState.SCHEDULED),
subtask.get("duration").asLong());
+
+		JsonNode timestamps = subtask.get("timestamps");
+
+		Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CREATED), timestamps.get(ExecutionState.CREATED.name()).asLong());
+		Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.SCHEDULED), timestamps.get(ExecutionState.SCHEDULED.name()).asLong());
+		Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.DEPLOYING), timestamps.get(ExecutionState.DEPLOYING.name()).asLong());
+		Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.RUNNING), timestamps.get(ExecutionState.RUNNING.name()).asLong());
+		Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.FINISHED), timestamps.get(ExecutionState.FINISHED.name()).asLong());
+		Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CANCELING), timestamps.get(ExecutionState.CANCELING.name()).asLong());
+		Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CANCELED), timestamps.get(ExecutionState.CANCELED.name()).asLong());
+		Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.FAILED), timestamps.get(ExecutionState.FAILED.name()).asLong());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java
new file mode 100644
index 0000000..98fc92d
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+public class ArchivedExecutionBuilder {
+	private ExecutionAttemptID attemptId;
+	private long[] stateTimestamps;
+	private int attemptNumber;
+	private ExecutionState state;
+	private String failureCause;
+	private TaskManagerLocation assignedResourceLocation;
+	private StringifiedAccumulatorResult[] userAccumulators;
+	private IOMetrics ioMetrics;
+	private int parallelSubtaskIndex;
+
+	public ArchivedExecutionBuilder setAttemptId(ExecutionAttemptID attemptId) {
+		this.attemptId = attemptId;
+		return this;
+	}
+
+	public ArchivedExecutionBuilder setStateTimestamps(long[] stateTimestamps) {
+		Preconditions.checkArgument(stateTimestamps.length == ExecutionState.values().length);
+		this.stateTimestamps = stateTimestamps;
+		return this;
+	}
+
+	public ArchivedExecutionBuilder setAttemptNumber(int attemptNumber) {
+		this.attemptNumber = attemptNumber;
+		return this;
+	}
+
+	public ArchivedExecutionBuilder setState(ExecutionState state) {
+		this.state = state;
+		return this;
+	}
+
+	public ArchivedExecutionBuilder setFailureCause(String failureCause) {
+		this.failureCause = failureCause;
+		return this;
+	}
+
+	public ArchivedExecutionBuilder setAssignedResourceLocation(TaskManagerLocation assignedResourceLocation)
{
+		this.assignedResourceLocation = assignedResourceLocation;
+		return this;
+	}
+
+	public ArchivedExecutionBuilder setUserAccumulators(StringifiedAccumulatorResult[] userAccumulators)
{
+		this.userAccumulators = userAccumulators;
+		return this;
+	}
+
+	public ArchivedExecutionBuilder setParallelSubtaskIndex(int parallelSubtaskIndex) {
+		this.parallelSubtaskIndex = parallelSubtaskIndex;
+		return this;
+	}
+
+	public ArchivedExecutionBuilder setIOMetrics(IOMetrics ioMetrics) {
+		this.ioMetrics = ioMetrics;
+		return this;
+	}
+
+	public ArchivedExecution build() throws UnknownHostException {
+		return new ArchivedExecution(
+			userAccumulators != null ? userAccumulators : new StringifiedAccumulatorResult[0],
+			ioMetrics != null ? ioMetrics : new TestIOMetrics(),
+			attemptId != null ? attemptId : new ExecutionAttemptID(),
+			attemptNumber,
+			state != null ? state : ExecutionState.FINISHED,
+			failureCause != null ? failureCause : "(null)",
+			assignedResourceLocation != null ? assignedResourceLocation : new TaskManagerLocation(new
ResourceID("tm"), InetAddress.getLocalHost(), 1234),
+			parallelSubtaskIndex,
+			stateTimestamps != null ? stateTimestamps : new long[]{1, 2, 3, 4, 5, 5, 5, 5}
+		);
+	}
+
+	private static class TestIOMetrics extends IOMetrics {
+		private static final long serialVersionUID = -5920076211680012555L;
+
+		public TestIOMetrics() {
+			super(
+				new MeterView(new TestCounter(1), 0),
+				new MeterView(new TestCounter(2), 0),
+				new MeterView(new TestCounter(3), 0),
+				new MeterView(new TestCounter(4), 0),
+				new MeterView(new TestCounter(5), 0));
+		}
+	}
+
+	private static class TestCounter implements Counter {
+		private final long count;
+
+		private TestCounter(long count) {
+			this.count = count;
+		}
+
+		@Override
+		public void inc() {
+		}
+
+		@Override
+		public void inc(long n) {
+		}
+
+		@Override
+		public void dec() {
+		}
+
+		@Override
+		public void dec(long n) {
+		}
+
+		@Override
+		public long getCount() {
+			return count;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
new file mode 100644
index 0000000..0880133
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.ExecutionMode;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class ArchivedExecutionConfigBuilder {
+	private String executionMode;
+	private String restartStrategyDescription;
+	private int parallelism;
+	private boolean objectReuseEnabled;
+	private Map<String, String> globalJobParameters;
+
+	public ArchivedExecutionConfigBuilder setExecutionMode(String executionMode) {
+		this.executionMode = executionMode;
+		return this;
+	}
+
+	public ArchivedExecutionConfigBuilder setRestartStrategyDescription(String restartStrategyDescription)
{
+		this.restartStrategyDescription = restartStrategyDescription;
+		return this;
+	}
+
+	public ArchivedExecutionConfigBuilder setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+		return this;
+	}
+
+	public ArchivedExecutionConfigBuilder setObjectReuseEnabled(boolean objectReuseEnabled)
{
+		this.objectReuseEnabled = objectReuseEnabled;
+		return this;
+	}
+
+	public ArchivedExecutionConfigBuilder setGlobalJobParameters(Map<String, String> globalJobParameters)
{
+		this.globalJobParameters = globalJobParameters;
+		return this;
+	}
+
+	public ArchivedExecutionConfig build() {
+		return new ArchivedExecutionConfig(
+			executionMode != null ? executionMode : ExecutionMode.PIPELINED.name(),
+			restartStrategyDescription != null ? restartStrategyDescription : "default",
+			parallelism,
+			objectReuseEnabled,
+			globalJobParameters != null ? globalJobParameters : Collections.<String, String>emptyMap()
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
new file mode 100644
index 0000000..1514a5a
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class ArchivedExecutionGraphBuilder {
+
+	private static final Random RANDOM = new Random();
+
+	private JobID jobID;
+	private String jobName;
+	private Map<JobVertexID, ArchivedExecutionJobVertex> tasks;
+	private List<ArchivedExecutionJobVertex> verticesInCreationOrder;
+	private long[] stateTimestamps;
+	private JobStatus state;
+	private String failureCause;
+	private String jsonPlan;
+	private StringifiedAccumulatorResult[] archivedUserAccumulators;
+	private ArchivedExecutionConfig archivedExecutionConfig;
+	private boolean isStoppable;
+	private Map<String, SerializedValue<Object>> serializedUserAccumulators;
+
+	public ArchivedExecutionGraphBuilder setJobID(JobID jobID) {
+		this.jobID = jobID;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setJobName(String jobName) {
+		this.jobName = jobName;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setTasks(Map<JobVertexID, ArchivedExecutionJobVertex>
tasks) {
+		this.tasks = tasks;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setVerticesInCreationOrder(List<ArchivedExecutionJobVertex>
verticesInCreationOrder) {
+		this.verticesInCreationOrder = verticesInCreationOrder;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setStateTimestamps(long[] stateTimestamps) {
+		Preconditions.checkArgument(stateTimestamps.length == JobStatus.values().length);
+		this.stateTimestamps = stateTimestamps;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setState(JobStatus state) {
+		this.state = state;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setFailureCause(String failureCause) {
+		this.failureCause = failureCause;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setJsonPlan(String jsonPlan) {
+		this.jsonPlan = jsonPlan;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setArchivedUserAccumulators(StringifiedAccumulatorResult[]
archivedUserAccumulators) {
+		this.archivedUserAccumulators = archivedUserAccumulators;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setArchivedExecutionConfig(ArchivedExecutionConfig
archivedExecutionConfig) {
+		this.archivedExecutionConfig = archivedExecutionConfig;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setStoppable(boolean stoppable) {
+		isStoppable = stoppable;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setSerializedUserAccumulators(Map<String, SerializedValue<Object>>
serializedUserAccumulators) {
+		this.serializedUserAccumulators = serializedUserAccumulators;
+		return this;
+	}
+
+	public ArchivedExecutionGraph build() {
+		Preconditions.checkNotNull(tasks, "Tasks must not be null.");
+		JobID jobID = this.jobID != null ? this.jobID : new JobID();
+		String jobName = this.jobName != null ? this.jobName : "job_" + RANDOM.nextInt();
+		return new ArchivedExecutionGraph(
+			jobID,
+			jobName,
+			tasks,
+			verticesInCreationOrder != null ? verticesInCreationOrder : new ArrayList<>(tasks.values()),
+			stateTimestamps != null ? stateTimestamps : new long[JobStatus.values().length],
+			state != null ? state : JobStatus.FINISHED,
+			failureCause != null ? failureCause : "(null)",
+			jsonPlan != null ? jsonPlan : "{\"jobid\":\"" + jobID + "\", \"name\":\"" + jobName +
"\", \"nodes\":[]}",
+			archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0],
+			serializedUserAccumulators != null ? serializedUserAccumulators : Collections.<String,
SerializedValue<Object>>emptyMap(),
+			archivedExecutionConfig != null ? archivedExecutionConfig : new ArchivedExecutionConfigBuilder().build(),
+			isStoppable,
+			null,
+			null
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
new file mode 100644
index 0000000..8a45d35
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Random;
+
+public class ArchivedExecutionJobVertexBuilder {
+
+	private static final Random RANDOM = new Random();
+
+	private ArchivedExecutionVertex[] taskVertices;
+	private JobVertexID id;
+	private String name;
+	private int parallelism;
+	private int maxParallelism;
+	private StringifiedAccumulatorResult[] archivedUserAccumulators;
+
+	public ArchivedExecutionJobVertexBuilder setTaskVertices(ArchivedExecutionVertex[] taskVertices)
{
+		this.taskVertices = taskVertices;
+		return this;
+	}
+
+	public ArchivedExecutionJobVertexBuilder setId(JobVertexID id) {
+		this.id = id;
+		return this;
+	}
+
+	public ArchivedExecutionJobVertexBuilder setName(String name) {
+		this.name = name;
+		return this;
+	}
+
+	public ArchivedExecutionJobVertexBuilder setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+		return this;
+	}
+
+	public ArchivedExecutionJobVertexBuilder setMaxParallelism(int maxParallelism) {
+		this.maxParallelism = maxParallelism;
+		return this;
+	}
+
+	public ArchivedExecutionJobVertexBuilder setArchivedUserAccumulators(StringifiedAccumulatorResult[]
archivedUserAccumulators) {
+		this.archivedUserAccumulators = archivedUserAccumulators;
+		return this;
+	}
+
+	public ArchivedExecutionJobVertex build() {
+		Preconditions.checkNotNull(taskVertices);
+		return new ArchivedExecutionJobVertex(
+			taskVertices,
+			id != null ? id : new JobVertexID(),
+			name != null ? name : "task_" + RANDOM.nextInt(),
+			parallelism,
+			maxParallelism,
+			archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0]
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
new file mode 100644
index 0000000..3707374
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.util.EvictingBoundedList;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+import java.util.Random;
+
+public class ArchivedExecutionVertexBuilder {
+
+	private static final Random RANDOM = new Random();
+
+	private int subtaskIndex;
+	private EvictingBoundedList<ArchivedExecution> priorExecutions;
+	private String taskNameWithSubtask;
+	private ArchivedExecution currentExecution;
+
+	public ArchivedExecutionVertexBuilder setSubtaskIndex(int subtaskIndex) {
+		this.subtaskIndex = subtaskIndex;
+		return this;
+	}
+
+	public ArchivedExecutionVertexBuilder setPriorExecutions(List<ArchivedExecution> priorExecutions)
{
+		this.priorExecutions = new EvictingBoundedList<>(priorExecutions.size());
+		for (ArchivedExecution execution : priorExecutions) {
+			this.priorExecutions.add(execution);
+		}
+		return this;
+	}
+
+	public ArchivedExecutionVertexBuilder setTaskNameWithSubtask(String taskNameWithSubtask)
{
+		this.taskNameWithSubtask = taskNameWithSubtask;
+		return this;
+	}
+
+	public ArchivedExecutionVertexBuilder setCurrentExecution(ArchivedExecution currentExecution)
{
+		this.currentExecution = currentExecution;
+		return this;
+	}
+
+	public ArchivedExecutionVertex build() {
+		Preconditions.checkNotNull(currentExecution);
+		return new ArchivedExecutionVertex(
+			subtaskIndex,
+			taskNameWithSubtask != null ? taskNameWithSubtask : "task_" + RANDOM.nextInt() + "_" +
subtaskIndex,
+			currentExecution,
+			priorExecutions != null ? priorExecutions : new EvictingBoundedList<ArchivedExecution>(0)
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
new file mode 100644
index 0000000..0340d87
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ArchivedJobGenerationUtils {
+	public static final ObjectMapper mapper = new ObjectMapper();
+	public static final JsonFactory jacksonFactory = new JsonFactory()
+		.enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
+		.disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT);
+
+	private static ArchivedExecutionGraph originalJob;
+	private static ArchivedExecutionJobVertex originalTask;
+	private static ArchivedExecutionVertex originalSubtask;
+	private static ArchivedExecution originalAttempt;
+
+	private static final Object lock = new Object();
+
+	private ArchivedJobGenerationUtils() {
+	}
+
+	public static AccessExecutionGraph getTestJob() throws Exception {
+		synchronized (lock) {
+			if (originalJob == null) {
+				generateArchivedJob();
+			}
+		}
+		return originalJob;
+	}
+
+	public static AccessExecutionJobVertex getTestTask() throws Exception {
+		synchronized (lock) {
+			if (originalJob == null) {
+				generateArchivedJob();
+			}
+		}
+		return originalTask;
+	}
+
+	public static AccessExecutionVertex getTestSubtask() throws Exception {
+		synchronized (lock) {
+			if (originalJob == null) {
+				generateArchivedJob();
+			}
+		}
+		return originalSubtask;
+	}
+
+	public static AccessExecution getTestAttempt() throws Exception {
+		synchronized (lock) {
+			if (originalJob == null) {
+				generateArchivedJob();
+			}
+		}
+		return originalAttempt;
+	}
+
+	private static void generateArchivedJob() throws Exception {
+		// Attempt
+		StringifiedAccumulatorResult acc1 = new StringifiedAccumulatorResult("name1", "type1",
"value1");
+		StringifiedAccumulatorResult acc2 = new StringifiedAccumulatorResult("name2", "type2",
"value2");
+		TaskManagerLocation location = new TaskManagerLocation(new ResourceID("hello"), InetAddress.getLocalHost(),
1234);
+		originalAttempt = new ArchivedExecutionBuilder()
+			.setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
+			.setParallelSubtaskIndex(1)
+			.setAttemptNumber(3)
+			.setAssignedResourceLocation(location)
+			.setUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
+			.setState(ExecutionState.FINISHED)
+			.setFailureCause("attemptException")
+			.build();
+		// Subtask
+		originalSubtask = new ArchivedExecutionVertexBuilder()
+			.setSubtaskIndex(originalAttempt.getParallelSubtaskIndex())
+			.setTaskNameWithSubtask("hello(1/1)")
+			.setCurrentExecution(originalAttempt)
+			.build();
+		// Task
+		originalTask = new ArchivedExecutionJobVertexBuilder()
+			.setTaskVertices(new ArchivedExecutionVertex[]{originalSubtask})
+			.build();
+		// Job
+		Map<JobVertexID, ArchivedExecutionJobVertex> tasks = new HashMap<>();
+		tasks.put(originalTask.getJobVertexId(), originalTask);
+		originalJob = new ArchivedExecutionGraphBuilder()
+			.setJobID(new JobID())
+			.setTasks(tasks)
+			.setFailureCause("jobException")
+			.setState(JobStatus.FINISHED)
+			.setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
+			.setArchivedUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
+			.build();
+	}
+
+	// ========================================================================
+	// utility methods
+	// ========================================================================
+
+	public static void compareStringifiedAccumulators(StringifiedAccumulatorResult[] expectedAccs,
ArrayNode writtenAccs) {
+		assertEquals(expectedAccs.length, writtenAccs.size());
+		for (int x = 0; x < expectedAccs.length; x++) {
+			JsonNode acc = writtenAccs.get(x);
+
+			assertEquals(expectedAccs[x].getName(), acc.get("name").asText());
+			assertEquals(expectedAccs[x].getType(), acc.get("type").asText());
+			assertEquals(expectedAccs[x].getValue(), acc.get("value").asText());
+		}
+	}
+
+	public static void compareIoMetrics(IOMetrics expectedMetrics, JsonNode writtenMetrics)
{
+		assertEquals(expectedMetrics.getNumBytesInTotal(), writtenMetrics.get("read-bytes").asLong());
+		assertEquals(expectedMetrics.getNumBytesOut(), writtenMetrics.get("write-bytes").asLong());
+		assertEquals(expectedMetrics.getNumRecordsIn(), writtenMetrics.get("read-records").asLong());
+		assertEquals(expectedMetrics.getNumRecordsOut(), writtenMetrics.get("write-records").asLong());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
index c189d42..4b1c62f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -59,6 +59,21 @@ public class ArchivedExecution implements AccessExecution, Serializable
{
 		this.ioMetrics = execution.getIOMetrics();
 	}
 
+	public ArchivedExecution(
+			StringifiedAccumulatorResult[] userAccumulators, IOMetrics ioMetrics,
+			ExecutionAttemptID attemptId, int attemptNumber, ExecutionState state, String failureCause,
+			TaskManagerLocation assignedResourceLocation, int parallelSubtaskIndex, long[] stateTimestamps)
{
+		this.userAccumulators = userAccumulators;
+		this.ioMetrics = ioMetrics;
+		this.failureCause = failureCause;
+		this.assignedResourceLocation = assignedResourceLocation;
+		this.attemptNumber = attemptNumber;
+		this.attemptId = attemptId;
+		this.state = state;
+		this.stateTimestamps = stateTimestamps;
+		this.parallelSubtaskIndex = parallelSubtaskIndex;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//   Accessors
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
index c744907..6b54760 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
@@ -54,6 +54,21 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex,
Ser
 		this.maxParallelism = jobVertex.getMaxParallelism();
 	}
 
+	public ArchivedExecutionJobVertex(
+			ArchivedExecutionVertex[] taskVertices,
+			JobVertexID id,
+			String name,
+			int parallelism,
+			int maxParallelism,
+			StringifiedAccumulatorResult[] archivedUserAccumulators) {
+		this.taskVertices = taskVertices;
+		this.id = id;
+		this.name = name;
+		this.parallelism = parallelism;
+		this.maxParallelism = maxParallelism;
+		this.archivedUserAccumulators = archivedUserAccumulators;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//   Accessors
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index 5053cae..36669d3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -45,6 +45,15 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex,
Serializa
 		this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
 	}
 
+	public ArchivedExecutionVertex(
+			int subTaskIndex, String taskNameWithSubtask,
+			ArchivedExecution currentExecution, EvictingBoundedList<ArchivedExecution> priorExecutions)
{
+		this.subTaskIndex = subTaskIndex;
+		this.taskNameWithSubtask = taskNameWithSubtask;
+		this.currentExecution = currentExecution;
+		this.priorExecutions = priorExecutions;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//   Accessors
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
index 15c54b4..e0472ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -26,19 +26,19 @@ import java.io.Serializable;
  */
 public class IOMetrics implements Serializable {
 	private static final long serialVersionUID = -7208093607556457183L;
-	private final long numRecordsIn;
-	private final long numRecordsOut;
+	protected long numRecordsIn;
+	protected long numRecordsOut;
 
-	private final double numRecordsInPerSecond;
-	private final double numRecordsOutPerSecond;
+	protected double numRecordsInPerSecond;
+	protected double numRecordsOutPerSecond;
 
-	private final long numBytesInLocal;
-	private final long numBytesInRemote;
-	private final long numBytesOut;
+	protected long numBytesInLocal;
+	protected long numBytesInRemote;
+	protected long numBytesOut;
 
-	private final double numBytesInLocalPerSecond;
-	private final double numBytesInRemotePerSecond;
-	private final double numBytesOutPerSecond;
+	protected double numBytesInLocalPerSecond;
+	protected double numBytesInRemotePerSecond;
+	protected double numBytesOutPerSecond;
 
 	public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, Meter bytesRemoteIn,
Meter bytesOut) {
 		this.numRecordsIn = recordsIn.getCount();
@@ -53,6 +53,22 @@ public class IOMetrics implements Serializable {
 		this.numBytesOutPerSecond = bytesOut.getRate();
 	}
 
+	public IOMetrics(
+			int numBytesInLocal, int numBytesInRemote, int numBytesOut, int numRecordsIn, int numRecordsOut,
+			double numBytesInLocalPerSecond, double numBytesInRemotePerSecond, double numBytesOutPerSecond,
+			double numRecordsInPerSecond, double numRecordsOutPerSecond) {
+		this.numBytesInLocal = numBytesInLocal;
+		this.numBytesInRemote = numBytesInRemote;
+		this.numBytesOut = numBytesOut;
+		this.numRecordsIn = numRecordsIn;
+		this.numRecordsOut = numRecordsOut;
+		this.numBytesInLocalPerSecond = numBytesInLocalPerSecond;
+		this.numBytesInRemotePerSecond = numBytesInRemotePerSecond;
+		this.numBytesOutPerSecond = numBytesOutPerSecond;
+		this.numRecordsInPerSecond = numRecordsInPerSecond;
+		this.numRecordsOutPerSecond = numRecordsOutPerSecond;
+	}
+
 	public long getNumRecordsIn() {
 		return numRecordsIn;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 3090172..b3e9d5d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -106,9 +106,13 @@ public class ArchivedExecutionGraphTest {
 			new NoRestartStrategy());
 		runtimeGraph.attachJobGraph(vertices);
 
+		List<ExecutionJobVertex> jobVertices = new ArrayList<>();
+		jobVertices.add(runtimeGraph.getJobVertex(v1ID));
+		jobVertices.add(runtimeGraph.getJobVertex(v2ID));
+		
 		CheckpointStatsTracker statsTracker = new CheckpointStatsTracker(
 				0,
-				Collections.<ExecutionJobVertex>emptyList(),
+				jobVertices,
 				mock(JobSnapshottingSettings.class),
 				new UnregisteredMetricsGroup());
 


Mime
View raw message