flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/5] flink git commit: [FLINK-7668] Add ExecutionGraphCache for ExecutionGraph based REST handlers
Date Mon, 02 Oct 2017 20:29:43 GMT
[FLINK-7668] Add ExecutionGraphCache for ExecutionGraph based REST handlers

The ExecutionGraphCache replaces the ExecutionGraphHolder. Unlike the latter, the former
does not expect the AccessExecutionGraph to be the true ExecutionGraph. Instead it assumes
it to be the ArchivedExecutionGraph. Therefore, it invalidates the cache entries after
a given time to live period. This will trigger requesting the AccessExecutionGraph again
and, thus, updating the ExecutionGraph information for the ExecutionGraph based REST
handlers.

In order to avoid memory leaks, the WebRuntimeMonitor starts now a periodic cleanup task
which triggers ExecutionGraphCache.cleanup. This methods releases all cache entries which
have exceeded their time to live. Currently it is set to 20 * refreshInterval of the
web gui.

This closes #4728.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aae417f1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aae417f1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aae417f1

Branch: refs/heads/master
Commit: aae417f113d0f9db3ac3b4cbadd378134f30b440
Parents: c3472b9
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Mon Sep 25 15:29:59 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Oct 2 19:57:58 2017 +0200

----------------------------------------------------------------------
 .../MesosApplicationMasterRunner.java           |   3 +-
 .../runtime/webmonitor/WebRuntimeMonitor.java   | 130 ++++---
 .../webmonitor/WebRuntimeMonitorITCase.java     |   4 +-
 .../runtime/akka/AkkaJobManagerGateway.java     |   7 +-
 .../clusterframework/BootstrapTools.java        |  24 +-
 .../flink/runtime/dispatcher/Dispatcher.java    |  14 +-
 .../runtime/dispatcher/DispatcherGateway.java   |   5 -
 .../runtime/jobmaster/JobManagerGateway.java    |  34 --
 .../flink/runtime/jobmaster/JobMaster.java      |   6 +
 .../runtime/jobmaster/JobMasterGateway.java     |  16 +
 .../messages/FlinkJobNotFoundException.java     |   6 +-
 .../AbstractExecutionGraphRequestHandler.java   |  24 +-
 .../legacy/AbstractJobVertexRequestHandler.java |   2 +-
 .../AbstractSubtaskAttemptRequestHandler.java   |   2 +-
 .../legacy/AbstractSubtaskRequestHandler.java   |   2 +-
 .../legacy/CurrentJobsOverviewHandler.java      |   2 +-
 .../handler/legacy/ExecutionGraphCache.java     | 187 ++++++++++
 .../handler/legacy/ExecutionGraphHolder.java    |  82 -----
 .../handler/legacy/JobAccumulatorsHandler.java  |   2 +-
 .../JobCancellationWithSavepointHandlers.java   |  62 ++--
 .../rest/handler/legacy/JobConfigHandler.java   |   4 +-
 .../rest/handler/legacy/JobDetailsHandler.java  |   2 +-
 .../handler/legacy/JobExceptionsHandler.java    |   2 +-
 .../rest/handler/legacy/JobPlanHandler.java     |   2 +-
 .../legacy/JobVertexAccumulatorsHandler.java    |   2 +-
 .../legacy/JobVertexBackPressureHandler.java    |   2 +-
 .../handler/legacy/JobVertexDetailsHandler.java |   2 +-
 .../legacy/JobVertexTaskManagersHandler.java    |   2 +-
 .../SubtaskCurrentAttemptDetailsHandler.java    |   2 +-
 ...taskExecutionAttemptAccumulatorsHandler.java |   2 +-
 .../SubtaskExecutionAttemptDetailsHandler.java  |   2 +-
 .../legacy/SubtasksAllAccumulatorsHandler.java  |   2 +-
 .../handler/legacy/SubtasksTimesHandler.java    |   2 +-
 .../checkpoints/CheckpointConfigHandler.java    |   4 +-
 .../CheckpointStatsDetailsHandler.java          |   4 +-
 .../CheckpointStatsDetailsSubtasksHandler.java  |   4 +-
 .../checkpoints/CheckpointStatsHandler.java     |   4 +-
 .../runtime/webmonitor/RestfulGateway.java      |  39 +-
 .../runtime/webmonitor/WebMonitorUtils.java     |  10 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   4 +-
 .../runtime/minicluster/FlinkMiniCluster.scala  |   4 +-
 .../handler/legacy/ExecutionGraphCacheTest.java | 353 +++++++++++++++++++
 .../legacy/JobAccumulatorsHandlerTest.java      |   2 +-
 ...obCancellationWithSavepointHandlersTest.java |  19 +-
 .../handler/legacy/JobConfigHandlerTest.java    |   2 +-
 .../handler/legacy/JobDetailsHandlerTest.java   |   2 +-
 .../legacy/JobExceptionsHandlerTest.java        |   2 +-
 .../rest/handler/legacy/JobPlanHandlerTest.java |   2 +-
 .../JobVertexAccumulatorsHandlerTest.java       |   2 +-
 .../JobVertexBackPressureHandlerTest.java       |   8 +-
 .../legacy/JobVertexDetailsHandlerTest.java     |   2 +-
 .../JobVertexTaskManagersHandlerTest.java       |   2 +-
 ...SubtaskCurrentAttemptDetailsHandlerTest.java |   2 +-
 ...ExecutionAttemptAccumulatorsHandlerTest.java |   2 +-
 ...btaskExecutionAttemptDetailsHandlerTest.java |   2 +-
 .../SubtasksAllAccumulatorsHandlerTest.java     |   2 +-
 .../legacy/SubtasksTimesHandlerTest.java        |   2 +-
 .../CheckpointConfigHandlerTest.java            |  10 +-
 .../CheckpointStatsDetailsHandlerTest.java      |  12 +-
 .../checkpoints/CheckpointStatsHandlerTest.java |   6 +-
 ...heckpointStatsSubtaskDetailsHandlerTest.java |  18 +-
 .../flink/yarn/YarnApplicationMasterRunner.java |   3 +-
 62 files changed, 840 insertions(+), 330 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
index 6abdb28..f465464 100755
--- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
+++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
@@ -33,6 +33,7 @@ import org.apache.flink.mesos.util.MesosConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContainerSpecification;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -294,7 +295,7 @@ public class MesosApplicationMasterRunner {
 				new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds(50L)),
 				new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
 				webMonitorTimeout,
-				futureExecutor,
+				new ScheduledExecutorServiceAdapter(futureExecutor),
 				LOG);
 			if (webMonitor != null) {
 				final URL webMonitorURL = new URL(webMonitor.getRestAddress());

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 1af6ab6..a37ce2d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobView;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -34,7 +35,7 @@ import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler;
 import org.apache.flink.runtime.rest.handler.legacy.CurrentJobIdsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
 import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.JobAccumulatorsHandler;
 import org.apache.flink.runtime.rest.handler.legacy.JobCancellationHandler;
 import org.apache.flink.runtime.rest.handler.legacy.JobCancellationWithSavepointHandlers;
@@ -93,7 +94,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -143,6 +144,10 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	private final WebMonitorConfig cfg;
 
+	private final ExecutionGraphCache executionGraphCache;
+
+	private final ScheduledFuture<?> executionGraphCleanupTask;
+
 	private AtomicBoolean cleanedUp = new AtomicBoolean();
 
 
@@ -155,7 +160,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 			LeaderGatewayRetriever<JobManagerGateway> jobManagerRetriever,
 			MetricQueryServiceRetriever queryServiceRetriever,
 			Time timeout,
-			Executor executor) throws IOException, InterruptedException {
+			ScheduledExecutor scheduledExecutor) throws IOException, InterruptedException {
 
 		this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
 		this.retriever = Preconditions.checkNotNull(jobManagerRetriever);
@@ -193,11 +198,23 @@ public class WebRuntimeMonitor implements WebMonitor {
 			this.uploadDir = null;
 		}
 
-		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(timeout);
+		final long timeToLive = cfg.getRefreshInterval() * 10L;
+
+		this.executionGraphCache = new ExecutionGraphCache(
+			timeout,
+			Time.milliseconds(timeToLive));
+
+		final long cleanupInterval = timeToLive * 2L;
+
+		this.executionGraphCleanupTask = scheduledExecutor.scheduleWithFixedDelay(
+			executionGraphCache::cleanup,
+			cleanupInterval,
+			cleanupInterval,
+			TimeUnit.MILLISECONDS);
 
 		// - Back pressure stats ----------------------------------------------
 
-		stackTraceSamples = new StackTraceSampleCoordinator(executor, 60000);
+		stackTraceSamples = new StackTraceSampleCoordinator(scheduledExecutor, 60000);
 
 		// Back pressure stats tracker config
 		int cleanUpInterval = config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL);
@@ -228,55 +245,55 @@ public class WebRuntimeMonitor implements WebMonitor {
 		} else {
 			serverSSLContext = null;
 		}
-		metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, executor, timeout);
+		metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, scheduledExecutor, timeout);
 
 		String defaultSavepointDir = config.getString(CoreOptions.SAVEPOINT_DIRECTORY);
 
-		JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, executor, defaultSavepointDir);
+		JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(executionGraphCache, scheduledExecutor, defaultSavepointDir);
 		RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler());
 		RuntimeMonitorHandler inProgressHandler = handler(cancelWithSavepoint.getInProgressHandler());
 
 		Router router = new Router();
 		// config how to interact with this web server
-		get(router, new DashboardConfigHandler(executor, cfg.getRefreshInterval()));
+		get(router, new DashboardConfigHandler(scheduledExecutor, cfg.getRefreshInterval()));
 
 		// the overview - how many task managers, slots, free slots, ...
-		get(router, new ClusterOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT));
+		get(router, new ClusterOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT));
 
 		// job manager configuration
-		get(router, new ClusterConfigHandler(executor, config));
+		get(router, new ClusterConfigHandler(scheduledExecutor, config));
 
 		// overview over jobs
-		get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, true));
-		get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, false));
-		get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, false, true));
-
-		get(router, new CurrentJobIdsHandler(executor, DEFAULT_REQUEST_TIMEOUT));
-
-		get(router, new JobDetailsHandler(currentGraphs, executor, metricFetcher));
-
-		get(router, new JobVertexDetailsHandler(currentGraphs, executor, metricFetcher));
-		get(router, new SubtasksTimesHandler(currentGraphs, executor));
-		get(router, new JobVertexTaskManagersHandler(currentGraphs, executor, metricFetcher));
-		get(router, new JobVertexAccumulatorsHandler(currentGraphs, executor));
-		get(router, new JobVertexBackPressureHandler(currentGraphs, executor, backPressureStatsTracker, refreshInterval));
-		get(router, new JobVertexMetricsHandler(executor, metricFetcher));
-		get(router, new SubtasksAllAccumulatorsHandler(currentGraphs, executor));
-		get(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, executor, metricFetcher));
-		get(router, new SubtaskExecutionAttemptDetailsHandler(currentGraphs, executor, metricFetcher));
-		get(router, new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs, executor));
-
-		get(router, new JobPlanHandler(currentGraphs, executor));
-		get(router, new JobConfigHandler(currentGraphs, executor));
-		get(router, new JobExceptionsHandler(currentGraphs, executor));
-		get(router, new JobAccumulatorsHandler(currentGraphs, executor));
-		get(router, new JobMetricsHandler(executor, metricFetcher));
-
-		get(router, new TaskManagersHandler(executor, DEFAULT_REQUEST_TIMEOUT, metricFetcher));
+		get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, true, true));
+		get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, true, false));
+		get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, false, true));
+
+		get(router, new CurrentJobIdsHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT));
+
+		get(router, new JobDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher));
+
+		get(router, new JobVertexDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher));
+		get(router, new SubtasksTimesHandler(executionGraphCache, scheduledExecutor));
+		get(router, new JobVertexTaskManagersHandler(executionGraphCache, scheduledExecutor, metricFetcher));
+		get(router, new JobVertexAccumulatorsHandler(executionGraphCache, scheduledExecutor));
+		get(router, new JobVertexBackPressureHandler(executionGraphCache, scheduledExecutor, backPressureStatsTracker, refreshInterval));
+		get(router, new JobVertexMetricsHandler(scheduledExecutor, metricFetcher));
+		get(router, new SubtasksAllAccumulatorsHandler(executionGraphCache, scheduledExecutor));
+		get(router, new SubtaskCurrentAttemptDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher));
+		get(router, new SubtaskExecutionAttemptDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher));
+		get(router, new SubtaskExecutionAttemptAccumulatorsHandler(executionGraphCache, scheduledExecutor));
+
+		get(router, new JobPlanHandler(executionGraphCache, scheduledExecutor));
+		get(router, new JobConfigHandler(executionGraphCache, scheduledExecutor));
+		get(router, new JobExceptionsHandler(executionGraphCache, scheduledExecutor));
+		get(router, new JobAccumulatorsHandler(executionGraphCache, scheduledExecutor));
+		get(router, new JobMetricsHandler(scheduledExecutor, metricFetcher));
+
+		get(router, new TaskManagersHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, metricFetcher));
 		get(router,
 			new TaskManagerLogHandler(
 				retriever,
-				executor,
+				scheduledExecutor,
 				localRestAddress,
 				timeout,
 				TaskManagerLogHandler.FileMode.LOG,
@@ -285,13 +302,13 @@ public class WebRuntimeMonitor implements WebMonitor {
 		get(router,
 			new TaskManagerLogHandler(
 				retriever,
-				executor,
+				scheduledExecutor,
 				localRestAddress,
 				timeout,
 				TaskManagerLogHandler.FileMode.STDOUT,
 				config,
 				blobView));
-		get(router, new TaskManagerMetricsHandler(executor, metricFetcher));
+		get(router, new TaskManagerMetricsHandler(scheduledExecutor, metricFetcher));
 
 		router
 			// log and stdout
@@ -305,48 +322,48 @@ public class WebRuntimeMonitor implements WebMonitor {
 			.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
 				new StaticFileServerHandler<>(retriever, localRestAddress, timeout, logFiles.stdOutFile));
 
-		get(router, new JobManagerMetricsHandler(executor, metricFetcher));
+		get(router, new JobManagerMetricsHandler(scheduledExecutor, metricFetcher));
 
 		// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
-		get(router, new JobCancellationHandler(executor, timeout));
+		get(router, new JobCancellationHandler(scheduledExecutor, timeout));
 		// DELETE is the preferred way of canceling a job (Rest-conform)
-		delete(router, new JobCancellationHandler(executor, timeout));
+		delete(router, new JobCancellationHandler(scheduledExecutor, timeout));
 
 		get(router, triggerHandler);
 		get(router, inProgressHandler);
 
 		// stop a job via GET (for proper integration with YARN this has to be performed via GET)
-		get(router, new JobStoppingHandler(executor, timeout));
+		get(router, new JobStoppingHandler(scheduledExecutor, timeout));
 		// DELETE is the preferred way of stopping a job (Rest-conform)
-		delete(router, new JobStoppingHandler(executor, timeout));
+		delete(router, new JobStoppingHandler(scheduledExecutor, timeout));
 
 		int maxCachedEntries = config.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
 		CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);
 
 		// Register the checkpoint stats handlers
-		get(router, new CheckpointStatsHandler(currentGraphs, executor));
-		get(router, new CheckpointConfigHandler(currentGraphs, executor));
-		get(router, new CheckpointStatsDetailsHandler(currentGraphs, executor, cache));
-		get(router, new CheckpointStatsDetailsSubtasksHandler(currentGraphs, executor, cache));
+		get(router, new CheckpointStatsHandler(executionGraphCache, scheduledExecutor));
+		get(router, new CheckpointConfigHandler(executionGraphCache, scheduledExecutor));
+		get(router, new CheckpointStatsDetailsHandler(executionGraphCache, scheduledExecutor, cache));
+		get(router, new CheckpointStatsDetailsSubtasksHandler(executionGraphCache, scheduledExecutor, cache));
 
 		if (webSubmitAllow) {
 			// fetch the list of uploaded jars.
-			get(router, new JarListHandler(executor, uploadDir));
+			get(router, new JarListHandler(scheduledExecutor, uploadDir));
 
 			// get plan for an uploaded jar
-			get(router, new JarPlanHandler(executor, uploadDir));
+			get(router, new JarPlanHandler(scheduledExecutor, uploadDir));
 
 			// run a jar
-			post(router, new JarRunHandler(executor, uploadDir, timeout, config));
+			post(router, new JarRunHandler(scheduledExecutor, uploadDir, timeout, config));
 
 			// upload a jar
-			post(router, new JarUploadHandler(executor, uploadDir));
+			post(router, new JarUploadHandler(scheduledExecutor, uploadDir));
 
 			// delete an uploaded jar from submission interface
-			delete(router, new JarDeleteHandler(executor, uploadDir));
+			delete(router, new JarDeleteHandler(scheduledExecutor, uploadDir));
 		} else {
 			// send an Access Denied message
-			JarAccessDeniedHandler jad = new JarAccessDeniedHandler(executor);
+			JarAccessDeniedHandler jad = new JarAccessDeniedHandler(scheduledExecutor);
 			get(router, jad);
 			post(router, jad);
 			delete(router, jad);
@@ -447,6 +464,11 @@ public class WebRuntimeMonitor implements WebMonitor {
 	@Override
 	public void stop() throws Exception {
 		synchronized (startupShutdownLock) {
+
+			executionGraphCleanupTask.cancel(false);
+
+			executionGraphCache.close();
+
 			leaderRetrievalService.stop();
 
 			netty.shutdown();

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index e67c5ce..95b5811 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -179,7 +179,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 					jobManagerRetrievers[i],
 					new AkkaQueryServiceRetriever(jobManagerSystem[i], TIMEOUT),
 					TIMEOUT,
-					TestingUtils.defaultExecutor());
+					TestingUtils.defaultScheduledExecutor());
 			}
 
 			ActorRef[] jobManager = new ActorRef[2];
@@ -323,7 +323,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 				new AkkaJobManagerRetriever(actorSystem, TIMEOUT, 0, Time.milliseconds(50L)),
 				new AkkaQueryServiceRetriever(actorSystem, TIMEOUT),
 				TIMEOUT,
-				TestingUtils.defaultExecutor());
+				TestingUtils.defaultScheduledExecutor());
 
 			webRuntimeMonitor.start();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
index 18025dd..2a2c414 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
@@ -226,7 +227,7 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 	}
 
 	@Override
-	public CompletableFuture<Optional<AccessExecutionGraph>> requestJob(JobID jobId, Time timeout) {
+	public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, Time timeout) {
 		CompletableFuture<JobManagerMessages.JobResponse> jobResponseFuture = FutureUtils.toJava(
 			jobManagerGateway
 				.ask(new JobManagerMessages.RequestJob(jobId), FutureUtils.toFiniteDuration(timeout))
@@ -235,9 +236,9 @@ public class AkkaJobManagerGateway implements JobManagerGateway {
 		return jobResponseFuture.thenApply(
 			(JobManagerMessages.JobResponse jobResponse) -> {
 				if (jobResponse instanceof JobManagerMessages.JobFound) {
-					return Optional.of(((JobManagerMessages.JobFound) jobResponse).executionGraph());
+					return ((JobManagerMessages.JobFound) jobResponse).executionGraph();
 				} else {
-					return Optional.empty();
+					throw new CompletionException(new FlinkJobNotFoundException(jobId));
 				}
 			});
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
index 62cddb0..0a56963 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java
@@ -18,13 +18,6 @@
 
 package org.apache.flink.runtime.clusterframework;
 
-import akka.actor.ActorSystem;
-import com.typesafe.config.Config;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.Option;
-import org.apache.commons.lang3.StringUtils;
-
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -32,6 +25,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.WebMonitor;
@@ -40,10 +34,13 @@ import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.util.NetUtils;
 
+import akka.actor.ActorSystem;
+import com.typesafe.config.Config;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
-
 import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
 import java.io.FileWriter;
@@ -55,7 +52,8 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.Executor;
+
+import scala.concurrent.duration.FiniteDuration;
 
 /**
  * Tools for starting JobManager and TaskManager processes, including the
@@ -178,7 +176,7 @@ public class BootstrapTools {
 	 * @param jobManagerRetriever to retrieve the leading JobManagerGateway
 	 * @param queryServiceRetriever to resolve a query service
 	 * @param timeout for asynchronous operations
-	 * @param executor to run asynchronous operations
+	 * @param scheduledExecutor to run asynchronous operations
 	 * @param logger Logger for log output
 	 * @return WebMonitor instance.
 	 * @throws Exception
@@ -189,7 +187,7 @@ public class BootstrapTools {
 			LeaderGatewayRetriever<JobManagerGateway> jobManagerRetriever,
 			MetricQueryServiceRetriever queryServiceRetriever,
 			Time timeout,
-			Executor executor,
+			ScheduledExecutor scheduledExecutor,
 			Logger logger) throws Exception {
 
 		if (config.getInteger(WebOptions.PORT, 0) >= 0) {
@@ -203,7 +201,7 @@ public class BootstrapTools {
 				jobManagerRetriever,
 				queryServiceRetriever,
 				timeout,
-				executor);
+				scheduledExecutor);
 
 			// start the web monitor
 			if (monitor != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 0396d52..4d89dc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
@@ -284,7 +285,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	}
 
 	@Override
-	public CompletableFuture<MultipleJobsDetails> requestJobDetails(Time timeout) {
+	public CompletableFuture<MultipleJobsDetails> requestJobDetails(boolean includeRunning, boolean includeFinished, Time timeout) {
 		final int numberJobsRunning = jobManagerRunners.size();
 
 		ArrayList<CompletableFuture<JobDetails>> individualJobDetails = new ArrayList<>(numberJobsRunning);
@@ -300,6 +301,17 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 				new MultipleJobsDetails(jobDetails, null));
 	}
 
+	@Override
+	public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, Time timeout) {
+		final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId);
+
+		if (jobManagerRunner == null) {
+			return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId));
+		} else {
+			return jobManagerRunner.getJobManagerGateway().requestArchivedExecutionGraph(timeout);
+		}
+	}
+
 	/**
 	 * Cleans up the job related data from the dispatcher. If cleanupHA is true, then
 	 * the data will also be removed from HA.

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
index 46b0cd9..1d0a9dd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
@@ -80,7 +78,4 @@ public interface DispatcherGateway extends FencedRpcGateway<DispatcherId>, Restf
 	 * @param timeout of the operation
 	 * @return Future {@link StatusOverview} containing the cluster information
 	 */
-	CompletableFuture<StatusOverview> requestStatusOverview(@RpcTimeout Time timeout);
-
-	CompletableFuture<MultipleJobsDetails> requestJobDetails(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
index 0ff8884..782d6d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
@@ -21,15 +21,12 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 
 import javax.annotation.Nullable;
@@ -125,37 +122,6 @@ public interface JobManagerGateway extends RestfulGateway {
 	CompletableFuture<Collection<Instance>> requestTaskManagerInstances(Time timeout);
 
 	/**
-	 * Requests job details currently being executed by the JobManager.
-	 *
-	 * @param includeRunning true if running jobs shall be included, otherwise false
-	 * @param includeFinished true if finished jobs shall be included, otherwise false
-	 * @param timeout for the asynchronous operation
-	 * @return Future containing the job details
-	 */
-	CompletableFuture<MultipleJobsDetails> requestJobDetails(
-		boolean includeRunning,
-		boolean includeFinished,
-		Time timeout);
-
-	/**
-	 * Requests the AccessExecutionGraph for the given jobId. If there is no such graph, then
-	 * {@link Optional#empty()} is returned.
-	 *
-	 * @param jobId identifying the job whose AccessExecutionGraph is requested
-	 * @param timeout for the asynchronous operation
-	 * @return Future containing the AccessExecutionGraph for the given jobId, otherwise {@link Optional#empty()}
-	 */
-	CompletableFuture<Optional<AccessExecutionGraph>> requestJob(JobID jobId, Time timeout);
-
-	/**
-	 * Requests the status overview from the JobManager.
-	 *
-	 * @param timeout for the asynchronous operation
-	 * @return Future containing the status overview
-	 */
-	CompletableFuture<StatusOverview> requestStatusOverview(Time timeout);
-
-	/**
 	 * Requests the job overview from the JobManager.
 	 *
 	 * @param timeout for the asynchronous operation

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 0e53436..7efcc0b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
@@ -744,6 +745,11 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 		return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(executionGraph), executor);
 	}
 
+	@Override
+	public CompletableFuture<AccessExecutionGraph> requestArchivedExecutionGraph(Time timeout) {
+		return CompletableFuture.completedFuture(executionGraph.archive());
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// Internal methods
 	//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index f3ca5be..0628976 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -232,5 +234,19 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRp
 	 */
 	void heartbeatFromResourceManager(final ResourceID resourceID);
 
+	/**
+	 * Request the details of the executed job.
+	 *
+	 * @param timeout for the rpc call
+	 * @return Future details of the executed job
+	 */
 	CompletableFuture<JobDetails> requestJobDetails(@RpcTimeout Time timeout);
+
+	/**
+	 * Request the {@link ArchivedExecutionGraph} of the currently executed job.
+	 *
+	 * @param timeout for the rpc call
+	 * @return Future archived execution graph derived from the currently executed job
+	 */
+	CompletableFuture<AccessExecutionGraph> requestArchivedExecutionGraph(@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java
index 95686ac..f606071 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java
@@ -22,13 +22,13 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.util.FlinkException;
 
 /**
- * Exception which is returned if a Flink job could not be found.
+ * Exception indicating that we could not find a Flink job with the given job ID.
  */
 public class FlinkJobNotFoundException extends FlinkException {
 
-	private static final long serialVersionUID = -7803390762010615384L;
+	private static final long serialVersionUID = 2294698055059659025L;
 
 	public FlinkJobNotFoundException(JobID jobId) {
-		super("Could not find Flink job (" + jobId + ").");
+		super("Could not find Flink job (" + jobId + ')');
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
index 88932b5..7fbde15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
@@ -27,7 +27,6 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
@@ -38,11 +37,11 @@ import java.util.concurrent.Executor;
  */
 public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonRequestHandler {
 
-	private final ExecutionGraphHolder executionGraphHolder;
+	private final ExecutionGraphCache executionGraphCache;
 
-	public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+	public AbstractExecutionGraphRequestHandler(ExecutionGraphCache executionGraphCache, Executor executor) {
 		super(executor);
-		this.executionGraphHolder = Preconditions.checkNotNull(executionGraphHolder);
+		this.executionGraphCache = Preconditions.checkNotNull(executionGraphCache);
 	}
 
 	@Override
@@ -63,16 +62,15 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
 			return FutureUtils.completedExceptionally(new FlinkException("Invalid JobID string '" + jidString + "'", e));
 		}
 
-		final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);
+		final CompletableFuture<AccessExecutionGraph> graphFuture = executionGraphCache.getExecutionGraph(jid, jobManagerGateway);
 
-		return graphFuture.thenComposeAsync(
-			(Optional<AccessExecutionGraph> optGraph) -> {
-				if (optGraph.isPresent()) {
-					return handleRequest(optGraph.get(), pathParams);
-				} else {
-					throw new CompletionException(new NotFoundException("Could not find job with jobId " + jid + '.'));
-				}
-			}, executor);
+		return graphFuture
+			.exceptionally(
+				throwable -> {
+					throw new CompletionException(new NotFoundException("Could not find job " + jid + '.'));
+				})
+			.thenComposeAsync(
+				(AccessExecutionGraph executionGraph) -> handleRequest(executionGraph, pathParams));
 	}
 
 	public abstract CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params);

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java
index e2e4484..70606e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java
@@ -32,7 +32,7 @@ import java.util.concurrent.Executor;
  */
 public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionGraphRequestHandler {
 
-	public AbstractJobVertexRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+	public AbstractJobVertexRequestHandler(ExecutionGraphCache executionGraphHolder, Executor executor) {
 		super(executionGraphHolder, executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java
index ec277d8..9a225f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java
@@ -35,7 +35,7 @@ import java.util.concurrent.Executor;
  */
 public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubtaskRequestHandler {
 
-	public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+	public AbstractSubtaskAttemptRequestHandler(ExecutionGraphCache executionGraphHolder, Executor executor) {
 		super(executionGraphHolder, executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java
index d69038a..b1797b0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java
@@ -34,7 +34,7 @@ import java.util.concurrent.Executor;
  */
 public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexRequestHandler {
 
-	public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+	public AbstractSubtaskRequestHandler(ExecutionGraphCache executionGraphHolder, Executor executor) {
 		super(executionGraphHolder, executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
index 669ef32..a6640a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
@@ -75,7 +75,7 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler imple
 
 	@Override
 	public CompletableFuture<MultipleJobsDetails> handleRequest(HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, DispatcherGateway gateway) {
-		return gateway.requestJobDetails(timeout);
+		return gateway.requestJobDetails(true, true, timeout);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
new file mode 100644
index 0000000..f63b042
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Cache for {@link AccessExecutionGraph} which are obtained from the Flink cluster. Every cache entry
+ * has an associated time to live after which a new request will trigger the reloading of the
+ * {@link AccessExecutionGraph} from the cluster.
+ */
+public class ExecutionGraphCache implements Closeable {
+
+	private final Time timeout;
+
+	private final Time timeToLive;
+
+	private final ConcurrentHashMap<JobID, ExecutionGraphEntry> cachedExecutionGraphs;
+
+	private volatile boolean running = true;
+
+	public ExecutionGraphCache(
+			Time timeout,
+			Time timeToLive) {
+		this.timeout = checkNotNull(timeout);
+		this.timeToLive = checkNotNull(timeToLive);
+
+		cachedExecutionGraphs = new ConcurrentHashMap<>(4);
+	}
+
+	@Override
+	public void close() {
+		running = false;
+
+		// clear all cached AccessExecutionGraphs
+		cachedExecutionGraphs.clear();
+	}
+
+	/**
+	 * Gets the number of cache entries.
+	 */
+	public int size() {
+		return cachedExecutionGraphs.size();
+	}
+
+	/**
+	 * Gets the {@link AccessExecutionGraph} for the given {@link JobID} and caches it. The
+	 * {@link AccessExecutionGraph} will be requested again after the refresh interval has passed
+	 * or if the graph could not be retrieved from the given gateway.
+	 *
+	 * @param jobId identifying the {@link AccessExecutionGraph} to get
+	 * @param restfulGateway to request the {@link AccessExecutionGraph} from
+	 * @return Future containing the requested {@link AccessExecutionGraph}
+	 */
+	public CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID jobId, RestfulGateway restfulGateway) {
+
+		Preconditions.checkState(running, "ExecutionGraphCache is no longer running");
+
+		while (true) {
+			final ExecutionGraphEntry oldEntry = cachedExecutionGraphs.get(jobId);
+
+			final long currentTime = System.currentTimeMillis();
+
+			if (oldEntry != null) {
+				if (currentTime < oldEntry.getTTL()) {
+					if (oldEntry.getExecutionGraphFuture().isDone() && !oldEntry.getExecutionGraphFuture().isCompletedExceptionally()) {
+
+						// TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager but only the ArchivedExecutionGraph
+						try {
+							if (oldEntry.getExecutionGraphFuture().get().getState() != JobStatus.SUSPENDED) {
+								return oldEntry.getExecutionGraphFuture();
+							}
+							// send a new request to get the ExecutionGraph from the new leader
+						} catch (InterruptedException | ExecutionException e) {
+							throw new RuntimeException("Could not retrieve ExecutionGraph from the orderly completed future. This should never happen.", e);
+						}
+					} else if (!oldEntry.getExecutionGraphFuture().isDone()) {
+						return oldEntry.getExecutionGraphFuture();
+					}
+					// otherwise it must be completed exceptionally
+				}
+			}
+
+			final ExecutionGraphEntry newEntry = new ExecutionGraphEntry(currentTime + timeToLive.toMilliseconds());
+
+			final boolean successfulUpdate;
+
+			if (oldEntry == null) {
+				successfulUpdate = cachedExecutionGraphs.putIfAbsent(jobId, newEntry) == null;
+			} else {
+				successfulUpdate = cachedExecutionGraphs.replace(jobId, oldEntry, newEntry);
+				// cancel potentially outstanding futures
+				oldEntry.getExecutionGraphFuture().cancel(false);
+			}
+
+			if (successfulUpdate) {
+				final CompletableFuture<AccessExecutionGraph> executionGraphFuture = restfulGateway.requestJob(jobId, timeout);
+
+				executionGraphFuture.whenComplete(
+					(AccessExecutionGraph executionGraph, Throwable throwable) -> {
+						if (throwable != null) {
+							newEntry.getExecutionGraphFuture().completeExceptionally(throwable);
+
+							// remove exceptionally completed entry because it doesn't help
+							cachedExecutionGraphs.remove(jobId, newEntry);
+						} else {
+							newEntry.getExecutionGraphFuture().complete(executionGraph);
+
+							// TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager but only the ArchivedExecutionGraph
+							if (executionGraph.getState() == JobStatus.SUSPENDED) {
+								// remove the entry in case of suspension --> triggers new request when accessed next time
+								cachedExecutionGraphs.remove(jobId, newEntry);
+							}
+						}
+					});
+
+				if (!running) {
+					// delete newly added entry in case of a concurrent stopping operation
+					cachedExecutionGraphs.remove(jobId, newEntry);
+				}
+
+				return newEntry.getExecutionGraphFuture();
+			}
+		}
+	}
+
+	/**
+	 * Perform the cleanup of out dated {@link ExecutionGraphEntry}.
+	 */
+	public void cleanup() {
+		long currentTime = System.currentTimeMillis();
+
+		// remove entries which have exceeded their time to live
+		cachedExecutionGraphs.values().removeIf(
+			(ExecutionGraphEntry entry) -> currentTime >= entry.getTTL());
+	}
+
+	/**
+	 * Wrapper containing the current execution graph and it's time to live (TTL).
+	 */
+	private static final class ExecutionGraphEntry {
+		private final long ttl;
+
+		private final CompletableFuture<AccessExecutionGraph> executionGraphFuture;
+
+		ExecutionGraphEntry(long ttl) {
+			this.ttl = ttl;
+			this.executionGraphFuture = new CompletableFuture<>();
+		}
+
+		public long getTTL() {
+			return ttl;
+		}
+
+		public CompletableFuture<AccessExecutionGraph> getExecutionGraphFuture() {
+			return executionGraphFuture;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java
deleted file mode 100644
index 8a47e50..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rest.handler.legacy;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Optional;
-import java.util.WeakHashMap;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archive.
- *
- * <p>The holder will cache the ExecutionGraph behind a weak reference, which will be cleared
- * at some point once no one else is pointing to the ExecutionGraph.
- * Note that while the holder runs in the same JVM as the JobManager or Archive, the reference should
- * stay valid.
- */
-public class ExecutionGraphHolder {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class);
-
-	private final Time timeout;
-
-	private final WeakHashMap<JobID, AccessExecutionGraph> cache = new WeakHashMap<>();
-
-	public ExecutionGraphHolder(Time timeout) {
-		this.timeout = checkNotNull(timeout);
-	}
-
-	/**
-	 * Retrieves the execution graph with {@link JobID} jid wrapped in {@link Optional} or
-	 * {@link Optional#empty()} if it cannot be found.
-	 *
-	 * @param jid jobID of the execution graph to be retrieved
-	 * @return Optional ExecutionGraph if it has been retrievable, empty if there has been no ExecutionGraph
-	 */
-	public CompletableFuture<Optional<AccessExecutionGraph>> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) {
-		AccessExecutionGraph cached = cache.get(jid);
-		if (cached != null) {
-			if (cached.getState() == JobStatus.SUSPENDED) {
-				cache.remove(jid);
-			} else {
-				return CompletableFuture.completedFuture(Optional.of(cached));
-			}
-		}
-
-		CompletableFuture<Optional<AccessExecutionGraph>> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout);
-
-		executionGraphFuture.thenAcceptAsync(
-			optExecutionGraph ->
-				optExecutionGraph.ifPresent(executionGraph -> cache.put(jid, executionGraph)));
-
-		return executionGraphFuture;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
index 68810eb..b2a2488 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
@@ -42,7 +42,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
 
 	private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators";
 
-	public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+	public JobAccumulatorsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) {
 		super(executionGraphHolder, executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
index 2750c33..d31af4c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
@@ -48,7 +48,6 @@ import java.nio.charset.Charset;
 import java.util.ArrayDeque;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
@@ -91,13 +90,13 @@ public class JobCancellationWithSavepointHandlers {
 	private final String defaultSavepointDirectory;
 
 	public JobCancellationWithSavepointHandlers(
-			ExecutionGraphHolder currentGraphs,
+			ExecutionGraphCache currentGraphs,
 			Executor executor) {
 		this(currentGraphs, executor, null);
 	}
 
 	public JobCancellationWithSavepointHandlers(
-			ExecutionGraphHolder currentGraphs,
+			ExecutionGraphCache currentGraphs,
 			Executor executor,
 			@Nullable String defaultSavepointDirectory) {
 
@@ -124,12 +123,12 @@ public class JobCancellationWithSavepointHandlers {
 	class TriggerHandler implements RequestHandler {
 
 		/** Current execution graphs. */
-		private final ExecutionGraphHolder currentGraphs;
+		private final ExecutionGraphCache currentGraphs;
 
 		/** Execution context for futures. */
 		private final Executor executor;
 
-		public TriggerHandler(ExecutionGraphHolder currentGraphs, Executor executor) {
+		public TriggerHandler(ExecutionGraphCache currentGraphs, Executor executor) {
 			this.currentGraphs = checkNotNull(currentGraphs);
 			this.executor = checkNotNull(executor);
 		}
@@ -148,39 +147,40 @@ public class JobCancellationWithSavepointHandlers {
 
 			if (jobManagerGateway != null) {
 				JobID jobId = JobID.fromHexString(pathParams.get("jobid"));
-				final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture;
+				final CompletableFuture<AccessExecutionGraph> graphFuture;
 
 				graphFuture = currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
 
-				return graphFuture.thenApplyAsync(
-					(Optional<AccessExecutionGraph> optGraph) -> {
-						final AccessExecutionGraph graph = optGraph.orElseThrow(
-							() -> new CompletionException(
-								new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')));
-
-						CheckpointCoordinatorConfiguration jobCheckpointingConfiguration = graph.getCheckpointCoordinatorConfiguration();
-						if (jobCheckpointingConfiguration == null) {
-							throw new CompletionException(new FlinkException("Cannot find checkpoint coordinator configuration for job."));
-						}
+				return graphFuture.handleAsync(
+					(AccessExecutionGraph graph, Throwable throwable) -> {
+						if (throwable != null) {
+							throw new CompletionException(new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'));
+						} else {
+							CheckpointCoordinatorConfiguration jobCheckpointingConfiguration = graph.getCheckpointCoordinatorConfiguration();
+							if (jobCheckpointingConfiguration == null) {
+								throw new CompletionException(new FlinkException("Cannot find checkpoint coordinator configuration for job."));
+							}
 
-						String targetDirectory = pathParams.get("targetDirectory");
-						if (targetDirectory == null) {
-							if (defaultSavepointDirectory == null) {
-								throw new IllegalStateException("No savepoint directory configured. " +
-									"You can either specify a directory when triggering this savepoint or " +
-									"configure a cluster-wide default via key '" +
-									CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
-							} else {
-								targetDirectory = defaultSavepointDirectory;
+							String targetDirectory = pathParams.get("targetDirectory");
+							if (targetDirectory == null) {
+								if (defaultSavepointDirectory == null) {
+									throw new IllegalStateException("No savepoint directory configured. " +
+										"You can either specify a directory when triggering this savepoint or " +
+										"configure a cluster-wide default via key '" +
+										CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
+								} else {
+									targetDirectory = defaultSavepointDirectory;
+								}
 							}
-						}
 
-						try {
-							return handleNewRequest(jobManagerGateway, jobId, targetDirectory, jobCheckpointingConfiguration.getCheckpointTimeout());
-						} catch (IOException e) {
-							throw new CompletionException(new FlinkException("Could not cancel job with savepoint.", e));
+							try {
+								return handleNewRequest(jobManagerGateway, jobId, targetDirectory, jobCheckpointingConfiguration.getCheckpointTimeout());
+							} catch (IOException e) {
+								throw new CompletionException(new FlinkException("Could not cancel job with savepoint.", e));
+							}
 						}
-					}, executor);
+					},
+					executor);
 			} else {
 				return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
index 787217f..2d40496 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
@@ -42,8 +42,8 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 
 	private static final String JOB_CONFIG_REST_PATH = "/jobs/:jobid/config";
 
-	public JobConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
-		super(executionGraphHolder, executor);
+	public JobConfigHandler(ExecutionGraphCache executionGraphCache, Executor executor) {
+		super(executionGraphCache, executor);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
index b9f812b..31b1478 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
@@ -60,7 +60,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 
 	private final MetricFetcher fetcher;
 
-	public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+	public JobDetailsHandler(ExecutionGraphCache executionGraphHolder, Executor executor, MetricFetcher fetcher) {
 		super(executionGraphHolder, executor);
 		this.fetcher = fetcher;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
index 566631e..62ee85c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
@@ -48,7 +48,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 
 	static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
 
-	public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+	public JobExceptionsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) {
 		super(executionGraphHolder, executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java
index d9db1ff..ed8c702 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java
@@ -36,7 +36,7 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
 
 	private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan";
 
-	public JobPlanHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+	public JobPlanHandler(ExecutionGraphCache executionGraphHolder, Executor executor) {
 		super(executionGraphHolder, executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java
index d448027..90b1f8c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java
@@ -44,7 +44,7 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
 
 	private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/accumulators";
 
-	public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+	public JobVertexAccumulatorsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) {
 		super(executionGraphHolder, executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java
index 59bfc0b..fb79f46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java
@@ -52,7 +52,7 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle
 	private final int refreshInterval;
 
 	public JobVertexBackPressureHandler(
-			ExecutionGraphHolder executionGraphHolder,
+			ExecutionGraphCache executionGraphHolder,
 			Executor executor,
 			BackPressureStatsTracker backPressureStatsTracker,
 			int refreshInterval) {

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java
index bfa7020..2ef5faa 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java
@@ -53,7 +53,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 
 	private final MetricFetcher fetcher;
 
-	public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+	public JobVertexDetailsHandler(ExecutionGraphCache executionGraphHolder, Executor executor, MetricFetcher fetcher) {
 		super(executionGraphHolder, executor);
 		this.fetcher = fetcher;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java
index 985ea1e..d2d5985 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java
@@ -55,7 +55,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 
 	private final MetricFetcher fetcher;
 
-	public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+	public JobVertexTaskManagersHandler(ExecutionGraphCache executionGraphHolder, Executor executor, MetricFetcher fetcher) {
 		super(executionGraphHolder, executor);
 		this.fetcher = fetcher;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java
index ff4fb46..2abdeaf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java
@@ -32,7 +32,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt
 
 	public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum";
 
-	public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+	public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphCache executionGraphHolder, Executor executor, MetricFetcher fetcher) {
 		super(executionGraphHolder, executor, fetcher);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java
index 1570896..3749776 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -47,7 +47,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 
 	private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators";
 
-	public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+	public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) {
 		super(executionGraphHolder, executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
index b0b22ee..5aa8312 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
@@ -55,7 +55,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 
 	private final MetricFetcher fetcher;
 
-	public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+	public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphCache executionGraphHolder, Executor executor, MetricFetcher fetcher) {
 		super(executionGraphHolder, executor);
 		this.fetcher = fetcher;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java
index 10d9e02..d1b607a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java
@@ -46,7 +46,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
 
 	private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH = 	"/jobs/:jobid/vertices/:vertexid/subtasks/accumulators";
 
-	public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+	public SubtasksAllAccumulatorsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) {
 		super(executionGraphHolder, executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java
index bf1d87e..a968ab6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java
@@ -47,7 +47,7 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
 
 	private static final String SUBTASK_TIMES_REST_PATH = 	"/jobs/:jobid/vertices/:vertexid/subtasktimes";
 
-	public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+	public SubtasksTimesHandler(ExecutionGraphCache executionGraphHolder, Executor executor) {
 		super(executionGraphHolder, executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
index 6ab6676..69a59f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -46,7 +46,7 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
 
 	private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config";
 
-	public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+	public CheckpointConfigHandler(ExecutionGraphCache executionGraphHolder, Executor executor) {
 		super(executionGraphHolder, executor);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
index b61c5d0..e277971 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -54,7 +54,7 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
 
 	private final CheckpointStatsCache cache;
 
-	public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+	public CheckpointStatsDetailsHandler(ExecutionGraphCache executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
 		super(executionGraphHolder, executor);
 		this.cache = cache;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index 22a8db2..5420cf4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractJobVertexRequestHandler;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -59,7 +59,7 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
 
 	private final CheckpointStatsCache cache;
 
-	public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+	public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphCache executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
 		super(executionGraphHolder, executor);
 		this.cache = checkNotNull(cache);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/aae417f1/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
index bea94f2..bbfcd8a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -55,7 +55,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 
 	private static final String CHECKPOINT_STATS_REST_PATH = "/jobs/:jobid/checkpoints";
 
-	public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+	public CheckpointStatsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) {
 		super(executionGraphHolder, executor);
 	}
 


Mime
View raw message