flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [01/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler to flink-runtime
Date Tue, 19 Sep 2017 22:44:11 GMT
Repository: flink
Updated Branches:
  refs/heads/master 327701032 -> 4fc019a96


http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionConfigBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionConfigBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionConfigBuilder.java
new file mode 100644
index 0000000..cb7fe90
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionConfigBuilder.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.utils;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.ExecutionMode;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Test utility for constructing an ArchivedExecutionConfig: chainable setters, and build() fills in defaults for unset reference fields.
+ */
+public class ArchivedExecutionConfigBuilder {
+	private String executionMode; // when null, build() uses ExecutionMode.PIPELINED.name()
+	private String restartStrategyDescription; // when null, build() uses the literal string default
+	private int parallelism; // handed to the constructor unchanged (0 if never set)
+	private boolean objectReuseEnabled; // handed to the constructor unchanged (false if never set)
+	private Map<String, String> globalJobParameters; // when null, build() uses an empty map
+
+	public ArchivedExecutionConfigBuilder setExecutionMode(String executionMode) { // takes the mode as a string, not the enum; returns this for chaining
+		this.executionMode = executionMode;
+		return this;
+	}
+
+	public ArchivedExecutionConfigBuilder setRestartStrategyDescription(String restartStrategyDescription)
{
+		this.restartStrategyDescription = restartStrategyDescription;
+		return this;
+	}
+
+	public ArchivedExecutionConfigBuilder setParallelism(int parallelism) { // no validation is applied to the value
+		this.parallelism = parallelism;
+		return this;
+	}
+
+	public ArchivedExecutionConfigBuilder setObjectReuseEnabled(boolean objectReuseEnabled)
{
+		this.objectReuseEnabled = objectReuseEnabled;
+		return this;
+	}
+
+	public ArchivedExecutionConfigBuilder setGlobalJobParameters(Map<String, String> globalJobParameters)
{
+		this.globalJobParameters = globalJobParameters; // the reference is stored directly, no defensive copy
+		return this;
+	}
+
+	public ArchivedExecutionConfig build() { // creates a new config from the current field values; the builder itself is not modified
+		return new ArchivedExecutionConfig(
+			executionMode != null ? executionMode : ExecutionMode.PIPELINED.name(), // default mode: PIPELINED
+			restartStrategyDescription != null ? restartStrategyDescription : "default",
+			parallelism,
+			objectReuseEnabled,
+			globalJobParameters != null ? globalJobParameters : Collections.<String, String>emptyMap()
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
new file mode 100644
index 0000000..68077ba
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.utils;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Test utility for constructing an ArchivedExecutionGraph: only the tasks map is mandatory, every other unset reference field gets a default in build().
+ */
+public class ArchivedExecutionGraphBuilder {
+
+	private static final Random RANDOM = new Random(); // only used to make up a default job name in build()
+
+	private JobID jobID; // when null, build() creates a new JobID
+	private String jobName; // when null, build() uses job_ followed by a random int
+	private Map<JobVertexID, ArchivedExecutionJobVertex> tasks; // mandatory; build() rejects null
+	private List<ArchivedExecutionJobVertex> verticesInCreationOrder; // when null, build() uses a list copy of tasks.values()
+	private long[] stateTimestamps; // when null, build() uses a zero-filled array with one slot per JobStatus value
+	private JobStatus state; // when null, build() uses JobStatus.FINISHED
+	private ErrorInfo failureCause; // passed through as-is, so it may be null
+	private String jsonPlan; // when null, build() writes a minimal plan containing the job id, job name and an empty nodes array
+	private StringifiedAccumulatorResult[] archivedUserAccumulators; // when null, build() uses an empty array
+	private ArchivedExecutionConfig archivedExecutionConfig; // when null, build() uses new ArchivedExecutionConfigBuilder().build()
+	private boolean isStoppable; // passed through as-is (false if never set)
+	private Map<String, SerializedValue<Object>> serializedUserAccumulators; // when null, build() uses an empty map
+
+	public ArchivedExecutionGraphBuilder setJobID(JobID jobID) { // like every setter here, returns this for chaining
+		this.jobID = jobID;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setJobName(String jobName) {
+		this.jobName = jobName;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setTasks(Map<JobVertexID, ArchivedExecutionJobVertex>
tasks) {
+		this.tasks = tasks; // stored by reference; the null check happens later in build()
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setVerticesInCreationOrder(List<ArchivedExecutionJobVertex>
verticesInCreationOrder) {
+		this.verticesInCreationOrder = verticesInCreationOrder;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setStateTimestamps(long[] stateTimestamps) {
+		Preconditions.checkArgument(stateTimestamps.length == JobStatus.values().length); // needs exactly one slot per JobStatus value; a null array already fails here on .length
+		this.stateTimestamps = stateTimestamps;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setState(JobStatus state) {
+		this.state = state;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setFailureCause(ErrorInfo failureCause) {
+		this.failureCause = failureCause;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setJsonPlan(String jsonPlan) {
+		this.jsonPlan = jsonPlan;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setArchivedUserAccumulators(StringifiedAccumulatorResult[]
archivedUserAccumulators) {
+		this.archivedUserAccumulators = archivedUserAccumulators;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setArchivedExecutionConfig(ArchivedExecutionConfig
archivedExecutionConfig) {
+		this.archivedExecutionConfig = archivedExecutionConfig;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setStoppable(boolean stoppable) {
+		isStoppable = stoppable;
+		return this;
+	}
+
+	public ArchivedExecutionGraphBuilder setSerializedUserAccumulators(Map<String, SerializedValue<Object>>
serializedUserAccumulators) {
+		this.serializedUserAccumulators = serializedUserAccumulators;
+		return this;
+	}
+
+	public ArchivedExecutionGraph build() { // tasks must have been set; everything else has a fallback
+		Preconditions.checkNotNull(tasks, "Tasks must not be null.");
+		JobID jobID = this.jobID != null ? this.jobID : new JobID(); // locals shadow the fields so the default JSON plan below embeds the same id and name that go to the constructor
+		String jobName = this.jobName != null ? this.jobName : "job_" + RANDOM.nextInt();
+		return new ArchivedExecutionGraph(
+			jobID,
+			jobName,
+			tasks,
+			verticesInCreationOrder != null ? verticesInCreationOrder : new ArrayList<>(tasks.values()), // default order is whatever the map iteration yields
+			stateTimestamps != null ? stateTimestamps : new long[JobStatus.values().length],
+			state != null ? state : JobStatus.FINISHED,
+			failureCause,
+			jsonPlan != null ? jsonPlan : "{\"jobid\":\"" + jobID + "\", \"name\":\"" + jobName +
"\", \"nodes\":[]}",
+			archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0],
+			serializedUserAccumulators != null ? serializedUserAccumulators : Collections.<String,
SerializedValue<Object>>emptyMap(),
+			archivedExecutionConfig != null ? archivedExecutionConfig : new ArchivedExecutionConfigBuilder().build(),
+			isStoppable,
+			null, // NOTE(review): the meaning of the last two constructor arguments is not visible here - confirm against ArchivedExecutionGraph
+			null
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionJobVertexBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionJobVertexBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionJobVertexBuilder.java
new file mode 100644
index 0000000..814c4db
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionJobVertexBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.utils;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Random;
+
+/**
+ * Test utility for constructing an ArchivedExecutionJobVertex: taskVertices is mandatory, the other unset reference fields get defaults in build().
+ */
+public class ArchivedExecutionJobVertexBuilder {
+
+	private static final Random RANDOM = new Random(); // only used to make up a default task name in build()
+
+	private ArchivedExecutionVertex[] taskVertices; // mandatory; build() rejects null
+	private JobVertexID id; // when null, build() creates a new JobVertexID
+	private String name; // when null, build() uses task_ followed by a random int
+	private int parallelism; // passed through as-is (0 if never set); not derived from taskVertices.length
+	private int maxParallelism; // passed through as-is (0 if never set)
+	private StringifiedAccumulatorResult[] archivedUserAccumulators; // when null, build() uses an empty array
+
+	public ArchivedExecutionJobVertexBuilder setTaskVertices(ArchivedExecutionVertex[] taskVertices)
{
+		this.taskVertices = taskVertices; // the array reference is stored directly, no copy
+		return this;
+	}
+
+	public ArchivedExecutionJobVertexBuilder setId(JobVertexID id) { // like every setter here, returns this for chaining
+		this.id = id;
+		return this;
+	}
+
+	public ArchivedExecutionJobVertexBuilder setName(String name) {
+		this.name = name;
+		return this;
+	}
+
+	public ArchivedExecutionJobVertexBuilder setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+		return this;
+	}
+
+	public ArchivedExecutionJobVertexBuilder setMaxParallelism(int maxParallelism) {
+		this.maxParallelism = maxParallelism;
+		return this;
+	}
+
+	public ArchivedExecutionJobVertexBuilder setArchivedUserAccumulators(StringifiedAccumulatorResult[]
archivedUserAccumulators) {
+		this.archivedUserAccumulators = archivedUserAccumulators;
+		return this;
+	}
+
+	public ArchivedExecutionJobVertex build() { // taskVertices must have been set; everything else has a fallback or is passed through
+		Preconditions.checkNotNull(taskVertices);
+		return new ArchivedExecutionJobVertex(
+			taskVertices,
+			id != null ? id : new JobVertexID(), // a fresh id is created on every build() call when none was set
+			name != null ? name : "task_" + RANDOM.nextInt(),
+			parallelism,
+			maxParallelism,
+			archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0]
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionVertexBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionVertexBuilder.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionVertexBuilder.java
new file mode 100644
index 0000000..935c792
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionVertexBuilder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.utils;
+
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.util.EvictingBoundedList;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Test utility for constructing an ArchivedExecutionVertex: currentExecution is mandatory, the other unset reference fields get defaults in build().
+ */
+public class ArchivedExecutionVertexBuilder {
+
+	private static final Random RANDOM = new Random(); // only used to make up a default task name in build()
+
+	private int subtaskIndex; // passed through as-is (0 if never set); also appended to the default task name
+	private EvictingBoundedList<ArchivedExecution> priorExecutions; // when null, build() uses an EvictingBoundedList created with size argument 0
+	private String taskNameWithSubtask; // when null, build() uses task_, a random int, an underscore and the subtask index
+	private ArchivedExecution currentExecution; // mandatory; build() rejects null
+
+	public ArchivedExecutionVertexBuilder setSubtaskIndex(int subtaskIndex) { // like every setter here, returns this for chaining
+		this.subtaskIndex = subtaskIndex;
+		return this;
+	}
+
+	public ArchivedExecutionVertexBuilder setPriorExecutions(List<ArchivedExecution> priorExecutions)
{
+		this.priorExecutions = new EvictingBoundedList<>(priorExecutions.size()); // sized from the input list; a null list fails here on .size()
+		for (ArchivedExecution execution : priorExecutions) { // copy element by element, in list order
+			this.priorExecutions.add(execution);
+		}
+		return this;
+	}
+
+	public ArchivedExecutionVertexBuilder setTaskNameWithSubtask(String taskNameWithSubtask)
{
+		this.taskNameWithSubtask = taskNameWithSubtask;
+		return this;
+	}
+
+	public ArchivedExecutionVertexBuilder setCurrentExecution(ArchivedExecution currentExecution)
{
+		this.currentExecution = currentExecution;
+		return this;
+	}
+
+	public ArchivedExecutionVertex build() { // currentExecution must have been set; everything else has a fallback or is passed through
+		Preconditions.checkNotNull(currentExecution);
+		return new ArchivedExecutionVertex(
+			subtaskIndex,
+			taskNameWithSubtask != null ? taskNameWithSubtask : "task_" + RANDOM.nextInt() + "_" +
subtaskIndex,
+			currentExecution,
+			priorExecutions != null ? priorExecutions : new EvictingBoundedList<ArchivedExecution>(0)
+		);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
new file mode 100644
index 0000000..d256e92
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Common entry-point for accessing generated ArchivedExecution* components; one job/task/subtask/attempt fixture is generated on first access and then reused.
+ */
+public class ArchivedJobGenerationUtils {
+	public static final ObjectMapper MAPPER = new ObjectMapper();
+	public static final JsonFactory JACKSON_FACTORY = new JsonFactory()
+		.enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
+		.disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT);
+
+	private static ArchivedExecutionGraph originalJob; // assigned last in generateArchivedJob(); its null-ness is the generated-yet check for all four getters
+	private static ArchivedExecutionJobVertex originalTask; // the single task contained in originalJob
+	private static ArchivedExecutionVertex originalSubtask; // the single subtask contained in originalTask
+	private static ArchivedExecution originalAttempt; // the current execution of originalSubtask
+
+	private static final Object lock = new Object(); // guards the lazy generation in the getters
+
+	private ArchivedJobGenerationUtils() { // static helpers only; not meant to be instantiated
+	}
+
+	public static AccessExecutionGraph getTestJob() throws Exception { // returns the shared job fixture, generating all fixtures on first use
+		synchronized (lock) {
+			if (originalJob == null) {
+				generateArchivedJob();
+			}
+		}
+		return originalJob;
+	}
+
+	public static AccessExecutionJobVertex getTestTask() throws Exception { // returns the shared task fixture, generating all fixtures on first use
+		synchronized (lock) {
+			if (originalJob == null) { // the job field is the marker for all fixtures, since it is assigned last
+				generateArchivedJob();
+			}
+		}
+		return originalTask;
+	}
+
+	public static AccessExecutionVertex getTestSubtask() throws Exception { // returns the shared subtask fixture, generating all fixtures on first use
+		synchronized (lock) {
+			if (originalJob == null) {
+				generateArchivedJob();
+			}
+		}
+		return originalSubtask;
+	}
+
+	public static AccessExecution getTestAttempt() throws Exception { // returns the shared attempt fixture, generating all fixtures on first use
+		synchronized (lock) {
+			if (originalJob == null) {
+				generateArchivedJob();
+			}
+		}
+		return originalAttempt;
+	}
+
+	private static void generateArchivedJob() throws Exception { // builds bottom-up: attempt, then subtask, then task, then job; callers hold lock
+		// Attempt: state FINISHED, subtask index 1, attempt number 0, two accumulators, located on the local host
+		StringifiedAccumulatorResult acc1 = new StringifiedAccumulatorResult("name1", "type1",
"value1");
+		StringifiedAccumulatorResult acc2 = new StringifiedAccumulatorResult("name2", "type2",
"value2");
+		TaskManagerLocation location = new TaskManagerLocation(new ResourceID("hello"), InetAddress.getLocalHost(),
1234);
+		originalAttempt = new ArchivedExecutionBuilder()
+			.setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9}) // nine entries; presumably one per ExecutionState value - confirm against ArchivedExecutionBuilder
+			.setParallelSubtaskIndex(1)
+			.setAttemptNumber(0)
+			.setAssignedResourceLocation(location)
+			.setUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
+			.setState(ExecutionState.FINISHED)
+			.setFailureCause("attemptException") // a failure cause is set even though the state is FINISHED
+			.build();
+		// Subtask: wraps the attempt as its current execution and reuses the attempt subtask index
+		originalSubtask = new ArchivedExecutionVertexBuilder()
+			.setSubtaskIndex(originalAttempt.getParallelSubtaskIndex())
+			.setTaskNameWithSubtask("hello(1/1)")
+			.setCurrentExecution(originalAttempt)
+			.build();
+		// Task: contains only the subtask above; id, name and accumulators come from the builder defaults
+		originalTask = new ArchivedExecutionJobVertexBuilder()
+			.setTaskVertices(new ArchivedExecutionVertex[]{originalSubtask})
+			.build();
+		// Job: contains only the task above, keyed by its vertex id
+		Map<JobVertexID, ArchivedExecutionJobVertex> tasks = new HashMap<>();
+		tasks.put(originalTask.getJobVertexId(), originalTask);
+		originalJob = new ArchivedExecutionGraphBuilder()
+			.setJobID(new JobID())
+			.setTasks(tasks)
+			.setFailureCause(new ErrorInfo(new Exception("jobException"), originalAttempt.getStateTimestamp(ExecutionState.FAILED)))
+			.setState(JobStatus.FINISHED)
+			.setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) // length must equal JobStatus.values().length or the builder rejects it
+			.setArchivedUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
+			.build();
+	}
+
+	// ========================================================================
+	// utility methods: JUnit assertions comparing expected objects with written JSON
+	// ========================================================================
+
+	public static void compareStringifiedAccumulators(StringifiedAccumulatorResult[] expectedAccs,
ArrayNode writtenAccs) {
+		assertEquals(expectedAccs.length, writtenAccs.size()); // sizes must match before comparing element-wise
+		for (int x = 0; x < expectedAccs.length; x++) { // compared by position, so the order must match too
+			JsonNode acc = writtenAccs.get(x);
+
+			assertEquals(expectedAccs[x].getName(), acc.get("name").asText());
+			assertEquals(expectedAccs[x].getType(), acc.get("type").asText());
+			assertEquals(expectedAccs[x].getValue(), acc.get("value").asText());
+		}
+	}
+
+	public static void compareIoMetrics(IOMetrics expectedMetrics, JsonNode writtenMetrics)
{
+		assertEquals(expectedMetrics.getNumBytesInTotal(), writtenMetrics.get("read-bytes").asLong()); // each JSON field is read as a long and compared with the matching getter
+		assertEquals(expectedMetrics.getNumBytesOut(), writtenMetrics.get("write-bytes").asLong());
+		assertEquals(expectedMetrics.getNumRecordsIn(), writtenMetrics.get("read-records").asLong());
+		assertEquals(expectedMetrics.getNumRecordsOut(), writtenMetrics.get("write-records").asLong());
+	}
+}


Mime
View raw message