flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ches...@apache.org
Subject [2/3] flink git commit: [FLINK-4720] Implement archived ExecutionGraph
Date Fri, 14 Oct 2016 11:55:48 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
new file mode 100644
index 0000000..9faf3fb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+/**
+ * Common interface for the runtime {@link ExecutionVertex} and {@link ArchivedExecutionVertex}.
+ */
public interface AccessExecutionVertex {
	/**
	 * Returns the name of this execution vertex in the format "myTask (2/7)".
	 *
	 * @return name of this execution vertex
	 */
	String getTaskNameWithSubtaskIndex();

	/**
	 * Returns the subtask index of this execution vertex, i.e. which parallel
	 * instance of the task this vertex represents.
	 *
	 * @return subtask index of this execution vertex.
	 */
	int getParallelSubtaskIndex();

	/**
	 * Returns the current execution attempt for this execution vertex.
	 * Prior (failed or canceled) attempts are accessible via
	 * {@link #getPriorExecutionAttempt(int)}.
	 *
	 * @return current execution
	 */
	AccessExecution getCurrentExecutionAttempt();

	/**
	 * Returns the current {@link ExecutionState} for this execution vertex,
	 * i.e. the state of the current execution attempt.
	 *
	 * @return execution state for this execution vertex
	 */
	ExecutionState getExecutionState();

	/**
	 * Returns the timestamp for the given {@link ExecutionState} of the
	 * current execution attempt.
	 *
	 * @param state state for which the timestamp should be returned
	 * @return timestamp for the given state
	 */
	long getStateTimestamp(ExecutionState state);

	/**
	 * Returns the exception that caused the job to fail. This is the first root exception
	 * that was not recoverable and triggered job failure.
	 *
	 * @return failure exception as a string, or {@code "(null)"}
	 */
	String getFailureCauseAsString();

	/**
	 * Returns the {@link TaskManagerLocation} for this execution vertex,
	 * i.e. where the current execution attempt was deployed.
	 *
	 * @return taskmanager location for this execution vertex.
	 */
	TaskManagerLocation getCurrentAssignedResourceLocation();

	/**
	 * Returns the execution for the given attempt number.
	 *
	 * <p>NOTE: implementations may throw {@link IllegalArgumentException} if no
	 * execution with the given attempt number exists (see
	 * {@link ArchivedExecutionVertex#getPriorExecutionAttempt(int)}).
	 *
	 * @param attemptNumber attempt number of execution to be returned
	 * @return execution for the given attempt number
	 */
	AccessExecution getPriorExecutionAttempt(int attemptNumber);
}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
new file mode 100644
index 0000000..0b2992f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public class ArchivedExecution implements AccessExecution, Serializable {
+	private static final long serialVersionUID = 4817108757483345173L;
+	// --------------------------------------------------------------------------------------------
+
+	private final ExecutionAttemptID attemptId;
+
+	private final long[] stateTimestamps;
+
+	private final int attemptNumber;
+
+	private final ExecutionState state;
+
+	private final String failureCause;          // once assigned, never changes
+
+	private final TaskManagerLocation assignedResourceLocation; // for the archived execution
+
+	/* Continuously updated map of user-defined accumulators */
+	private final StringifiedAccumulatorResult[] userAccumulators;
+
+	/* Continuously updated map of internal accumulators */
+	private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> flinkAccumulators;
+	private final int parallelSubtaskIndex;
+
+	public ArchivedExecution(Execution execution) {
+		this.userAccumulators = execution.getUserAccumulatorsStringified();
+		this.flinkAccumulators = execution.getFlinkAccumulators();
+		this.attemptId = execution.getAttemptId();
+		this.attemptNumber = execution.getAttemptNumber();
+		this.stateTimestamps = execution.getStateTimestamps();
+		this.parallelSubtaskIndex = execution.getVertex().getParallelSubtaskIndex();
+		this.state = execution.getState();
+		this.failureCause = ExceptionUtils.stringifyException(execution.getFailureCause());
+		this.assignedResourceLocation = execution.getAssignedResourceLocation();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//   Accessors
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public ExecutionAttemptID getAttemptId() {
+		return attemptId;
+	}
+
+	@Override
+	public int getAttemptNumber() {
+		return attemptNumber;
+	}
+
+	@Override
+	public long[] getStateTimestamps() {
+		return stateTimestamps;
+	}
+
+	@Override
+	public ExecutionState getState() {
+		return state;
+	}
+
+	@Override
+	public TaskManagerLocation getAssignedResourceLocation() {
+		return assignedResourceLocation;
+	}
+
+	@Override
+	public String getFailureCauseAsString() {
+		return failureCause;
+	}
+
+	@Override
+	public long getStateTimestamp(ExecutionState state) {
+		return this.stateTimestamps[state.ordinal()];
+	}
+
+	@Override
+	public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
+		return userAccumulators;
+	}
+
+	@Override
+	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators() {
+		return flinkAccumulators;
+	}
+
+	@Override
+	public int getParallelSubtaskIndex() {
+		return parallelSubtaskIndex;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
new file mode 100644
index 0000000..493825a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.SerializedValue;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializable {
+	private static final long serialVersionUID = 7231383912742578428L;
+	// --------------------------------------------------------------------------------------------
+
+	/** The ID of the job this graph has been built for. */
+	private final JobID jobID;
+
+	/** The name of the original job graph. */
+	private final String jobName;
+
+	/** All job vertices that are part of this graph */
+	private final Map<JobVertexID, ArchivedExecutionJobVertex> tasks;
+
+	/** All vertices, in the order in which they were created **/
+	private final List<ArchivedExecutionJobVertex> verticesInCreationOrder;
+
+	/**
+	 * Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when
+	 * the execution graph transitioned into a certain state. The index into this array is the
+	 * ordinal of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is
+	 * at {@code stateTimestamps[RUNNING.ordinal()]}.
+	 */
+	private final long[] stateTimestamps;
+
+	// ------ Configuration of the Execution -------
+
+	// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
+
+	/** Current status of the job execution */
+	private final JobStatus state;
+
+	/**
+	 * The exception that caused the job to fail. This is set to the first root exception
+	 * that was not recoverable and triggered job failure
+	 */
+	private final String failureCause;
+
+	// ------ Fields that are only relevant for archived execution graphs ------------
+	private final String jsonPlan;
+	private final StringifiedAccumulatorResult[] archivedUserAccumulators;
+	private final ArchivedExecutionConfig archivedExecutionConfig;
+	private final boolean isStoppable;
+	private final Map<String, SerializedValue<Object>> serializedUserAccumulators;
+	private final ArchivedCheckpointStatsTracker tracker;
+
+	public ArchivedExecutionGraph(
+		JobID jobID,
+		String jobName,
+		Map<JobVertexID, ArchivedExecutionJobVertex> tasks,
+		List<ArchivedExecutionJobVertex> verticesInCreationOrder,
+		long[] stateTimestamps,
+		JobStatus state,
+		String failureCause,
+		String jsonPlan,
+		StringifiedAccumulatorResult[] archivedUserAccumulators,
+		Map<String, SerializedValue<Object>> serializedUserAccumulators,
+		ArchivedExecutionConfig executionConfig,
+		boolean isStoppable,
+		ArchivedCheckpointStatsTracker tracker
+	) {
+		this.jobID = jobID;
+		this.jobName = jobName;
+		this.tasks = tasks;
+		this.verticesInCreationOrder = verticesInCreationOrder;
+		this.stateTimestamps = stateTimestamps;
+		this.state = state;
+		this.failureCause = failureCause;
+		this.jsonPlan = jsonPlan;
+		this.archivedUserAccumulators = archivedUserAccumulators;
+		this.serializedUserAccumulators = serializedUserAccumulators;
+		this.archivedExecutionConfig = executionConfig;
+		this.isStoppable = isStoppable;
+		this.tracker = tracker;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	@Override
+	public String getJsonPlan() {
+		return jsonPlan;
+	}
+
+	@Override
+	public JobID getJobID() {
+		return jobID;
+	}
+
+	@Override
+	public String getJobName() {
+		return jobName;
+	}
+
+	@Override
+	public JobStatus getState() {
+		return state;
+	}
+
+	@Override
+	public String getFailureCauseAsString() {
+		return failureCause;
+	}
+
+	@Override
+	public ArchivedExecutionJobVertex getJobVertex(JobVertexID id) {
+		return this.tasks.get(id);
+	}
+
+	@Override
+	public Map<JobVertexID, AccessExecutionJobVertex> getAllVertices() {
+		return Collections.<JobVertexID, AccessExecutionJobVertex>unmodifiableMap(this.tasks);
+	}
+
+	@Override
+	public Iterable<ArchivedExecutionJobVertex> getVerticesTopologically() {
+		// we return a specific iterator that does not fail with concurrent modifications
+		// the list is append only, so it is safe for that
+		final int numElements = this.verticesInCreationOrder.size();
+
+		return new Iterable<ArchivedExecutionJobVertex>() {
+			@Override
+			public Iterator<ArchivedExecutionJobVertex> iterator() {
+				return new Iterator<ArchivedExecutionJobVertex>() {
+					private int pos = 0;
+
+					@Override
+					public boolean hasNext() {
+						return pos < numElements;
+					}
+
+					@Override
+					public ArchivedExecutionJobVertex next() {
+						if (hasNext()) {
+							return verticesInCreationOrder.get(pos++);
+						} else {
+							throw new NoSuchElementException();
+						}
+					}
+
+					@Override
+					public void remove() {
+						throw new UnsupportedOperationException();
+					}
+				};
+			}
+		};
+	}
+
+	@Override
+	public Iterable<ArchivedExecutionVertex> getAllExecutionVertices() {
+		return new Iterable<ArchivedExecutionVertex>() {
+			@Override
+			public Iterator<ArchivedExecutionVertex> iterator() {
+				return new AllVerticesIterator(getVerticesTopologically().iterator());
+			}
+		};
+	}
+
+	@Override
+	public long getStatusTimestamp(JobStatus status) {
+		return this.stateTimestamps[status.ordinal()];
+	}
+
+	@Override
+	public CheckpointStatsTracker getCheckpointStatsTracker() {
+		return tracker;
+	}
+
+	/**
+	 * Gets the internal flink accumulator map of maps which contains some metrics.
+	 *
+	 * @return A map of accumulators for every executed task.
+	 */
+	@Override
+	public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> getFlinkAccumulators() {
+		Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators =
+			new HashMap<>();
+
+		for (AccessExecutionVertex vertex : getAllExecutionVertices()) {
+			Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> taskAccs = vertex.getCurrentExecutionAttempt().getFlinkAccumulators();
+			flinkAccumulators.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskAccs);
+		}
+
+		return flinkAccumulators;
+	}
+
+	@Override
+	public boolean isArchived() {
+		return true;
+	}
+
+	public StringifiedAccumulatorResult[] getUserAccumulators() {
+		return archivedUserAccumulators;
+	}
+
+	public ArchivedExecutionConfig getArchivedExecutionConfig() {
+		return archivedExecutionConfig;
+	}
+
+	@Override
+	public boolean isStoppable() {
+		return isStoppable;
+	}
+
+	@Override
+	public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
+		return archivedUserAccumulators;
+	}
+
+	@Override
+	public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() {
+		return serializedUserAccumulators;
+	}
+
+	class AllVerticesIterator implements Iterator<ArchivedExecutionVertex> {
+
+		private final Iterator<ArchivedExecutionJobVertex> jobVertices;
+
+		private ArchivedExecutionVertex[] currVertices;
+
+		private int currPos;
+
+
+		public AllVerticesIterator(Iterator<ArchivedExecutionJobVertex> jobVertices) {
+			this.jobVertices = jobVertices;
+		}
+
+
+		@Override
+		public boolean hasNext() {
+			while (true) {
+				if (currVertices != null) {
+					if (currPos < currVertices.length) {
+						return true;
+					} else {
+						currVertices = null;
+					}
+				} else if (jobVertices.hasNext()) {
+					currVertices = jobVertices.next().getTaskVertices();
+					currPos = 0;
+				} else {
+					return false;
+				}
+			}
+		}
+
+		@Override
+		public ArchivedExecutionVertex next() {
+			if (hasNext()) {
+				return currVertices[currPos++];
+			} else {
+				throw new NoSuchElementException();
+			}
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
new file mode 100644
index 0000000..4857bf5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import scala.Option;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.flink.runtime.executiongraph.ExecutionJobVertex.getAggregateJobVertexState;
+
+public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Serializable {
+
+	private static final long serialVersionUID = -5768187638639437957L;
+	private final ArchivedExecutionVertex[] taskVertices;
+
+	private final JobVertexID id;
+
+	private final String name;
+
+	private final int parallelism;
+
+	private final int maxParallelism;
+
+	private final Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> aggregatedMetricAccumulators;
+	private final Option<OperatorCheckpointStats> checkpointStats;
+	private final StringifiedAccumulatorResult[] archivedUserAccumulators;
+
+	public ArchivedExecutionJobVertex(ExecutionJobVertex jobVertex) {
+		this.taskVertices = new ArchivedExecutionVertex[jobVertex.getTaskVertices().length];
+		for (int x = 0; x < taskVertices.length; x++) {
+			taskVertices[x] = jobVertex.getTaskVertices()[x].archive();
+		}
+
+		aggregatedMetricAccumulators = jobVertex.getAggregatedMetricAccumulators();
+
+		Map<String, Accumulator<?, ?>> tmpArchivedUserAccumulators = new HashMap<>();
+		for (ExecutionVertex vertex : jobVertex.getTaskVertices()) {
+			Map<String, Accumulator<?, ?>> next = vertex.getCurrentExecutionAttempt().getUserAccumulators();
+			if (next != null) {
+				AccumulatorHelper.mergeInto(tmpArchivedUserAccumulators, next);
+			}
+		}
+		archivedUserAccumulators = jobVertex.getAggregatedUserAccumulatorsStringified();
+
+		this.id = jobVertex.getJobVertexId();
+		this.name = jobVertex.getJobVertex().getName();
+		this.parallelism = jobVertex.getParallelism();
+		this.maxParallelism = jobVertex.getMaxParallelism();
+		CheckpointStatsTracker tracker = jobVertex.getGraph().getCheckpointStatsTracker();
+		checkpointStats = tracker != null
+			? tracker.getOperatorStats(this.id)
+			: Option.<OperatorCheckpointStats>empty();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//   Accessors
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public String getName() {
+		return name;
+	}
+
+	@Override
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	@Override
+	public int getMaxParallelism() {
+		return maxParallelism;
+	}
+
+	@Override
+	public JobVertexID getJobVertexId() {
+		return id;
+	}
+
+	@Override
+	public ArchivedExecutionVertex[] getTaskVertices() {
+		return taskVertices;
+	}
+
+	@Override
+	public ExecutionState getAggregateState() {
+		int[] num = new int[ExecutionState.values().length];
+		for (ArchivedExecutionVertex vertex : this.taskVertices) {
+			num[vertex.getExecutionState().ordinal()]++;
+		}
+
+		return getAggregateJobVertexState(num, parallelism);
+	}
+
+	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getAggregatedMetricAccumulators() {
+		return this.aggregatedMetricAccumulators;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Static / pre-assigned input splits
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public Option<OperatorCheckpointStats> getCheckpointStats() {
+		return checkpointStats;
+	}
+
+	@Override
+	public StringifiedAccumulatorResult[] getAggregatedUserAccumulatorsStringified() {
+		return archivedUserAccumulators;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
new file mode 100644
index 0000000..e1fb11a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializable {
+
+	private static final long serialVersionUID = -6708241535015028576L;
+	private final int subTaskIndex;
+
+	private final List<ArchivedExecution> priorExecutions;
+
+	/** The name in the format "myTask (2/7)", cached to avoid frequent string concatenations */
+	private final String taskNameWithSubtask;
+
+	private final ArchivedExecution currentExecution;    // this field must never be null
+
+	public ArchivedExecutionVertex(ExecutionVertex vertex) {
+		this.subTaskIndex = vertex.getParallelSubtaskIndex();
+		this.priorExecutions = new ArrayList<>();
+		for (Execution priorExecution : vertex.getPriorExecutions()) {
+			priorExecutions.add(priorExecution.archive());
+		}
+		this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex();
+		this.currentExecution = vertex.getCurrentExecutionAttempt().archive();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//   Accessors
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	public String getTaskNameWithSubtaskIndex() {
+		return this.taskNameWithSubtask;
+	}
+
+	@Override
+	public int getParallelSubtaskIndex() {
+		return this.subTaskIndex;
+	}
+
+	@Override
+	public ArchivedExecution getCurrentExecutionAttempt() {
+		return currentExecution;
+	}
+
+	@Override
+	public ExecutionState getExecutionState() {
+		return currentExecution.getState();
+	}
+
+	@Override
+	public long getStateTimestamp(ExecutionState state) {
+		return currentExecution.getStateTimestamp(state);
+	}
+
+	@Override
+	public String getFailureCauseAsString() {
+		return currentExecution.getFailureCauseAsString();
+	}
+
+	@Override
+	public TaskManagerLocation getCurrentAssignedResourceLocation() {
+		return currentExecution.getAssignedResourceLocation();
+	}
+
+	@Override
+	public ArchivedExecution getPriorExecutionAttempt(int attemptNumber) {
+		if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
+			return priorExecutions.get(attemptNumber);
+		} else {
+			throw new IllegalArgumentException("attempt does not exist");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index b92e3af..0b56931 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import akka.dispatch.OnComplete;
 import akka.dispatch.OnFailure;
+import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
@@ -102,7 +103,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * occasional double-checking to ensure that the state after a completed call is as expected, and trigger correcting
  * actions if it is not. Many actions are also idempotent (like canceling).
  */
-public class Execution {
+public class Execution implements AccessExecution, Archiveable<ArchivedExecution> {
 
 	private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
@@ -188,14 +189,17 @@ public class Execution {
 		return vertex;
 	}
 
+	@Override
 	public ExecutionAttemptID getAttemptId() {
 		return attemptId;
 	}
 
+	@Override
 	public int getAttemptNumber() {
 		return attemptNumber;
 	}
 
+	@Override
 	public ExecutionState getState() {
 		return state;
 	}
@@ -204,6 +208,7 @@ public class Execution {
 		return assignedResource;
 	}
 
+	@Override
 	public TaskManagerLocation getAssignedResourceLocation() {
 		return assignedResourceLocation;
 	}
@@ -212,10 +217,17 @@ public class Execution {
 		return failureCause;
 	}
 
+	@Override
+	public String getFailureCauseAsString() {
+		return ExceptionUtils.stringifyException(getFailureCause());
+	}
+
+	@Override
 	public long[] getStateTimestamps() {
 		return stateTimestamps;
 	}
 
+	@Override
 	public long getStateTimestamp(ExecutionState state) {
 		return this.stateTimestamps[state.ordinal()];
 	}
@@ -237,21 +249,6 @@ public class Execution {
 	}
 
 	/**
-	 * This method cleans fields that are irrelevant for the archived execution attempt.
-	 */
-	public void prepareForArchiving() {
-		if (assignedResource != null && assignedResource.isAlive()) {
-			throw new IllegalStateException("Cannot archive Execution while the assigned resource is still running.");
-		}
-		assignedResource = null;
-
-		executionContext = null;
-
-		partialInputChannelDeploymentDescriptors.clear();
-		partialInputChannelDeploymentDescriptors = null;
-	}
-
-	/**
 	 * Sets the initial state for the execution. The serialized state is then shipped via the
 	 * {@link TaskDeploymentDescriptor} to the TaskManagers.
 	 *
@@ -1055,14 +1052,21 @@ public class Execution {
 		return userAccumulators;
 	}
 
+	@Override
 	public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() {
 		return StringifiedAccumulatorResult.stringifyAccumulatorResults(userAccumulators);
 	}
 
+	@Override
 	public Map<AccumulatorRegistry.Metric, Accumulator<?, ?>> getFlinkAccumulators() {
 		return flinkAccumulators;
 	}
 
+	@Override
+	public int getParallelSubtaskIndex() {
+		return getVertex().getParallelSubtaskIndex();
+	}
+
 	// ------------------------------------------------------------------------
 	//  Standard utilities
 	// ------------------------------------------------------------------------
@@ -1072,4 +1076,9 @@ public class Execution {
 		return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(),
 				(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
 	}
+
+	@Override
+	public ArchivedExecution archive() {
+		return new ArchivedExecution(this);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 10f0e88..aa9406c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -32,14 +33,16 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.ArchivedCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
-import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoader;
-import org.apache.flink.runtime.executiongraph.archive.ExecutionConfigSummary;
+import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -58,8 +61,10 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.duration.FiniteDuration;
+import scala.Option;
 
 import java.io.IOException;
 import java.net.URL;
@@ -102,7 +107,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *         address the message receiver.</li>
  * </ul>
  */
-public class ExecutionGraph {
+public class ExecutionGraph implements AccessExecutionGraph, Archiveable<ArchivedExecutionGraph> {
 
 	private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
@@ -180,9 +185,6 @@ public class ExecutionGraph {
 	 * from results than need to be materialized. */
 	private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
-	/** Flag to indicate whether the Graph has been archived */
-	private boolean isArchived = false;
-
 	// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
 
 	/** Current status of the job execution */
@@ -222,9 +224,6 @@ public class ExecutionGraph {
 	// ------ Fields that are only relevant for archived execution graphs ------------
 	private String jsonPlan;
 
-	/** Serializable summary of all job config values, e.g. for web interface */
-	private ExecutionConfigSummary executionConfigSummary;
-
 	// --------------------------------------------------------------------------------------------
 	//   Constructors
 	// --------------------------------------------------------------------------------------------
@@ -304,16 +303,6 @@ public class ExecutionGraph {
 		metricGroup.gauge(RESTARTING_TIME_METRIC_NAME, new RestartTimeGauge());
 
 		this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
-
-		// create a summary of all relevant data accessed in the web interface's JobConfigHandler
-		try {
-			ExecutionConfig executionConfig = serializedConfig.deserializeValue(userClassLoader);
-			if (executionConfig != null) {
-				this.executionConfigSummary = new ExecutionConfigSummary(executionConfig);
-			}
-		} catch (IOException | ClassNotFoundException e) {
-			LOG.error("Couldn't create ExecutionConfigSummary for job {} ", jobID, e);
-		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -344,8 +333,9 @@ public class ExecutionGraph {
 		return scheduleMode;
 	}
 
+	@Override
 	public boolean isArchived() {
-		return isArchived;
+		return false;
 	}
 
 	public void enableSnapshotCheckpointing(
@@ -434,6 +424,7 @@ public class ExecutionGraph {
 		return restartStrategy;
 	}
 
+	@Override
 	public CheckpointStatsTracker getCheckpointStatsTracker() {
 		return checkpointStatsTracker;
 	}
@@ -484,6 +475,7 @@ public class ExecutionGraph {
 		this.jsonPlan = jsonPlan;
 	}
 
+	@Override
 	public String getJsonPlan() {
 		return jsonPlan;
 	}
@@ -492,14 +484,17 @@ public class ExecutionGraph {
 		return slotProvider;
 	}
 
+	@Override
 	public JobID getJobID() {
 		return jobID;
 	}
 
+	@Override
 	public String getJobName() {
 		return jobName;
 	}
 
+	@Override
 	public boolean isStoppable() {
 		return this.isStoppable;
 	}
@@ -512,6 +507,7 @@ public class ExecutionGraph {
 		return this.userClassLoader;
 	}
 
+	@Override
 	public JobStatus getState() {
 		return state;
 	}
@@ -520,14 +516,22 @@ public class ExecutionGraph {
 		return failureCause;
 	}
 
+	@Override
+	public String getFailureCauseAsString() {
+		return ExceptionUtils.stringifyException(failureCause);
+	}
+
+	@Override
 	public ExecutionJobVertex getJobVertex(JobVertexID id) {
 		return this.tasks.get(id);
 	}
 
+	@Override
 	public Map<JobVertexID, ExecutionJobVertex> getAllVertices() {
 		return Collections.unmodifiableMap(this.tasks);
 	}
 
+	@Override
 	public Iterable<ExecutionJobVertex> getVerticesTopologically() {
 		// we return a specific iterator that does not fail with concurrent modifications
 		// the list is append only, so it is safe for that
@@ -566,6 +570,7 @@ public class ExecutionGraph {
 		return Collections.unmodifiableMap(this.intermediateResults);
 	}
 
+	@Override
 	public Iterable<ExecutionVertex> getAllExecutionVertices() {
 		return new Iterable<ExecutionVertex>() {
 			@Override
@@ -575,6 +580,7 @@ public class ExecutionGraph {
 		};
 	}
 
+	@Override
 	public long getStatusTimestamp(JobStatus status) {
 		return this.stateTimestamps[status.ordinal()];
 	}
@@ -592,6 +598,7 @@ public class ExecutionGraph {
 	 * Gets the internal flink accumulator map of maps which contains some metrics.
 	 * @return A map of accumulators for every executed task.
 	 */
+	@Override
 	public Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?,?>>> getFlinkAccumulators() {
 		Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>> flinkAccumulators =
 				new HashMap<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?, ?>>>();
@@ -627,6 +634,7 @@ public class ExecutionGraph {
 	 * @return The accumulator map with serialized accumulator values.
 	 * @throws IOException
 	 */
+	@Override
 	public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
 
 		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
@@ -643,6 +651,7 @@ public class ExecutionGraph {
 	 * Returns the a stringified version of the user-defined accumulators.
 	 * @return an Array containing the StringifiedAccumulatorResult objects
 	 */
+	@Override
 	public StringifiedAccumulatorResult[] getAccumulatorResultsStringified() {
 		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
 		return StringifiedAccumulatorResult.stringifyAccumulatorResults(accumulatorMap);
@@ -926,51 +935,21 @@ public class ExecutionGraph {
 	}
 
 	/**
-	 * This method cleans fields that are irrelevant for the archived execution attempt.
+	 * Returns the serializable ArchivedExecutionConfig
+	 * @return ArchivedExecutionConfig which may be null in case of errors
 	 */
-	public void prepareForArchiving() {
-		if (!state.isGloballyTerminalState()) {
-			throw new IllegalStateException("Can only archive the job from a terminal state");
-		}
-
-		// clear the non-serializable fields
-		restartStrategy = null;
-		slotProvider = null;
-		checkpointCoordinator = null;
-		executionContext = null;
-		kvStateLocationRegistry = null;
-
-		for (ExecutionJobVertex vertex : verticesInCreationOrder) {
-			vertex.prepareForArchiving();
-		}
-
-		intermediateResults.clear();
-		currentExecutions.clear();
-		requiredJarFiles.clear();
-		requiredClasspaths.clear();
-		jobStatusListeners.clear();
-		executionListeners.clear();
-
-		if (userClassLoader instanceof FlinkUserCodeClassLoader) {
-			try {
-				// close the classloader to free space of user jars immediately
-				// otherwise we have to wait until garbage collection
-				((FlinkUserCodeClassLoader) userClassLoader).close();
-			} catch (IOException e) {
-				LOG.warn("Failed to close the user classloader for job {}", jobID, e);
+	@Override
+	public ArchivedExecutionConfig getArchivedExecutionConfig() {
+		// create a summary of all relevant data accessed in the web interface's JobConfigHandler
+		try {
+			ExecutionConfig executionConfig = getSerializedExecutionConfig().deserializeValue(userClassLoader);
+			if (executionConfig != null) {
+				return executionConfig.archive();
 			}
-		}
-		userClassLoader = null;
-
-		isArchived = true;
-	}
-
-	/**
-	 * Returns the serializable ExecutionConfigSummary
-	 * @return ExecutionConfigSummary which may be null in case of errors
-	 */
-	public ExecutionConfigSummary getExecutionConfigSummary() {
-		return executionConfigSummary;
+		} catch (IOException | ClassNotFoundException e) {
+			LOG.error("Couldn't create ArchivedExecutionConfig for job {} ", jobID, e);
+		};
+		return null;
 	}
 
 	/**
@@ -1282,4 +1261,53 @@ public class ExecutionGraph {
 			}
 		}
 	}
+
+	@Override
+	public ArchivedExecutionGraph archive() {
+		Map<JobVertexID, OperatorCheckpointStats> operatorStats = new HashMap<>();
+		Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = new HashMap<>();
+		List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = new ArrayList<>();
+		for (ExecutionJobVertex task : verticesInCreationOrder) {
+			ArchivedExecutionJobVertex archivedTask = task.archive();
+			archivedVerticesInCreationOrder.add(archivedTask);
+			archivedTasks.put(task.getJobVertexId(), archivedTask);
+			Option<OperatorCheckpointStats> statsOption = task.getCheckpointStats();
+			if (statsOption.isDefined()) {
+				operatorStats.put(task.getJobVertexId(), statsOption.get());
+			}
+		}
+
+		Option<JobCheckpointStats> jobStats;
+		if (getCheckpointStatsTracker() == null) {
+			jobStats = Option.empty();
+		} else {
+			jobStats = getCheckpointStatsTracker().getJobStats();
+		}
+
+		ArchivedCheckpointStatsTracker statsTracker = new ArchivedCheckpointStatsTracker(jobStats, operatorStats);
+
+		Map<String, SerializedValue<Object>> serializedUserAccumulators;
+		try {
+			serializedUserAccumulators = getAccumulatorsSerialized();
+		} catch (Exception e) {
+			LOG.warn("Error occurred while archiving user accumulators.", e);
+			serializedUserAccumulators = Collections.emptyMap();
+		}
+
+		return new ArchivedExecutionGraph(
+			getJobID(),
+			getJobName(),
+			archivedTasks,
+			archivedVerticesInCreationOrder,
+			stateTimestamps,
+			getState(),
+			getFailureCauseAsString(),
+			getJsonPlan(),
+			getAccumulatorResultsStringified(),
+			serializedUserAccumulators,
+			getArchivedExecutionConfig(),
+			isStoppable(),
+			statsTracker
+		);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index ead0852..e7f16a2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -28,7 +28,10 @@ import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
@@ -43,6 +46,7 @@ import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
+import scala.Option;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
@@ -51,7 +55,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-public class ExecutionJobVertex {
+public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable<ArchivedExecutionJobVertex> {
 
 	/** Use the same log for all ExecutionGraph classes */
 	private static final Logger LOG = ExecutionGraph.LOG;
@@ -197,10 +201,17 @@ public class ExecutionJobVertex {
 		return jobVertex;
 	}
 
+	@Override
+	public String getName() {
+		return getJobVertex().getName();
+	}
+
+	@Override
 	public int getParallelism() {
 		return parallelism;
 	}
 
+	@Override
 	public int getMaxParallelism() {
 		return maxParallelism;
 	}
@@ -209,10 +220,12 @@ public class ExecutionJobVertex {
 		return graph.getJobID();
 	}
 	
+	@Override
 	public JobVertexID getJobVertexId() {
 		return jobVertex.getID();
 	}
 	
+	@Override
 	public ExecutionVertex[] getTaskVertices() {
 		return taskVertices;
 	}
@@ -241,6 +254,7 @@ public class ExecutionJobVertex {
 		return numSubtasksInFinalState == parallelism;
 	}
 	
+	@Override
 	public ExecutionState getAggregateState() {
 		int[] num = new int[ExecutionState.values().length];
 		for (ExecutionVertex vertex : this.taskVertices) {
@@ -250,6 +264,16 @@ public class ExecutionJobVertex {
 		return getAggregateJobVertexState(num, parallelism);
 	}
 	
+	@Override
+	public Option<OperatorCheckpointStats> getCheckpointStats() {
+		CheckpointStatsTracker tracker = getGraph().getCheckpointStatsTracker();
+		if (tracker == null) {
+			return Option.empty();
+		} else {
+			return tracker.getOperatorStats(getJobVertexId());
+		}
+	}
+
 	//---------------------------------------------------------------------------------------------
 	
 	public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
@@ -371,36 +395,6 @@ public class ExecutionJobVertex {
 		}
 	}
 	
-	/**
-	 * This method cleans fields that are irrelevant for the archived execution attempt.
-	 */
-	public void prepareForArchiving() {
-		
-		for (ExecutionVertex vertex : taskVertices) {
-			vertex.prepareForArchiving();
-		}
-		
-		// clear intermediate results
-		inputs.clear();
-		producedDataSets = null;
-		
-		// reset shared groups
-		if (slotSharingGroup != null) {
-			slotSharingGroup.clearTaskAssignment();
-		}
-		if (coLocationGroup != null) {
-			coLocationGroup.resetConstraints();
-		}
-		
-		// reset splits and split assigner
-		splitAssigner = null;
-		if (inputSplits != null) {
-			for (int i = 0; i < inputSplits.length; i++) {
-				inputSplits[i] = null;
-			}
-		}
-	}
-	
 	//---------------------------------------------------------------------------------------------
 	//  Notifications
 	//---------------------------------------------------------------------------------------------
@@ -627,4 +621,9 @@ public class ExecutionJobVertex {
 			return ExecutionState.CREATED;
 		}
 	}
+
+	@Override
+	public ArchivedExecutionJobVertex archive() {
+		return new ArchivedExecutionJobVertex(this);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 4837803..96af91e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -42,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
@@ -72,7 +74,7 @@ import static org.apache.flink.runtime.execution.ExecutionState.FINISHED;
  * The ExecutionVertex is a parallel subtask of the execution. It may be executed once, or several times, each of
  * which time it spawns an {@link Execution}.
  */
-public class ExecutionVertex {
+public class ExecutionVertex implements AccessExecutionVertex, Archiveable<ArchivedExecutionVertex> {
 
 	private static final Logger LOG = ExecutionGraph.LOG;
 
@@ -176,6 +178,7 @@ public class ExecutionVertex {
 		return this.jobVertex.getJobVertex().getName();
 	}
 
+	@Override
 	public String getTaskNameWithSubtaskIndex() {
 		return this.taskNameWithSubtask;
 	}
@@ -188,6 +191,7 @@ public class ExecutionVertex {
 		return this.jobVertex.getMaxParallelism();
 	}
 
+	@Override
 	public int getParallelSubtaskIndex() {
 		return this.subTaskIndex;
 	}
@@ -207,18 +211,26 @@ public class ExecutionVertex {
 		return locationConstraint;
 	}
 
+	@Override
 	public Execution getCurrentExecutionAttempt() {
 		return currentExecution;
 	}
 
+	@Override
 	public ExecutionState getExecutionState() {
 		return currentExecution.getState();
 	}
 
+	@Override
 	public long getStateTimestamp(ExecutionState state) {
 		return currentExecution.getStateTimestamp(state);
 	}
 
+	@Override
+	public String getFailureCauseAsString() {
+		return ExceptionUtils.stringifyException(getFailureCause());
+	}
+
 	public Throwable getFailureCause() {
 		return currentExecution.getFailureCause();
 	}
@@ -227,10 +239,12 @@ public class ExecutionVertex {
 		return currentExecution.getAssignedResource();
 	}
 
+	@Override
 	public TaskManagerLocation getCurrentAssignedResourceLocation() {
 		return currentExecution.getAssignedResourceLocation();
 	}
 
+	@Override
 	public Execution getPriorExecutionAttempt(int attemptNumber) {
 		if (attemptNumber >= 0 && attemptNumber < priorExecutions.size()) {
 			return priorExecutions.get(attemptNumber);
@@ -240,6 +254,10 @@ public class ExecutionVertex {
 		}
 	}
 
+	List<Execution> getPriorExecutions() {
+		return priorExecutions;
+	}
+
 	public ExecutionGraph getExecutionGraph() {
 		return this.jobVertex.getGraph();
 	}
@@ -537,31 +555,6 @@ public class ExecutionVertex {
 		}
 	}
 
-	/**
-	 * This method cleans fields that are irrelevant for the archived execution attempt.
-	 */
-	public void prepareForArchiving() throws IllegalStateException {
-		Execution execution = currentExecution;
-
-		// sanity check
-		if (!execution.isFinished()) {
-			throw new IllegalStateException("Cannot archive ExecutionVertex that is not in a finished state.");
-		}
-
-		// prepare the current execution for archiving
-		execution.prepareForArchiving();
-
-		// prepare previous executions for archiving
-		for (Execution exec : priorExecutions) {
-			exec.prepareForArchiving();
-		}
-
-		// clear the unnecessary fields in this class
-		this.resultPartitions = null;
-		this.inputEdges = null;
-		this.locationConstraint = null;
-	}
-
 	public void cachePartitionInfo(PartialInputChannelDeploymentDescriptor partitionInfo){
 		getCurrentExecutionAttempt().cachePartitionInfo(partitionInfo);
 	}
@@ -708,4 +701,9 @@ public class ExecutionVertex {
 	public String toString() {
 		return getSimpleName();
 	}
+
+	@Override
+	public ArchivedExecutionVertex archive() {
+		return new ArchivedExecutionVertex(this);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
deleted file mode 100644
index ad4677f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/archive/ExecutionConfigSummary.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.executiongraph.archive;
-
-import org.apache.flink.api.common.ExecutionConfig;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.Map;
-
-/**
- * Serializable class which is created when archiving the job.
- * It can be used to display job information on the web interface
- * without having to keep the classloader around after job completion.
- */
-public class ExecutionConfigSummary implements Serializable {
-
-	private final String executionMode;
-	private final String restartStrategyDescription;
-	private final int parallelism;
-	private final boolean objectReuseEnabled;
-	private final Map<String, String> globalJobParameters;
-
-	public ExecutionConfigSummary(ExecutionConfig ec) {
-		executionMode = ec.getExecutionMode().name();
-		if (ec.getRestartStrategy() != null) {
-			restartStrategyDescription = ec.getRestartStrategy().getDescription();
-		} else {
-			restartStrategyDescription = "default";
-		}
-		parallelism = ec.getParallelism();
-		objectReuseEnabled = ec.isObjectReuseEnabled();
-		if (ec.getGlobalJobParameters() != null
-				&& ec.getGlobalJobParameters().toMap() != null) {
-			globalJobParameters = ec.getGlobalJobParameters().toMap();
-		} else {
-			globalJobParameters = Collections.emptyMap();
-		}
-	}
-
-	public String getExecutionMode() {
-		return executionMode;
-	}
-
-	public String getRestartStrategyDescription() {
-		return restartStrategyDescription;
-	}
-
-	public int getParallelism() {
-		return parallelism;
-	}
-
-	public boolean getObjectReuseEnabled() {
-		return objectReuseEnabled;
-	}
-
-	public Map<String, String> getGlobalJobParameters() {
-		return globalJobParameters;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 37a91b3..87df0d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -27,9 +27,9 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -164,7 +164,7 @@ public final class WebMonitorUtils {
 		}
 	}
 
-	public static JobDetails createDetailsForJob(ExecutionGraph job) {
+	public static JobDetails createDetailsForJob(AccessExecutionGraph job) {
 		JobStatus status = job.getState();
 
 		long started = job.getStatusTimestamp(JobStatus.CREATED);
@@ -174,11 +174,11 @@ public final class WebMonitorUtils {
 		long lastChanged = 0;
 		int numTotalTasks = 0;
 
-		for (ExecutionJobVertex ejv : job.getVerticesTopologically()) {
-			ExecutionVertex[] vertices = ejv.getTaskVertices();
+		for (AccessExecutionJobVertex ejv : job.getVerticesTopologically()) {
+			AccessExecutionVertex[] vertices = ejv.getTaskVertices();
 			numTotalTasks += vertices.length;
 
-			for (ExecutionVertex vertex : vertices) {
+			for (AccessExecutionVertex vertex : vertices) {
 				ExecutionState state = vertex.getExecutionState();
 				countsPerStatus[state.ordinal()]++;
 				lastChanged = Math.max(lastChanged, vertex.getStateTimestamp(state));

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index cca0124..8f3b82a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1685,12 +1685,9 @@ class JobManager(
           }(context.dispatcher))
 
           try {
-            eg.prepareForArchiving()
-
-            archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg))
+            archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg.archive()))
           } catch {
-            case t: Throwable => log.error(s"Could not prepare the execution graph $eg for " +
-              "archiving.", t)
+            case t: Throwable => log.error(s"Could not archive the execution graph $eg.", t)
           }
 
           futureOption

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
index 2d55b26..7f8fcd3 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/MemoryArchivist.scala
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.messages.accumulators._
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils
 import org.apache.flink.runtime.{FlinkActor, LogMessages}
 import org.apache.flink.runtime.messages.webmonitor._
-import org.apache.flink.runtime.executiongraph.ExecutionGraph
+import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, ExecutionGraph}
 import org.apache.flink.runtime.messages.ArchiveMessages._
 import org.apache.flink.runtime.messages.JobManagerMessages._
 
@@ -66,7 +66,7 @@ class MemoryArchivist(private val max_entries: Int)
    * Map of execution graphs belonging to recently started jobs with the time stamp of the last
    * received job event. The insert order is preserved through a LinkedHashMap.
    */
-  protected val graphs = mutable.LinkedHashMap[JobID, ExecutionGraph]()
+  protected val graphs = mutable.LinkedHashMap[JobID, ArchivedExecutionGraph]()
 
   /* Counters for finished, canceled, and failed jobs */
   private[this] var finishedCnt: Int = 0

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
index c4e3f3e..435b736 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/ArchiveMessages.scala
@@ -19,14 +19,14 @@
 package org.apache.flink.runtime.messages
 
 import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.executiongraph.ExecutionGraph
+import org.apache.flink.runtime.executiongraph.{ArchivedExecutionGraph, ExecutionGraph}
 
 /**
  * This object contains the archive specific messages.
  */
 object ArchiveMessages {
   
-  case class ArchiveExecutionGraph(jobID: JobID, graph: ExecutionGraph)
+  case class ArchiveExecutionGraph(jobID: JobID, graph: ArchivedExecutionGraph)
 
   /**
    * Request the currently archived jobs in the archiver. The resulting response is [[ArchivedJobs]]
@@ -44,19 +44,19 @@ object ArchiveMessages {
    */
   case class RequestArchivedJob(jobID: JobID)
 
-  case class ArchivedJob(job: Option[ExecutionGraph])
+  case class ArchivedJob(job: Option[ArchivedExecutionGraph])
 
   /**
    * Response to [[RequestArchivedJobs]] message. The response contains the archived jobs.
    * @param jobs
    */
-  case class ArchivedJobs(jobs: Iterable[ExecutionGraph]){
-    def asJavaIterable: java.lang.Iterable[ExecutionGraph] = {
+  case class ArchivedJobs(jobs: Iterable[ArchivedExecutionGraph]){
+    def asJavaIterable: java.lang.Iterable[ArchivedExecutionGraph] = {
       import scala.collection.JavaConverters._
       jobs.asJava
     }
 
-    def asJavaCollection: java.util.Collection[ExecutionGraph] = {
+    def asJavaCollection: java.util.Collection[ArchivedExecutionGraph] = {
       import scala.collection.JavaConverters._
       jobs.asJavaCollection
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 4cf6a02..3df8c26 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -26,7 +26,7 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.akka.ListeningBehaviour
 import org.apache.flink.runtime.blob.BlobKey
 import org.apache.flink.runtime.client.{JobStatusMessage, SerializedJobExecutionResult}
-import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, ExecutionAttemptID, ExecutionGraph}
 import org.apache.flink.runtime.instance.{Instance, InstanceID}
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID
 import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, JobGraph, JobStatus, JobVertexID}
@@ -371,7 +371,7 @@ object JobManagerMessages {
    * @param jobID
    * @param executionGraph
    */
-  case class JobFound(jobID: JobID, executionGraph: ExecutionGraph) extends JobResponse
+  case class JobFound(jobID: JobID, executionGraph: AccessExecutionGraph) extends JobResponse
 
   /**
    * Denotes that there is no job with [[jobID]] retrievable. This message can be the response of

http://git-wip-us.apache.org/repos/asf/flink/blob/21e8e2dc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index ea4d322..0b2f4f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -81,7 +81,7 @@ public class CoordinatorShutdownTest {
 					new JobManagerMessages.RequestJob(testGraph.getJobID()),
 					timeout);
 			
-			ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
+			ExecutionGraph graph = (ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
 			
 			assertNotNull(graph);
 			graph.waitUntilFinished();
@@ -133,7 +133,7 @@ public class CoordinatorShutdownTest {
 					new JobManagerMessages.RequestJob(testGraph.getJobID()),
 					timeout);
 
-			ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
+			ExecutionGraph graph = (ExecutionGraph)((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
 
 			assertNotNull(graph);
 			graph.waitUntilFinished();


Mime
View raw message