flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/4] flink git commit: [FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher
Date Thu, 11 Jan 2018 11:18:46 GMT
Repository: flink
Updated Branches:
  refs/heads/master 6c078c0e5 -> 2ee4d06ac


[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the information in
  the already existing JobExecutionResult.
- Always cache a JobExecutionResult, even in case of job failures. In case of
  job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached
  JobExecutionResults.

[FLINK-8234][flip6] Rename JobExecutionResult -> JobResult

[FLINK-8234][flip6] Update MiniClusterJobDispatcher

Do not store job failure exception in a separate field because the JobResult
already contains the exception.

[FLINK-8234][flip6] Make JobResult Serializable

[FLINK-8234][flip6] Add Javadoc to JobResult builder

[FLINK-8234][flip6] Add Javadoc to JobResult#serializedThrowable

[FLINK-8234][flip6] Wrap JobResults in SoftReferences

Wrap instances of JobResult stored in JobExecutionResultCache in SoftReferences
so that the GC can free them according to memory demand.

[FLINK-8234][flip6] Fix checkstyle violations

[FLINK-8234][flip6] Add Javadoc to JobResult

This closes #5184.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/08e550c9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/08e550c9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/08e550c9

Branch: refs/heads/master
Commit: 08e550c98674738ab883ea84c0350093c9765ab6
Parents: 8353123
Author: gyao <gary@data-artisans.com>
Authored: Tue Dec 19 18:58:53 2017 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Jan 11 12:12:50 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java    |  71 +++++++---
 .../dispatcher/JobExecutionResultCache.java     |  92 ++++++++++++
 .../entrypoint/JobClusterEntrypoint.java        |  12 +-
 .../runtime/jobmanager/OnCompletionActions.java |  10 +-
 .../runtime/jobmaster/JobManagerRunner.java     |   7 +-
 .../flink/runtime/jobmaster/JobMaster.java      |  24 ++--
 .../flink/runtime/jobmaster/JobResult.java      | 141 +++++++++++++++++++
 .../JobExecutionResultGoneException.java        |  36 +++++
 .../minicluster/MiniClusterJobDispatcher.java   |  41 ++++--
 .../runtime/webmonitor/RestfulGateway.java      |  45 ++++++
 .../runtime/dispatcher/DispatcherTest.java      |  67 +++++++++
 .../dispatcher/JobExecutionResultCacheTest.java |  93 ++++++++++++
 .../jobmaster/JobManagerRunnerMockTest.java     |  29 ++--
 .../flink/runtime/jobmaster/JobMasterTest.java  |   5 +-
 .../flink/runtime/jobmaster/JobResultTest.java  |  72 ++++++++++
 15 files changed, 680 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index ea3a6ad..299b315 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -39,10 +38,12 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.JobExecutionResultGoneException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -60,6 +61,7 @@ import org.apache.flink.util.Preconditions;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.lang.ref.SoftReference;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -97,6 +99,8 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 
 	private final LeaderElectionService leaderElectionService;
 
+	private final JobExecutionResultCache jobExecutionResultCache = new JobExecutionResultCache();
+
 	@Nullable
 	protected final String restAddress;
 
@@ -357,6 +361,36 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		return CompletableFuture.completedFuture(jobManagerServices.blobServer.getPort());
 	}
 
+	@Override
+	public CompletableFuture<JobResult> getJobExecutionResult(
+			final JobID jobId,
+			final Time timeout) {
+
+		final SoftReference<JobResult> jobResultRef = jobExecutionResultCache.get(jobId);
+		if (jobResultRef == null) {
+			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+		} else {
+			final JobResult jobResult = jobResultRef.get();
+			if (jobResult == null) {
+				return FutureUtils.completedExceptionally(new JobExecutionResultGoneException(jobId));
+			} else {
+				return CompletableFuture.completedFuture(jobResult);
+			}
+		}
+	}
+
+	@Override
+	public CompletableFuture<Boolean> isJobExecutionResultPresent(
+			final JobID jobId,
+			final Time timeout) {
+
+		final boolean jobExecutionResultPresent = jobExecutionResultCache.contains(jobId);
+		if (!jobManagerRunners.containsKey(jobId) && !jobExecutionResultPresent) {
+			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+		}
+		return CompletableFuture.completedFuture(jobExecutionResultPresent);
+	}
+
 	/**
 	 * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
 	 * the data will also be removed from HA.
@@ -549,38 +583,41 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	// Utility classes
 	//------------------------------------------------------
 
-	private class DispatcherOnCompleteActions implements OnCompletionActions {
+	@VisibleForTesting
+	class DispatcherOnCompleteActions implements OnCompletionActions {
 
 		private final JobID jobId;
 
-		private DispatcherOnCompleteActions(JobID jobId) {
+		DispatcherOnCompleteActions(JobID jobId) {
 			this.jobId = Preconditions.checkNotNull(jobId);
 		}
 
 		@Override
-		public void jobFinished(JobExecutionResult result) {
+		public void jobFinished(JobResult result) {
 			log.info("Job {} finished.", jobId);
 
 			runAsync(() -> {
-					try {
-						removeJob(jobId, true);
-					} catch (Exception e) {
-						log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
-					}
-				});
+				jobExecutionResultCache.put(result);
+				try {
+					removeJob(jobId, true);
+				} catch (Exception e) {
+					log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
+				}
+			});
 		}
 
 		@Override
-		public void jobFailed(Throwable cause) {
+		public void jobFailed(JobResult result) {
 			log.info("Job {} failed.", jobId);
 
 			runAsync(() -> {
-					try {
-						removeJob(jobId, true);
-					} catch (Exception e) {
-						log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
-					}
-				});
+				jobExecutionResultCache.put(result);
+				try {
+					removeJob(jobId, true);
+				} catch (Exception e) {
+					log.warn("Could not properly remove job {} from the dispatcher.", jobId, e);
+				}
+			});
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
new file mode 100644
index 0000000..6d3dc55
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCache.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+import java.lang.ref.SoftReference;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Caches {@link JobResult}s by their job id.
+ *
+ * <p>Entries are cached for a finite time. However, the JobResults are wrapped in
+ * {@link SoftReference}s so that the GC can free them according to memory demand.
+ */
+class JobExecutionResultCache {
+
+	private static final int MAX_RESULT_CACHE_DURATION_SECONDS = 300;
+
+	private final Cache<JobID, SoftReference<JobResult>>
+		jobExecutionResultCache =
+		CacheBuilder.newBuilder()
+			.expireAfterWrite(MAX_RESULT_CACHE_DURATION_SECONDS, TimeUnit.SECONDS)
+			.build();
+
+	/**
+	 * Adds a {@link JobResult} to the cache.
+	 *
+	 * @param result The entry to be added to the cache.
+	 */
+	public void put(final JobResult result) {
+		assertJobExecutionResultNotCached(result.getJobId());
+		jobExecutionResultCache.put(result.getJobId(), new SoftReference<>(result));
+	}
+
+	/**
+	 * Returns {@code true} if the cache contains a {@link JobResult} for the specified
+	 * {@link JobID}.
+	 *
+	 * @param jobId The job id for which the presence of the {@link JobResult} should be tested.
+	 * @return {@code true} if the cache contains an entry, {@code false} otherwise
+	 */
+	public boolean contains(final JobID jobId) {
+		return jobExecutionResultCache.getIfPresent(jobId) != null;
+	}
+
+	/**
+	 * Returns a {@link SoftReference} to the {@link JobResult} for the specified job, and removes
+	 * the entry from the cache.
+	 *
+	 * @param jobId The job id of the {@link JobResult}.
+	 * @return A {@link SoftReference} to the {@link JobResult} for the job, or {@code null} if the
+	 * entry cannot be found in the cache.
+	 */
+	@Nullable
+	public SoftReference<JobResult> get(final JobID jobId) {
+		final SoftReference<JobResult> jobResultRef = jobExecutionResultCache.getIfPresent(jobId);
+		jobExecutionResultCache.invalidate(jobId);
+		return jobResultRef;
+	}
+
+	private void assertJobExecutionResultNotCached(final JobID jobId) {
+		checkState(
+			jobExecutionResultCache.getIfPresent(jobId) == null,
+			"jobExecutionResultCache already contained entry for job %s", jobId);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
index 10ec659..21a7ba4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/JobClusterEntrypoint.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.entrypoint;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -34,6 +33,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRestEndpoint;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
@@ -60,6 +60,8 @@ import javax.annotation.Nullable;
 
 import java.util.concurrent.Executor;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * Base class for per-job cluster entry points.
  */
@@ -287,15 +289,17 @@ public abstract class JobClusterEntrypoint extends ClusterEntrypoint {
 		}
 
 		@Override
-		public void jobFinished(JobExecutionResult result) {
+		public void jobFinished(JobResult result) {
 			LOG.info("Job({}) finished.", jobId);
 
 			shutDownAndTerminate(true);
 		}
 
 		@Override
-		public void jobFailed(Throwable cause) {
-			LOG.info("Job({}) failed.", jobId, cause);
+		public void jobFailed(JobResult result) {
+			checkArgument(result.getSerializedThrowable().isPresent());
+
+			LOG.info("Job({}) failed.", jobId, result.getSerializedThrowable().get().getMessage());
 
 			shutDownAndTerminate(false);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
index 17167f2..149ea0f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/OnCompletionActions.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.jobmanager;
 
-import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.runtime.jobmaster.JobResult;
 
 /**
  * Interface for completion actions once a Flink job has reached
@@ -31,14 +31,14 @@ public interface OnCompletionActions {
 	 *
 	 * @param result of the job execution
 	 */
-	void jobFinished(JobExecutionResult result);
+	void jobFinished(JobResult result);
 
 	/**
-	 * Job failed with the given exception.
+	 * Job failed with an exception.
 	 *
-	 * @param cause of the job failure
+	 * @param result The result of the job carrying the failure cause.
 	 */
-	void jobFailed(Throwable cause);
+	void jobFailed(JobResult result);
 
 	/**
 	 * Job was finished by another JobMaster.

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index e699d6d..4833cbd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -252,7 +251,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 	 * Job completion notification triggered by JobManager.
 	 */
 	@Override
-	public void jobFinished(JobExecutionResult result) {
+	public void jobFinished(JobResult result) {
 		try {
 			unregisterJobFromHighAvailability();
 			shutdownInternally();
@@ -268,14 +267,14 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 	 * Job completion notification triggered by JobManager.
 	 */
 	@Override
-	public void jobFailed(Throwable cause) {
+	public void jobFailed(JobResult result) {
 		try {
 			unregisterJobFromHighAvailability();
 			shutdownInternally();
 		}
 		finally {
 			if (toNotifyOnComplete != null) {
-				toNotifyOnComplete.jobFailed(cause);
+				toNotifyOnComplete.jobFailed(result);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index b81a8c8..1de8e19 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
@@ -101,6 +100,7 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
 
 import org.slf4j.Logger;
 
@@ -970,6 +970,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 
 		final JobID jobID = executionGraph.getJobID();
 		final String jobName = executionGraph.getJobName();
+		final JobResult.Builder builder = new JobResult.Builder()
+			.jobId(jobID)
+			.netRuntime(0);
 
 		if (newJobStatus.isGloballyTerminalState()) {
 			switch (newJobStatus) {
@@ -977,10 +980,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 					try {
 						// TODO get correct job duration
 						// job done, let's get the accumulators
-						Map<String, Object> accumulatorResults = executionGraph.getAccumulators();
-						JobExecutionResult result = new JobExecutionResult(jobID, 0L, accumulatorResults);
-
-						executor.execute(() -> jobCompletionActions.jobFinished(result));
+						final Map<String, SerializedValue<Object>> accumulatorsSerialized = executionGraph.getAccumulatorsSerialized();
+						builder.accumulatorResults(accumulatorsSerialized);
+						executor.execute(() -> jobCompletionActions.jobFinished(builder.build()));
 					}
 					catch (Exception e) {
 						log.error("Cannot fetch final accumulators for job {} ({})", jobName, jobID, e);
@@ -990,7 +992,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 								"The job is registered as 'FINISHED (successful), but this notification describes " +
 								"a failure, since the resulting accumulators could not be fetched.", e);
 
-						executor.execute(() ->jobCompletionActions.jobFailed(exception));
+						executor.execute(() -> jobCompletionActions.jobFailed(builder
+							.serializedThrowable(new SerializedThrowable(exception))
+							.build()));
 					}
 					break;
 
@@ -998,7 +1002,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 					final JobExecutionException exception = new JobExecutionException(
 						jobID, "Job was cancelled.", new Exception("The job was cancelled"));
 
-					executor.execute(() -> jobCompletionActions.jobFailed(exception));
+					executor.execute(() -> jobCompletionActions.jobFailed(builder
+						.serializedThrowable(new SerializedThrowable(exception))
+						.build()));
 					break;
 				}
 
@@ -1006,7 +1012,9 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 					final Throwable unpackedError = SerializedThrowable.get(error, userCodeLoader);
 					final JobExecutionException exception = new JobExecutionException(
 							jobID, "Job execution failed.", unpackedError);
-					executor.execute(() -> jobCompletionActions.jobFailed(exception));
+					executor.execute(() -> jobCompletionActions.jobFailed(builder
+						.serializedThrowable(new SerializedThrowable(exception))
+						.build()));
 					break;
 				}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
new file mode 100644
index 0000000..4a409d5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobResult.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.SerializedValue;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * Similar to {@link org.apache.flink.api.common.JobExecutionResult} but with an optional
+ * {@link SerializedThrowable} when the job failed.
+ *
+ * <p>This is used by the {@link JobMaster} to send the results to the {@link Dispatcher}.
+ */
+public class JobResult implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	private final JobID jobId;
+
+	private final Map<String, SerializedValue<Object>> accumulatorResults;
+
+	private final long netRuntime;
+
+	/** Stores the cause of the job failure, or {@code null} if the job finished successfully. */
+	@Nullable
+	private final SerializedThrowable serializedThrowable;
+
+	private JobResult(
+			final JobID jobId,
+			final Map<String, SerializedValue<Object>> accumulatorResults,
+			final long netRuntime,
+			@Nullable final SerializedThrowable serializedThrowable) {
+
+		checkArgument(netRuntime >= 0, "netRuntime must be greater than or equals 0");
+
+		this.jobId = requireNonNull(jobId);
+		this.accumulatorResults = requireNonNull(accumulatorResults);
+		this.netRuntime = netRuntime;
+		this.serializedThrowable = serializedThrowable;
+	}
+
+	/**
+	 * Returns {@code true} if the job finished successfully.
+	 */
+	public boolean isSuccess() {
+		return serializedThrowable == null;
+	}
+
+	public JobID getJobId() {
+		return jobId;
+	}
+
+	public Map<String, SerializedValue<Object>> getAccumulatorResults() {
+		return accumulatorResults;
+	}
+
+	public long getNetRuntime() {
+		return netRuntime;
+	}
+
+	/**
+	 * Returns an empty {@code Optional} if the job finished successfully, otherwise the
+	 * {@code Optional} will carry the failure cause.
+	 */
+	public Optional<SerializedThrowable> getSerializedThrowable() {
+		return Optional.ofNullable(serializedThrowable);
+	}
+
+	/**
+	 * Builder for {@link JobResult}.
+	 */
+	@Internal
+	public static class Builder {
+
+		private JobID jobId;
+
+		private Map<String, SerializedValue<Object>> accumulatorResults;
+
+		private long netRuntime = -1;
+
+		private SerializedThrowable serializedThrowable;
+
+		public Builder jobId(final JobID jobId) {
+			this.jobId = jobId;
+			return this;
+		}
+
+		public Builder accumulatorResults(final Map<String, SerializedValue<Object>> accumulatorResults) {
+			this.accumulatorResults = accumulatorResults;
+			return this;
+		}
+
+		public Builder netRuntime(final long netRuntime) {
+			this.netRuntime = netRuntime;
+			return this;
+		}
+
+		public Builder serializedThrowable(final SerializedThrowable serializedThrowable) {
+			this.serializedThrowable = serializedThrowable;
+			return this;
+		}
+
+		public JobResult build() {
+			return new JobResult(
+				jobId,
+				accumulatorResults == null ? Collections.emptyMap() : accumulatorResults,
+				netRuntime,
+				serializedThrowable);
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultGoneException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultGoneException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultGoneException.java
new file mode 100644
index 0000000..d73b3a5
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/JobExecutionResultGoneException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.FlinkException;
+
+/**
+ * Exception indicating that the required {@link org.apache.flink.runtime.jobmaster.JobResult} was
+ * garbage collected.
+ * @see org.apache.flink.runtime.dispatcher.JobExecutionResultCache
+ */
+public class JobExecutionResultGoneException extends FlinkException {
+
+	private static final long serialVersionUID = 1L;
+
+	public JobExecutionResultGoneException(JobID jobId) {
+		super(String.format("Job execution result for job [%s] is gone.", jobId));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 3a1474d..b9d76da 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.minicluster;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobExecutionException;
@@ -30,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -39,6 +41,7 @@ import org.apache.flink.util.FlinkException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -358,12 +361,12 @@ public class MiniClusterJobDispatcher {
 		}
 
 		@Override
-		public void jobFinished(JobExecutionResult result) {
+		public void jobFinished(JobResult result) {
 			decrementCheckAndCleanup();
 		}
 
 		@Override
-		public void jobFailed(Throwable cause) {
+		public void jobFailed(JobResult result) {
 			decrementCheckAndCleanup();
 		}
 
@@ -401,11 +404,9 @@ public class MiniClusterJobDispatcher {
 
 		private final CountDownLatch jobMastersToWaitFor;
 
-		private volatile Throwable jobException;
-
 		private volatile Throwable runnerException;
 
-		private volatile JobExecutionResult result;
+		private volatile JobResult result;
 		
 		BlockingJobSync(JobID jobId, int numJobMastersToWaitFor) {
 			this.jobId = jobId;
@@ -413,14 +414,16 @@ public class MiniClusterJobDispatcher {
 		}
 
 		@Override
-		public void jobFinished(JobExecutionResult jobResult) {
-			this.result = jobResult;
+		public void jobFinished(JobResult result) {
+			this.result = result;
 			jobMastersToWaitFor.countDown();
 		}
 
 		@Override
-		public void jobFailed(Throwable cause) {
-			jobException = cause;
+		public void jobFailed(JobResult result) {
+			checkArgument(result.getSerializedThrowable().isPresent());
+
+			this.result = result;
 			jobMastersToWaitFor.countDown();
 		}
 
@@ -439,9 +442,8 @@ public class MiniClusterJobDispatcher {
 		public JobExecutionResult getResult() throws JobExecutionException, InterruptedException {
 			jobMastersToWaitFor.await();
 
-			final Throwable jobFailureCause = this.jobException;
 			final Throwable runnerException = this.runnerException;
-			final JobExecutionResult result = this.result;
+			final JobResult result = this.result;
 
 			// (1) we check if the job terminated with an exception
 			// (2) we check whether the job completed successfully
@@ -449,7 +451,11 @@ public class MiniClusterJobDispatcher {
 			//     completed successfully in that case, if multiple JobMasters were running
 			//     and other took over. only if all encounter a fatal error, the job cannot finish
 
-			if (jobFailureCause != null) {
+			if (result != null && !result.isSuccess()) {
+				checkState(result.getSerializedThrowable().isPresent());
+				final Throwable jobFailureCause = result.getSerializedThrowable()
+					.get()
+					.deserializeError(ClassLoader.getSystemClassLoader());
 				if (jobFailureCause instanceof JobExecutionException) {
 					throw (JobExecutionException) jobFailureCause;
 				}
@@ -458,7 +464,16 @@ public class MiniClusterJobDispatcher {
 				}
 			}
 			else if (result != null) {
-				return result;
+				try {
+					return new JobExecutionResult(
+						jobId,
+						result.getNetRuntime(),
+						AccumulatorHelper.deserializeAccumulators(
+							result.getAccumulatorResults(),
+							ClassLoader.getSystemClassLoader()));
+				} catch (final IOException | ClassNotFoundException e) {
+					throw new JobExecutionException(result.getJobId(), e);
+				}
 			}
 			else if (runnerException != null) {
 				throw new JobExecutionException(jobId,

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 61dca3b..0f26d3b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -23,7 +23,9 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.messages.JobExecutionResultGoneException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.dump.MetricQueryService;
@@ -92,4 +94,47 @@ public interface RestfulGateway extends RpcGateway {
 	 * @return Future containing the collection of instance ids and the corresponding metric query service path
 	 */
 	CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);
+
+	/**
+	 * Returns the {@link JobResult} for a job. If the job failed, it contains the failure cause.
+	 *
+	 * @param jobId ID of the job that we are interested in.
+	 * @param timeout Timeout for the asynchronous operation.
+	 *
+	 * @see #isJobExecutionResultPresent(JobID, Time)
+	 *
+	 * @return CompletableFuture containing the {@link JobResult}. The future is completed
+	 * exceptionally with:
+	 * <ul>
+	 * 	<li>{@link FlinkJobNotFoundException} if there is no result, or if the result has
+	 * 	expired
+	 * 	<li>{@link JobExecutionResultGoneException} if the result was removed due to memory demand.
+	 * </ul>
+	 */
+	default CompletableFuture<JobResult> getJobExecutionResult(
+			JobID jobId,
+			@RpcTimeout Time timeout) {
+		throw new UnsupportedOperationException();
+	}
+
+	/**
+	 * Tests if the {@link JobResult} is present.
+	 *
+	 * @param jobId ID of the job that we are interested in.
+	 * @param timeout Timeout for the asynchronous operation.
+	 *
+	 * @see #getJobExecutionResult(JobID, Time)
+	 *
+	 * @return {@link CompletableFuture} containing {@code true} when the
+	 * {@link JobResult} is present. The future is completed exceptionally with:
+	 * <ul>
+	 * 	<li>{@link FlinkJobNotFoundException} if there is no job running with the specified ID, or
+	 * 	if the result has expired
+	 * 	<li>{@link JobExecutionResultGoneException} if the result was removed due to memory demand.
+	 * </ul>
+	 */
+	default CompletableFuture<Boolean> isJobExecutionResultPresent(
+			JobID jobId, @RpcTimeout Time timeout) {
+		throw new UnsupportedOperationException();
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index b75ae06..b5fcd18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -37,9 +37,11 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerServices;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -51,6 +53,8 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
@@ -67,11 +71,14 @@ import org.mockito.Mockito;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThat;
@@ -249,6 +256,66 @@ public class DispatcherTest extends TestLogger {
 		assertThat(dispatcherGateway.listJobs(TIMEOUT).get(), hasSize(1));
 	}
 
+	/**
+	 * Test that {@link JobResult} is cached when the job finishes.
+	 */
+	@Test
+	public void testCacheJobExecutionResult() throws Exception {
+		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+
+		OnCompletionActions onCompletionActions;
+
+		final JobID failedJobId = new JobID();
+		onCompletionActions = dispatcher.new DispatcherOnCompleteActions(failedJobId);
+
+		onCompletionActions.jobFailed(new JobResult.Builder()
+			.jobId(failedJobId)
+			.serializedThrowable(new SerializedThrowable(new RuntimeException("expected")))
+			.netRuntime(Long.MAX_VALUE)
+			.build());
+
+		assertThat(
+			dispatcherGateway.isJobExecutionResultPresent(failedJobId, TIMEOUT).get(),
+			equalTo(true));
+		assertThat(
+			dispatcherGateway.getJobExecutionResult(failedJobId, TIMEOUT)
+				.get()
+				.isSuccess(),
+			equalTo(false));
+
+		final JobID successJobId = new JobID();
+		onCompletionActions = dispatcher.new DispatcherOnCompleteActions(successJobId);
+
+		onCompletionActions.jobFinished(new JobResult.Builder()
+			.jobId(successJobId)
+			.netRuntime(Long.MAX_VALUE)
+			.build());
+
+		assertThat(
+			dispatcherGateway.isJobExecutionResultPresent(successJobId, TIMEOUT).get(),
+			equalTo(true));
+		assertThat(
+			dispatcherGateway.getJobExecutionResult(successJobId, TIMEOUT)
+				.get()
+				.isSuccess(),
+			equalTo(true));
+	}
+
+	@Test
+	public void testThrowExceptionIfJobExecutionResultNotFound() throws Exception {
+		dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
+
+		final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
+		try {
+			dispatcherGateway.getJobExecutionResult(new JobID(), TIMEOUT).get();
+		} catch (ExecutionException e) {
+			final Throwable throwable = ExceptionUtils.stripExecutionException(e);
+			assertThat(throwable, instanceOf(FlinkJobNotFoundException.class));
+		}
+	}
+
 	private static class TestingDispatcher extends Dispatcher {
 
 		private final JobID expectedJobId;

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCacheTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCacheTest.java
new file mode 100644
index 0000000..dfc059c
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobExecutionResultCacheTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.lang.ref.SoftReference;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+import static org.hamcrest.Matchers.sameInstance;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link JobExecutionResultCache}.
+ */
+@Category(Flip6.class)
+public class JobExecutionResultCacheTest extends TestLogger {
+
+	private JobExecutionResultCache jobExecutionResultCache;
+
+	@Before
+	public void setUp() {
+		jobExecutionResultCache = new JobExecutionResultCache();
+	}
+
+	@Test
+	public void testCacheResultUntilRetrieved() {
+		final JobID jobId = new JobID();
+		final JobResult jobResult = new JobResult.Builder()
+			.jobId(jobId)
+			.netRuntime(Long.MAX_VALUE)
+			.build();
+		jobExecutionResultCache.put(jobResult);
+
+		assertThat(jobExecutionResultCache.contains(jobId), equalTo(true));
+
+		SoftReference<JobResult> jobResultRef;
+		jobResultRef = jobExecutionResultCache.get(jobId);
+
+		assertThat(jobResultRef, notNullValue());
+		assertThat(jobResultRef.get(), sameInstance(jobResult));
+
+		assertThat(jobExecutionResultCache.contains(jobId), equalTo(false));
+
+		jobResultRef = jobExecutionResultCache.get(jobId);
+		assertThat(jobResultRef, nullValue());
+	}
+
+	@Test
+	public void testThrowExceptionIfEntryAlreadyExists() {
+		final JobID jobId = new JobID();
+		final JobResult build = new JobResult.Builder()
+			.jobId(jobId)
+			.netRuntime(Long.MAX_VALUE)
+			.build();
+		jobExecutionResultCache.put(build);
+
+		try {
+			jobExecutionResultCache.put(build);
+			fail("Expected exception not thrown.");
+		} catch (final IllegalStateException e) {
+			assertThat(e.getMessage(), containsString("already contained entry for job"));
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index a0959c0..245ea27 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
@@ -33,12 +32,14 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -101,7 +102,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
 		SubmittedJobGraphStore submittedJobGraphStore = mock(SubmittedJobGraphStore.class);
 
 		blobStore = mock(BlobStore.class);
-		
+
 		HighAvailabilityServices haServices = mock(HighAvailabilityServices.class);
 		when(haServices.getJobManagerLeaderElectionService(any(JobID.class))).thenReturn(leaderElectionService);
 		when(haServices.getSubmittedJobGraphStore()).thenReturn(submittedJobGraphStore);
@@ -138,7 +139,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
 		assertTrue(!jobCompletion.isJobFailed());
 
 		verify(jobManager).start(any(JobMasterId.class), any(Time.class));
-		
+
 		runner.shutdown();
 		verify(leaderElectionService).stop();
 		verify(jobManager).shutDown();
@@ -175,7 +176,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
 		assertTrue(!jobCompletion.isJobFinished());
 
 		// runner been told by JobManager that job is finished
-		runner.jobFinished(mock(JobExecutionResult.class));
+		runner.jobFinished(mock(JobResult.class));
 
 		assertTrue(jobCompletion.isJobFinished());
 		assertFalse(jobCompletion.isJobFinishedByOther());
@@ -195,7 +196,10 @@ public class JobManagerRunnerMockTest extends TestLogger {
 		assertTrue(!jobCompletion.isJobFinished());
 
 		// runner been told by JobManager that job is failed
-		runner.jobFailed(new Exception("failed manually"));
+		runner.jobFailed(new JobResult.Builder()
+			.jobId(new JobID())
+			.serializedThrowable(new SerializedThrowable(new Exception("failed manually")))
+			.build());
 
 		assertTrue(jobCompletion.isJobFailed());
 		verify(leaderElectionService).stop();
@@ -239,14 +243,14 @@ public class JobManagerRunnerMockTest extends TestLogger {
 
 	private static class TestingOnCompletionActions implements OnCompletionActions, FatalErrorHandler {
 
-		private volatile JobExecutionResult result;
+		private volatile JobResult result;
 
 		private volatile Throwable failedCause;
 
 		private volatile boolean finishedByOther;
 
 		@Override
-		public void jobFinished(JobExecutionResult result) {
+		public void jobFinished(JobResult result) {
 			checkArgument(!isJobFinished(), "job finished already");
 			checkArgument(!isJobFailed(), "job failed already");
 
@@ -254,11 +258,11 @@ public class JobManagerRunnerMockTest extends TestLogger {
 		}
 
 		@Override
-		public void jobFailed(Throwable cause) {
+		public void jobFailed(JobResult result) {
 			checkArgument(!isJobFinished(), "job finished already");
 			checkArgument(!isJobFailed(), "job failed already");
 
-			this.failedCause = cause;
+			this.failedCause = result.getSerializedThrowable().get();
 		}
 
 		@Override
@@ -271,7 +275,10 @@ public class JobManagerRunnerMockTest extends TestLogger {
 
 		@Override
 		public void onFatalError(Throwable exception) {
-			jobFailed(exception);
+			checkArgument(!isJobFinished(), "job finished already");
+			checkArgument(!isJobFailed(), "job failed already");
+
+			this.failedCause = exception;
 		}
 
 		boolean isJobFinished() {

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index e4f9fc2..d77a1d4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -266,12 +265,12 @@ public class JobMasterTest extends TestLogger {
 	private static final class NoOpOnCompletionActions implements OnCompletionActions {
 
 		@Override
-		public void jobFinished(JobExecutionResult result) {
+		public void jobFinished(final JobResult result) {
 
 		}
 
 		@Override
-		public void jobFailed(Throwable cause) {
+		public void jobFailed(final JobResult result) {
 
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/08e550c9/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
new file mode 100644
index 0000000..1c7f0dd
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobResultTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.SerializedThrowable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link JobResult}.
+ */
+@Category(Flip6.class)
+public class JobResultTest extends TestLogger {
+
+	@Test
+	public void testNetRuntimeMandatory() {
+		try {
+			new JobResult.Builder()
+				.jobId(new JobID())
+				.build();
+			fail("Expected exception not thrown");
+		} catch (final IllegalArgumentException e) {
+			assertThat(e.getMessage(), equalTo("netRuntime must be greater than or equals 0"));
+		}
+	}
+
+	@Test
+	public void testIsNotSuccess() throws Exception {
+		final JobResult jobResult = new JobResult.Builder()
+			.jobId(new JobID())
+			.serializedThrowable(new SerializedThrowable(new RuntimeException()))
+			.netRuntime(Long.MAX_VALUE)
+			.build();
+
+		assertThat(jobResult.isSuccess(), equalTo(false));
+	}
+
+	@Test
+	public void testIsSuccess() throws Exception {
+		final JobResult jobResult = new JobResult.Builder()
+			.jobId(new JobID())
+			.netRuntime(Long.MAX_VALUE)
+			.build();
+
+		assertThat(jobResult.isSuccess(), equalTo(true));
+	}
+
+}


Mime
View raw message