flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [01/16] flink git commit: [FLINK-8449] [flip6] Extend OnCompletionActions to accept a SerializableExecutionGraph
Date Fri, 26 Jan 2018 14:41:27 GMT
Repository: flink
Updated Branches:
  refs/heads/master 976d004ce -> ac8225fd5


[FLINK-8449] [flip6] Extend OnCompletionActions to accept a SerializableExecutionGraph

This commit changes the OnCompletionActions interface so that it accepts an
ArchivedExecutionGraph instead of a plain JobResult. This allows the
completed ExecutionGraph to be archived for further use in the container
component of the JobMasterRunner.

This closes #5308.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8f9dbeca
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8f9dbeca
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8f9dbeca

Branch: refs/heads/master
Commit: 8f9dbeca8bbb8f74bc17410b2f39903ea1f95af1
Parents: 976d004
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Jan 25 17:03:39 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Jan 26 13:49:46 2018 +0100

----------------------------------------------------------------------
 .../common/accumulators/AccumulatorHelper.java  | 23 ++---
 .../FailedAccumulatorSerialization.java         | 73 ++++++++++++++++
 .../FailedAccumulatorSerializationTest.java     | 89 ++++++++++++++++++++
 .../flink/runtime/dispatcher/Dispatcher.java    | 41 ++++-----
 .../entrypoint/JobClusterEntrypoint.java        | 27 ++----
 .../executiongraph/AccessExecutionGraph.java    |  7 +-
 .../executiongraph/ArchivedExecutionGraph.java  | 59 +++++++++++--
 .../flink/runtime/executiongraph/ErrorInfo.java |  3 +-
 .../runtime/executiongraph/ExecutionGraph.java  | 59 ++++---------
 .../runtime/jobmanager/OnCompletionActions.java | 15 +---
 .../runtime/jobmaster/JobManagerRunner.java     | 21 +----
 .../flink/runtime/jobmaster/JobMaster.java      | 61 +-------------
 .../flink/runtime/jobmaster/JobResult.java      | 37 ++++++++
 .../minicluster/MiniClusterJobDispatcher.java   | 20 +----
 .../rest/handler/job/JobExceptionsHandler.java  |  2 +-
 .../handler/legacy/JobExceptionsHandler.java    |  2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  5 +-
 .../runtime/dispatcher/DispatcherTest.java      | 27 +++---
 .../ArchivedExecutionGraphTest.java             |  4 +-
 .../jobmaster/JobManagerRunnerMockTest.java     | 31 +++----
 .../flink/runtime/jobmaster/JobMasterTest.java  |  8 +-
 .../legacy/JobExceptionsHandlerTest.java        |  3 +-
 .../utils/ArchivedExecutionGraphBuilder.java    |  6 +-
 23 files changed, 377 insertions(+), 246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
index 3282302..78fb68b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java
@@ -28,13 +28,16 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+/**
+ * Helper functions for the interaction with {@link Accumulator}.
+ */
 @Internal
 public class AccumulatorHelper {
 
 	/**
 	 * Merge two collections of accumulators. The second will be merged into the
 	 * first.
-	 * 
+	 *
 	 * @param target
 	 *            The collection of accumulators that will be updated
 	 * @param toMerge
@@ -59,7 +62,7 @@ public class AccumulatorHelper {
 	}
 
 	/**
-	 * Workaround method for type safety
+	 * Workaround method for type safety.
 	 */
 	private static <V, R extends Serializable> void mergeSingle(Accumulator<?, ?> target,
 															Accumulator<?, ?> toMerge) {
@@ -74,14 +77,13 @@ public class AccumulatorHelper {
 
 	/**
 	 * Compare both classes and throw {@link UnsupportedOperationException} if
-	 * they differ
+	 * they differ.
 	 */
 	@SuppressWarnings("rawtypes")
-	public static void compareAccumulatorTypes(Object name,
-												Class<? extends Accumulator> first,
-												Class<? extends Accumulator> second)
-			throws UnsupportedOperationException
-	{
+	public static void compareAccumulatorTypes(
+			Object name,
+			Class<? extends Accumulator> first,
+			Class<? extends Accumulator> second) throws UnsupportedOperationException {
 		if (first == null || second == null) {
 			throw new NullPointerException();
 		}
@@ -102,7 +104,7 @@ public class AccumulatorHelper {
 
 	/**
 	 * Transform the Map with accumulators into a Map containing only the
-	 * results
+	 * results.
 	 */
 	public static Map<String, Object> toResultMap(Map<String, Accumulator<?, ?>> accumulators) {
 		Map<String, Object> resultMap = new HashMap<String, Object>();
@@ -134,7 +136,7 @@ public class AccumulatorHelper {
 	public static Map<String, Accumulator<?, ?>> copy(Map<String, Accumulator<?, ?>> accumulators) {
 		Map<String, Accumulator<?, ?>> result = new HashMap<String, Accumulator<?, ?>>();
 
-		for(Map.Entry<String, Accumulator<?, ?>> entry: accumulators.entrySet()){
+		for (Map.Entry<String, Accumulator<?, ?>> entry: accumulators.entrySet()){
 			result.put(entry.getKey(), entry.getValue().clone());
 		}
 
@@ -172,5 +174,4 @@ public class AccumulatorHelper {
 
 		return accumulators;
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-core/src/main/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerialization.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerialization.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerialization.java
new file mode 100644
index 0000000..b208b9e
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerialization.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.accumulators;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+
+/**
+ * {@link Accumulator} implementation which indicates a serialization problem with the original
+ * accumulator. Accessing any of the {@link Accumulator} methods will result in throwing the
+ * serialization exception.
+ *
+ * @param <V> type of the value
+ * @param <R> type of the accumulator result
+ */
+public class FailedAccumulatorSerialization<V, R extends Serializable> implements Accumulator<V, R> {
+	private static final long serialVersionUID = 6965908827065879760L;
+
+	private final Throwable throwable;
+
+	public FailedAccumulatorSerialization(Throwable throwable) {
+		this.throwable = Preconditions.checkNotNull(throwable);
+	}
+
+	public Throwable getThrowable() {
+		return throwable;
+	}
+
+	@Override
+	public void add(V value) {
+		ExceptionUtils.rethrow(throwable);
+	}
+
+	@Override
+	public R getLocalValue() {
+		ExceptionUtils.rethrow(throwable);
+		return null;
+	}
+
+	@Override
+	public void resetLocal() {
+		ExceptionUtils.rethrow(throwable);
+	}
+
+	@Override
+	public void merge(Accumulator<V, R> other) {
+		ExceptionUtils.rethrow(throwable);
+	}
+
+	@Override
+	public Accumulator<V, R> clone() {
+		ExceptionUtils.rethrow(throwable);
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-core/src/test/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerializationTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerializationTest.java b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerializationTest.java
new file mode 100644
index 0000000..7335d30
--- /dev/null
+++ b/flink-core/src/test/java/org/apache/flink/api/common/accumulators/FailedAccumulatorSerializationTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.accumulators;
+
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for the {@link FailedAccumulatorSerialization}.
+ */
+public class FailedAccumulatorSerializationTest extends TestLogger {
+
+	private static final IOException TEST_EXCEPTION = new IOException("Test exception");
+
+	/**
+	 * Tests that any method call will throw the contained throwable (wrapped in an
+	 * unchecked exception if it is checked).
+	 */
+	@Test
+	public void testMethodCallThrowsException() {
+		final FailedAccumulatorSerialization<Integer, Integer> accumulator = new FailedAccumulatorSerialization<>(TEST_EXCEPTION);
+
+		try {
+			accumulator.getLocalValue();
+		} catch (RuntimeException re) {
+			assertThat(ExceptionUtils.findThrowableWithMessage(re, TEST_EXCEPTION.getMessage()).isPresent(), is(true));
+		}
+
+		try {
+			accumulator.resetLocal();
+		} catch (RuntimeException re) {
+			assertThat(ExceptionUtils.findThrowableWithMessage(re, TEST_EXCEPTION.getMessage()).isPresent(), is(true));
+		}
+
+		try {
+			accumulator.add(1);
+		} catch (RuntimeException re) {
+			assertThat(ExceptionUtils.findThrowableWithMessage(re, TEST_EXCEPTION.getMessage()).isPresent(), is(true));
+		}
+
+		try {
+			accumulator.merge(new IntMinimum());
+		} catch (RuntimeException re) {
+			assertThat(ExceptionUtils.findThrowableWithMessage(re, TEST_EXCEPTION.getMessage()).isPresent(), is(true));
+		}
+	}
+
+	/**
+	 * Tests that the class can be serialized and deserialized using Java serialization.
+	 */
+	@Test
+	public void testSerialization() throws Exception {
+		final FailedAccumulatorSerialization<?, ?> accumulator = new FailedAccumulatorSerialization<>(TEST_EXCEPTION);
+
+		final byte[] serializedAccumulator = InstantiationUtil.serializeObject(accumulator);
+
+		final FailedAccumulatorSerialization<?, ?> deserializedAccumulator = InstantiationUtil.deserializeObject(serializedAccumulator, ClassLoader.getSystemClassLoader());
+
+		assertThat(deserializedAccumulator.getThrowable(), is(instanceOf(TEST_EXCEPTION.getClass())));
+		assertThat(deserializedAccumulator.getThrowable().getMessage(), is(equalTo(TEST_EXCEPTION.getMessage())));
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index d0b1591..0271913 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
@@ -485,6 +486,19 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		fatalErrorHandler.onFatalError(throwable);
 	}
 
+	private void jobReachedGloballyTerminalState(AccessExecutionGraph accessExecutionGraph) {
+		final JobResult jobResult = JobResult.createFrom(accessExecutionGraph);
+
+		jobExecutionResultCache.put(jobResult);
+		final JobID jobId = accessExecutionGraph.getJobID();
+
+		try {
+			removeJob(jobId, true);
+		} catch (Exception e) {
+			log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
+		}
+	}
+
 	protected abstract JobManagerRunner createJobManagerRunner(
 		ResourceID resourceId,
 		JobGraph jobGraph,
@@ -607,31 +621,10 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		}
 
 		@Override
-		public void jobFinished(JobResult result) {
-			log.info("Job {} finished.", jobId);
+		public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
+			log.info("Job {} reached globally terminal state {}.", jobId, executionGraph.getState());
 
-			runAsync(() -> {
-				jobExecutionResultCache.put(result);
-				try {
-					removeJob(jobId, true);
-				} catch (Exception e) {
-					log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
-				}
-			});
-		}
-
-		@Override
-		public void jobFailed(JobResult result) {
-			log.info("Job {} failed.", jobId);
-
-			runAsync(() -> {
-				jobExecutionResultCache.put(result);
-				try {
-					removeJob(jobId, true);
-				} catch (Exception e) {
-					log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
-				}
-			});
+			runAsync(() -> Dispatcher.this.jobReachedGloballyTerminalState(executionGraph));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index ede8d13..b90253a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -25,6 +25,8 @@ import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -34,7 +36,6 @@ import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRestEndpoint;
-import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
@@ -54,7 +55,6 @@ import org.apache.flink.util.ConfigurationException;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedThrowable;
 
 import akka.actor.ActorSystem;
 
@@ -62,8 +62,6 @@ import javax.annotation.Nullable;
 
 import java.util.concurrent.Executor;
 
-import static org.apache.flink.util.Preconditions.checkArgument;
-
 /**
  * Base class for per-job cluster entry points.
  */
@@ -298,23 +296,16 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 		}
 
 		@Override
-		public void jobFinished(JobResult result) {
+		public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
 			LOG.info("Job({}) finished.", jobId);
 
-			shutDownAndTerminate(true, ApplicationStatus.SUCCEEDED, null);
-		}
-
-		@Override
-		public void jobFailed(JobResult result) {
-			checkArgument(result.getSerializedThrowable().isPresent());
-
-			final SerializedThrowable serializedThrowable = result.getSerializedThrowable().get();
-
-			final String errorMessage = serializedThrowable.getMessage();
+			final ErrorInfo errorInfo = executionGraph.getFailureInfo();
 
-			LOG.info("Job({}) failed: {}.", jobId, errorMessage);
-
-			shutDownAndTerminate(true, ApplicationStatus.FAILED, errorMessage);
+			if (errorInfo == null) {
+				shutDownAndTerminate(true, ApplicationStatus.SUCCEEDED, null);
+			} else {
+				shutDownAndTerminate(true, ApplicationStatus.FAILED, errorInfo.getExceptionAsString());
+			}
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 8d1fa1d..cc56209 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
@@ -28,7 +29,6 @@ import org.apache.flink.util.SerializedValue;
 
 import javax.annotation.Nullable;
 
-import java.io.IOException;
 import java.util.Map;
 
 /**
@@ -103,7 +103,7 @@ public interface AccessExecutionGraph {
 	Iterable<? extends AccessExecutionVertex> getAllExecutionVertices();
 
 	/**
-	 * Returns the timestamp for the given {@link JobStatus}
+	 * Returns the timestamp for the given {@link JobStatus}.
 	 *
 	 * @param status status for which the timestamp should be returned
 	 * @return timestamp for the given job status
@@ -154,9 +154,8 @@ public interface AccessExecutionGraph {
 	 * Returns a map containing the serialized values of user-defined accumulators.
 	 *
 	 * @return map containing serialized values of user-defined accumulators
-	 * @throws IOException indicates that the serialization has failed
 	 */
-	Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException;
+	Map<String, SerializedValue<Object>> getAccumulatorsSerialized();
 
 	/**
 	 * Returns whether this execution graph was archived.

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 20c2c8f..d285b20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
@@ -30,12 +31,17 @@ import org.apache.flink.util.SerializedValue;
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 
+/**
+ * An archived execution graph represents a serializable form of the {@link ExecutionGraph}.
+ */
 public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializable {
 
 	private static final long serialVersionUID = 7231383912742578428L;
@@ -47,10 +53,10 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 	/** The name of the original job graph. */
 	private final String jobName;
 
-	/** All job vertices that are part of this graph */
+	/** All job vertices that are part of this graph. */
 	private final Map<JobVertexID, ArchivedExecutionJobVertex> tasks;
 
-	/** All vertices, in the order in which they were created **/
+	/** All vertices, in the order in which they were created. **/
 	private final List<ArchivedExecutionJobVertex> verticesInCreationOrder;
 
 	/**
@@ -65,7 +71,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 
 	// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
 
-	/** Current status of the job execution */
+	/** Current status of the job execution. */
 	private final JobStatus state;
 
 	/**
@@ -142,6 +148,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 		return state;
 	}
 
+	@Nullable
 	@Override
 	public ErrorInfo getFailureInfo() {
 		return failureCause;
@@ -253,12 +260,10 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 
 		private int currPos;
 
-
 		public AllVerticesIterator(Iterator<ArchivedExecutionJobVertex> jobVertices) {
 			this.jobVertices = jobVertices;
 		}
 
-
 		@Override
 		public boolean hasNext() {
 			while (true) {
@@ -291,4 +296,48 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
 			throw new UnsupportedOperationException();
 		}
 	}
+
+	/**
+	 * Create an {@link ArchivedExecutionGraph} from the given {@link ExecutionGraph}.
+	 *
+	 * @param executionGraph to create the ArchivedExecutionGraph from
+	 * @return ArchivedExecutionGraph created from the given ExecutionGraph
+	 */
+	public static ArchivedExecutionGraph createFrom(ExecutionGraph executionGraph) {
+		final int numberVertices = executionGraph.getTotalNumberOfVertices();
+
+		Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = new HashMap<>(numberVertices);
+		List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = new ArrayList<>(numberVertices);
+
+		for (ExecutionJobVertex task : executionGraph.getVerticesTopologically()) {
+			ArchivedExecutionJobVertex archivedTask = task.archive();
+			archivedVerticesInCreationOrder.add(archivedTask);
+			archivedTasks.put(task.getJobVertexId(), archivedTask);
+		}
+
+		final Map<String, SerializedValue<Object>> serializedUserAccumulators = executionGraph.getAccumulatorsSerialized();
+
+		final long[] timestamps = new long[JobStatus.values().length];
+
+		for (JobStatus jobStatus : JobStatus.values()) {
+			final int ordinal = jobStatus.ordinal();
+			timestamps[ordinal] = executionGraph.getStatusTimestamp(jobStatus);
+		}
+
+		return new ArchivedExecutionGraph(
+			executionGraph.getJobID(),
+			executionGraph.getJobName(),
+			archivedTasks,
+			archivedVerticesInCreationOrder,
+			timestamps,
+			executionGraph.getState(),
+			executionGraph.getFailureInfo(),
+			executionGraph.getJsonPlan(),
+			executionGraph.getAccumulatorResultsStringified(),
+			serializedUserAccumulators,
+			executionGraph.getArchivedExecutionConfig(),
+			executionGraph.isStoppable(),
+			executionGraph.getCheckpointCoordinatorConfiguration(),
+			executionGraph.getCheckpointStatsSnapshot());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
index 9fe569f..311effb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ErrorInfo.java
@@ -34,7 +34,6 @@ public class ErrorInfo implements Serializable {
 	private final SerializedThrowable exception;
 
 	private final long timestamp;
-
 	public ErrorInfo(Throwable exception, long timestamp) {
 		Preconditions.checkNotNull(exception);
 		Preconditions.checkArgument(timestamp > 0);
@@ -48,7 +47,7 @@ public class ErrorInfo implements Serializable {
 	 * Returns the serialized form of the original exception.
 	 */
 	public SerializedThrowable getException() {
-		return this.exception;
+		return exception;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index d187faa..188f6da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -19,12 +19,12 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
+import org.apache.flink.api.common.accumulators.FailedAccumulatorSerialization;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
@@ -51,7 +51,6 @@ import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -62,6 +61,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateBackend;
@@ -148,7 +148,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * local failover (meaning there is a concurrent global failover), the failover strategy has to
  * yield before the global failover.
  */
-public class ExecutionGraph implements AccessExecutionGraph, Archiveable<ArchivedExecutionGraph> {
+public class ExecutionGraph implements AccessExecutionGraph {
 
 	/** In place updater for the execution graph's current state. Avoids having to use an
 	 * AtomicReference and thus makes the frequent read access a bit faster */
@@ -761,16 +761,27 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/**
 	 * Gets a serialized accumulator map.
 	 * @return The accumulator map with serialized accumulator values.
-	 * @throws IOException
 	 */
 	@Override
-	public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() throws IOException {
+	public Map<String, SerializedValue<Object>> getAccumulatorsSerialized() {
 
 		Map<String, Accumulator<?, ?>> accumulatorMap = aggregateUserAccumulators();
 
 		Map<String, SerializedValue<Object>> result = new HashMap<>(accumulatorMap.size());
 		for (Map.Entry<String, Accumulator<?, ?>> entry : accumulatorMap.entrySet()) {
-			result.put(entry.getKey(), new SerializedValue<>(entry.getValue().getLocalValue()));
+
+			try {
+				final SerializedValue<Object> serializedValue = new SerializedValue<>(entry.getValue().getLocalValue());
+				result.put(entry.getKey(), serializedValue);
+			} catch (IOException ioe) {
+				LOG.error("Could not serialize accumulator " + entry.getKey() + '.', ioe);
+
+				try {
+					result.put(entry.getKey(), new SerializedValue<>(new FailedAccumulatorSerialization(ioe)));
+				} catch (IOException e) {
+					throw new RuntimeException("It should never happen that we cannot serialize the accumulator serialization exception.", e);
+				}
+			}
 		}
 
 		return result;
@@ -1687,40 +1698,4 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			}
 		}
 	}
-
-	@Override
-	public ArchivedExecutionGraph archive() {
-		Map<JobVertexID, ArchivedExecutionJobVertex> archivedTasks = new HashMap<>(verticesInCreationOrder.size());
-		List<ArchivedExecutionJobVertex> archivedVerticesInCreationOrder = new ArrayList<>(verticesInCreationOrder.size());
-
-		for (ExecutionJobVertex task : verticesInCreationOrder) {
-			ArchivedExecutionJobVertex archivedTask = task.archive();
-			archivedVerticesInCreationOrder.add(archivedTask);
-			archivedTasks.put(task.getJobVertexId(), archivedTask);
-		}
-
-		Map<String, SerializedValue<Object>> serializedUserAccumulators;
-		try {
-			serializedUserAccumulators = getAccumulatorsSerialized();
-		} catch (Exception e) {
-			LOG.warn("Error occurred while archiving user accumulators.", e);
-			serializedUserAccumulators = Collections.emptyMap();
-		}
-
-		return new ArchivedExecutionGraph(
-			getJobID(),
-			getJobName(),
-			archivedTasks,
-			archivedVerticesInCreationOrder,
-			stateTimestamps,
-			getState(),
-			failureInfo,
-			getJsonPlan(),
-			getAccumulatorResultsStringified(),
-			serializedUserAccumulators,
-			getArchivedExecutionConfig(),
-			isStoppable(),
-			getCheckpointCoordinatorConfiguration(),
-			getCheckpointStatsSnapshot());
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
index 149ea0f..66ca4ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 
 /**
  * Interface for completion actions once a Flink job has reached
@@ -27,18 +27,11 @@ import org.apache.flink.runtime.jobmaster.JobResult;
 public interface OnCompletionActions {
 
 	/**
-	 * Job finished successfully.
+	 * Job reached a globally terminal state.
 	 *
-	 * @param result of the job execution
+	 * @param executionGraph serializable execution graph
 	 */
-	void jobFinished(JobResult result);
-
-	/**
-	 * Job failed with an exception.
-	 *
-	 * @param result The result of the job carrying the failure cause.
-	 */
-	void jobFailed(JobResult result);
+	void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph);
 
 	/**
 	 * Job was finished by another JobMaster.

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index 4833cbd..5af6c67 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
@@ -251,30 +252,14 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 	 * Job completion notification triggered by JobManager.
 	 */
 	@Override
-	public void jobFinished(JobResult result) {
+	public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
 		try {
 			unregisterJobFromHighAvailability();
 			shutdownInternally();
 		}
 		finally {
 			if (toNotifyOnComplete != null) {
-				toNotifyOnComplete.jobFinished(result);
-			}
-		}
-	}
-
-	/**
-	 * Job completion notification triggered by JobManager.
-	 */
-	@Override
-	public void jobFailed(JobResult result) {
-		try {
-			unregisterJobFromHighAvailability();
-			shutdownInternally();
-		}
-		finally {
-			if (toNotifyOnComplete != null) {
-				toNotifyOnComplete.jobFailed(result);
+				toNotifyOnComplete.jobReachedGloballyTerminalState(executionGraph);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 46518eb..ef99d52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -34,13 +34,13 @@ import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -100,8 +100,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.SerializedThrowable;
-import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 
@@ -787,7 +785,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 	@Override
 	public CompletableFuture<AccessExecutionGraph> requestArchivedExecutionGraph(Time timeout) {
-		return CompletableFuture.completedFuture(executionGraph.archive());
+		return CompletableFuture.completedFuture(ArchivedExecutionGraph.createFrom(executionGraph));
 	}
 
 	@Override
@@ -989,60 +987,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			@Nullable final Throwable error) {
 		validateRunsInMainThread();
 
-		final JobID jobID = executionGraph.getJobID();
-		final String jobName = executionGraph.getJobName();
-		final JobResult.Builder builder = new JobResult.Builder()
-			.jobId(jobID)
-			.netRuntime(0);
-
 		if (newJobStatus.isGloballyTerminalState()) {
-			switch (newJobStatus) {
-				case FINISHED:
-					try {
-						// TODO get correct job duration
-						// job done, let's get the accumulators
-						final Map<String, SerializedValue<Object>> accumulatorsSerialized = executionGraph.getAccumulatorsSerialized();
-						builder.accumulatorResults(accumulatorsSerialized);
-						executor.execute(() -> jobCompletionActions.jobFinished(builder.build()));
-					}
-					catch (Exception e) {
-						log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
-
-						final JobExecutionException exception = new JobExecutionException(jobID,
-								"Failed to retrieve accumulator results. " +
-								"The job is registered as 'FINISHED (successful), but this notification describes " +
-								"a failure, since the resulting accumulators could not be fetched.", e);
-
-						executor.execute(() -> jobCompletionActions.jobFailed(builder
-							.serializedThrowable(new SerializedThrowable(exception))
-							.build()));
-					}
-					break;
-
-				case CANCELED: {
-					final JobExecutionException exception = new JobExecutionException(
-						jobID, "Job was cancelled.", new Exception("The job was cancelled"));
-
-					executor.execute(() -> jobCompletionActions.jobFailed(builder
-						.serializedThrowable(new SerializedThrowable(exception))
-						.build()));
-					break;
-				}
-
-				case FAILED: {
-					final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
-					final JobExecutionException exception = new JobExecutionException(
-							jobID, "Job execution failed.", unpackedError);
-					executor.execute(() -> jobCompletionActions.jobFailed(builder
-						.serializedThrowable(new SerializedThrowable(exception))
-						.build()));
-					break;
-				}
-
-				default:
-					// this can happen only if the enum is buggy
-					throw new IllegalStateException(newJobStatus.toString());
-			}
+			final ArchivedExecutionGraph archivedExecutionGraph = ArchivedExecutionGraph.createFrom(executionGraph);
+			executor.execute(() -> jobCompletionActions.jobReachedGloballyTerminalState(archivedExecutionGraph));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
index 4a409d5..5a5e713 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.SerializedValue;
 
@@ -138,4 +141,38 @@ public class JobResult implements Serializable {
 		}
 	}
 
+	/**
+	 * Creates the {@link JobResult} from the given {@link AccessExecutionGraph} which
+	 * must be in a globally terminal state.
+	 *
+	 * @param accessExecutionGraph to create the JobResult from
+	 * @return JobResult of the given AccessExecutionGraph
+	 */
+	public static JobResult createFrom(AccessExecutionGraph accessExecutionGraph) {
+		final JobID jobId = accessExecutionGraph.getJobID();
+		final JobStatus jobStatus = accessExecutionGraph.getState();
+
+		checkArgument(
+			jobStatus.isGloballyTerminalState(),
+			"The job " + accessExecutionGraph.getJobName() + '(' + jobId + ") is not in a globally " +
+				"terminal state. It is in state " + jobStatus + '.');
+
+		final JobResult.Builder builder = new JobResult.Builder();
+		builder.jobId(jobId);
+
+		final long netRuntime = accessExecutionGraph.getStatusTimestamp(jobStatus) - accessExecutionGraph.getStatusTimestamp(JobStatus.CREATED);
+		builder.netRuntime(netRuntime);
+		builder.accumulatorResults(accessExecutionGraph.getAccumulatorsSerialized());
+
+		if (jobStatus != JobStatus.FINISHED) {
+			final ErrorInfo errorInfo = accessExecutionGraph.getFailureInfo();
+
+			if (errorInfo != null) {
+				builder.serializedThrowable(errorInfo.getException());
+			}
+		}
+
+		return builder.build();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index b9d76da..7e49cef 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -361,12 +362,7 @@ public class MiniClusterJobDispatcher {
 		}
 
 		@Override
-		public void jobFinished(JobResult result) {
-			decrementCheckAndCleanup();
-		}
-
-		@Override
-		public void jobFailed(JobResult result) {
+		public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
 			decrementCheckAndCleanup();
 		}
 
@@ -414,16 +410,8 @@ public class MiniClusterJobDispatcher {
 		}
 
 		@Override
-		public void jobFinished(JobResult result) {
-			this.result = result;
-			jobMastersToWaitFor.countDown();
-		}
-
-		@Override
-		public void jobFailed(JobResult result) {
-			checkArgument(result.getSerializedThrowable().isPresent());
-
-			this.result = result;
+		public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
+			this.result = JobResult.createFrom(executionGraph);
 			jobMastersToWaitFor.countDown();
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 70b9d35..63dc604 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -71,7 +71,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExcep
 		ErrorInfo rootException = executionGraph.getFailureInfo();
 		String rootExceptionMessage = null;
 		Long rootTimestamp = null;
-		if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+		if (rootException != null) {
 			rootExceptionMessage = rootException.getExceptionAsString();
 			rootTimestamp = rootException.getTimestamp();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
index a6bae86..7b1487c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
@@ -93,7 +93,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 
 		// most important is the root failure cause
 		ErrorInfo rootException = graph.getFailureInfo();
-		if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+		if (rootException != null) {
 			gen.writeStringField("root-exception", rootException.getExceptionAsString());
 			gen.writeNumberField("timestamp", rootException.getTimestamp());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 325e955..b795609 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1727,7 +1727,10 @@ class JobManager(
           }(context.dispatcher))
 
           try {
-            archive ! decorateMessage(ArchiveExecutionGraph(jobID, eg.archive()))
+            archive ! decorateMessage(
+              ArchiveExecutionGraph(
+                jobID,
+                ArchivedExecutionGraph.createFrom(eg)))
           } catch {
             case t: Throwable => log.warn(s"Could not archive the execution graph $eg.", t)
           }

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index b5fcd18..76a3c98 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -26,11 +26,14 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
@@ -45,6 +48,7 @@ import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
@@ -54,7 +58,6 @@ import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -270,11 +273,13 @@ public class DispatcherTest extends TestLogger {
 		final JobID failedJobId = new JobID();
 		onCompletionActions = dispatcher.new DispatcherOnCompleteActions(failedJobId);
 
-		onCompletionActions.jobFailed(new JobResult.Builder()
-			.jobId(failedJobId)
-			.serializedThrowable(new SerializedThrowable(new RuntimeException("expected")))
-			.netRuntime(Long.MAX_VALUE)
-			.build());
+		final ArchivedExecutionGraph failedExecutionGraph = new ArchivedExecutionGraphBuilder()
+			.setJobID(failedJobId)
+			.setState(JobStatus.FAILED)
+			.setFailureCause(new ErrorInfo(new RuntimeException("expected"), 1L))
+			.build();
+
+		onCompletionActions.jobReachedGloballyTerminalState(failedExecutionGraph);
 
 		assertThat(
 			dispatcherGateway.isJobExecutionResultPresent(failedJobId, TIMEOUT).get(),
@@ -288,10 +293,12 @@ public class DispatcherTest extends TestLogger {
 		final JobID successJobId = new JobID();
 		onCompletionActions = dispatcher.new DispatcherOnCompleteActions(successJobId);
 
-		onCompletionActions.jobFinished(new JobResult.Builder()
-			.jobId(successJobId)
-			.netRuntime(Long.MAX_VALUE)
-			.build());
+		final ArchivedExecutionGraph succeededExecutionGraph = new ArchivedExecutionGraphBuilder()
+			.setJobID(successJobId)
+			.setState(JobStatus.FINISHED)
+			.build();
+
+		onCompletionActions.jobReachedGloballyTerminalState(succeededExecutionGraph);
 
 		assertThat(
 			dispatcherGateway.isJobExecutionResultPresent(successJobId, TIMEOUT).get(),

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 5635763..8bc5170 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -142,14 +142,14 @@ public class ArchivedExecutionGraphTest extends TestLogger {
 
 	@Test
 	public void testArchive() throws IOException, ClassNotFoundException {
-		ArchivedExecutionGraph archivedGraph = runtimeGraph.archive();
+		ArchivedExecutionGraph archivedGraph = ArchivedExecutionGraph.createFrom(runtimeGraph);
 
 		compareExecutionGraph(runtimeGraph, archivedGraph);
 	}
 
 	@Test
 	public void testSerialization() throws IOException, ClassNotFoundException {
-		ArchivedExecutionGraph archivedGraph = runtimeGraph.archive();
+		ArchivedExecutionGraph archivedGraph = ArchivedExecutionGraph.createFrom(runtimeGraph);
 
 		verifySerializability(archivedGraph);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index 245ea27..a69235a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -24,20 +24,23 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.BlobStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.testutils.category.Flip6;
-import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -176,7 +179,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
 		assertTrue(!jobCompletion.isJobFinished());
 
 		// runner been told by JobManager that job is finished
-		runner.jobFinished(mock(JobResult.class));
+		runner.jobReachedGloballyTerminalState(mock(ArchivedExecutionGraph.class));
 
 		assertTrue(jobCompletion.isJobFinished());
 		assertFalse(jobCompletion.isJobFinishedByOther());
@@ -195,11 +198,13 @@ public class JobManagerRunnerMockTest extends TestLogger {
 		verify(jobManager).start(eq(jobMasterId), any(Time.class));
 		assertTrue(!jobCompletion.isJobFinished());
 
+		final ArchivedExecutionGraph failedExecutionGraph = new ArchivedExecutionGraphBuilder()
+			.setFailureCause(new ErrorInfo(new Exception("failed manually"), 1L))
+			.setState(JobStatus.FAILED)
+			.build();
+
 		// runner been told by JobManager that job is failed
-		runner.jobFailed(new JobResult.Builder()
-			.jobId(new JobID())
-			.serializedThrowable(new SerializedThrowable(new Exception("failed manually")))
-			.build());
+		runner.jobReachedGloballyTerminalState(failedExecutionGraph);
 
 		assertTrue(jobCompletion.isJobFailed());
 		verify(leaderElectionService).stop();
@@ -250,19 +255,15 @@ public class JobManagerRunnerMockTest extends TestLogger {
 		private volatile boolean finishedByOther;
 
 		@Override
-		public void jobFinished(JobResult result) {
+		public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
 			checkArgument(!isJobFinished(), "job finished already");
 			checkArgument(!isJobFailed(), "job failed already");
 
-			this.result = result;
-		}
-
-		@Override
-		public void jobFailed(JobResult result) {
-			checkArgument(!isJobFinished(), "job finished already");
-			checkArgument(!isJobFailed(), "job failed already");
+			this.result = JobResult.createFrom(executionGraph);
 
-			this.failedCause = result.getSerializedThrowable().get();
+			if (!result.isSuccess()) {
+				failedCause = result.getSerializedThrowable().get();
+			}
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index d77a1d4..d20e1fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
@@ -265,12 +266,7 @@ public class JobMasterTest extends TestLogger {
 	private static final class NoOpOnCompletionActions implements OnCompletionActions {
 
 		@Override
-		public void jobFinished(final JobResult result) {
-
-		}
-
-		@Override
-		public void jobFailed(final JobResult result) {
+		public void jobReachedGloballyTerminalState(ArchivedExecutionGraph executionGraph) {
 
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
index 0e96f36..7381a59 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.TestLogger;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
@@ -42,7 +43,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for the JobExceptionsHandler.
  */
-public class JobExceptionsHandlerTest {
+public class JobExceptionsHandlerTest extends TestLogger {
 
 	@Test
 	public void testArchiver() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/8f9dbeca/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
index 68077ba..ee7ceda 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
@@ -117,9 +117,13 @@ public class ArchivedExecutionGraphBuilder {
 	}
 
 	public ArchivedExecutionGraph build() {
-		Preconditions.checkNotNull(tasks, "Tasks must not be null.");
 		JobID jobID = this.jobID != null ? this.jobID : new JobID();
 		String jobName = this.jobName != null ? this.jobName : "job_" + RANDOM.nextInt();
+
+		if (tasks == null) {
+			tasks = Collections.emptyMap();
+		}
+
 		return new ArchivedExecutionGraph(
 			jobID,
 			jobName,


Mime
View raw message