flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [3/3] flink git commit: [FLINK-7409] [web] Make WebRuntimeMonitor reactive
Date Sun, 03 Sep 2017 22:00:19 GMT
[FLINK-7409] [web] Make WebRuntimeMonitor reactive

This commit changes the behaviour of the WebRuntimeMonitor to not longer block serving
threads by waiting on the result of futures. Instead the RequestHandler now returns a
CompletableFuture<FullHttpResponse> which is written out to the Netty channel upon
completion. This will improve the performance of our WebRuntimeMonitor.

This closes #4527.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab1fbfdf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab1fbfdf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab1fbfdf

Branch: refs/heads/master
Commit: ab1fbfdfe6c1f2b6885710f98b9480cb90396ac0
Parents: 1804aa3
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Aug 10 10:56:12 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Sun Sep 3 23:43:58 2017 +0200

----------------------------------------------------------------------
 .../RetryRejectedExecutionFailureHandler.java   |   2 +-
 .../org/apache/flink/util/ExceptionUtils.java   |  13 +-
 .../webmonitor/ExecutionGraphHolder.java        |  16 +-
 .../webmonitor/RuntimeMonitorHandler.java       |  60 +++--
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  90 +++----
 .../AbstractExecutionGraphRequestHandler.java   |  35 ++-
 .../AbstractJobVertexRequestHandler.java        |  10 +-
 .../handlers/AbstractJsonRequestHandler.java    |  33 ++-
 .../AbstractSubtaskAttemptRequestHandler.java   |  22 +-
 .../handlers/AbstractSubtaskRequestHandler.java |  18 +-
 .../handlers/ClusterOverviewHandler.java        |  61 +++--
 .../handlers/CurrentJobIdsHandler.java          | 101 ++++----
 .../handlers/CurrentJobsOverviewHandler.java    |  89 ++++---
 .../handlers/DashboardConfigHandler.java        |   9 +-
 .../handlers/JarAccessDeniedHandler.java        |  10 +-
 .../webmonitor/handlers/JarActionHandler.java   |   4 +-
 .../webmonitor/handlers/JarDeleteHandler.java   |  62 +++--
 .../webmonitor/handlers/JarListHandler.java     | 157 ++++++------
 .../webmonitor/handlers/JarPlanHandler.java     |  43 ++--
 .../webmonitor/handlers/JarRunHandler.java      |  59 +++--
 .../webmonitor/handlers/JarUploadHandler.java   |  51 ++--
 .../handlers/JobAccumulatorsHandler.java        |  19 +-
 .../handlers/JobCancellationHandler.java        |  38 +--
 .../JobCancellationWithSavepointHandlers.java   | 158 ++++++------
 .../webmonitor/handlers/JobConfigHandler.java   |  20 +-
 .../webmonitor/handlers/JobDetailsHandler.java  |  19 +-
 .../handlers/JobExceptionsHandler.java          |  20 +-
 .../handlers/JobManagerConfigHandler.java       |  60 +++--
 .../webmonitor/handlers/JobPlanHandler.java     |  10 +-
 .../webmonitor/handlers/JobStoppingHandler.java |  38 +--
 .../handlers/JobVertexAccumulatorsHandler.java  |  20 +-
 .../handlers/JobVertexBackPressureHandler.java  |  17 +-
 .../handlers/JobVertexDetailsHandler.java       |  19 +-
 .../handlers/JobVertexTaskManagersHandler.java  |  19 +-
 .../webmonitor/handlers/RequestHandler.java     |   9 +-
 .../SubtaskCurrentAttemptDetailsHandler.java    |   8 +-
 ...taskExecutionAttemptAccumulatorsHandler.java |  19 +-
 .../SubtaskExecutionAttemptDetailsHandler.java  |  19 +-
 .../SubtasksAllAccumulatorsHandler.java         |  19 +-
 .../handlers/SubtasksTimesHandler.java          |  19 +-
 .../handlers/TaskManagersHandler.java           | 256 ++++++++++---------
 .../checkpoints/CheckpointConfigHandler.java    |  19 +-
 .../CheckpointStatsDetailsHandler.java          |  63 +++--
 .../CheckpointStatsDetailsSubtasksHandler.java  |  33 ++-
 .../checkpoints/CheckpointStatsHandler.java     |  19 +-
 .../metrics/AbstractMetricsHandler.java         |  27 +-
 .../metrics/JobManagerMetricsHandler.java       |   5 +-
 .../webmonitor/metrics/JobMetricsHandler.java   |   5 +-
 .../metrics/JobVertexMetricsHandler.java        |   5 +-
 .../metrics/TaskManagerMetricsHandler.java      |   5 +-
 .../handlers/ClusterOverviewHandlerTest.java    |   3 +-
 .../handlers/CurrentJobIdsHandlerTest.java      |   3 +-
 .../CurrentJobsOverviewHandlerTest.java         |   7 +-
 .../handlers/DashboardConfigHandlerTest.java    |   3 +-
 .../handlers/JarAccessDeniedHandlerTest.java    |   4 +-
 .../handlers/JarDeleteHandlerTest.java          |   4 +-
 .../webmonitor/handlers/JarListHandlerTest.java |   4 +-
 .../webmonitor/handlers/JarPlanHandlerTest.java |   4 +-
 .../webmonitor/handlers/JarRunHandlerTest.java  |   3 +-
 .../handlers/JarUploadHandlerTest.java          |   4 +-
 .../handlers/JobAccumulatorsHandlerTest.java    |   3 +-
 .../handlers/JobCancellationHandlerTest.java    |   3 +-
 ...obCancellationWithSavepointHandlersTest.java |  24 +-
 .../handlers/JobConfigHandlerTest.java          |   3 +-
 .../handlers/JobDetailsHandlerTest.java         |   3 +-
 .../handlers/JobExceptionsHandlerTest.java      |   3 +-
 .../handlers/JobManagerConfigHandlerTest.java   |   4 +-
 .../webmonitor/handlers/JobPlanHandlerTest.java |   3 +-
 .../handlers/JobStoppingHandlerTest.java        |   3 +-
 .../JobVertexAccumulatorsHandlerTest.java       |   3 +-
 .../JobVertexBackPressureHandlerTest.java       |  12 +-
 .../handlers/JobVertexDetailsHandlerTest.java   |   3 +-
 .../JobVertexTaskManagersHandlerTest.java       |   3 +-
 ...SubtaskCurrentAttemptDetailsHandlerTest.java |   3 +-
 ...ExecutionAttemptAccumulatorsHandlerTest.java |   3 +-
 ...btaskExecutionAttemptDetailsHandlerTest.java |   3 +-
 .../SubtasksAllAccumulatorsHandlerTest.java     |   3 +-
 .../handlers/SubtasksTimesHandlerTest.java      |   3 +-
 .../handlers/TaskManagersHandlerTest.java       |   3 +-
 .../CheckpointConfigHandlerTest.java            |  15 +-
 .../CheckpointStatsDetailsHandlerTest.java      |  19 +-
 .../checkpoints/CheckpointStatsHandlerTest.java |   7 +-
 ...heckpointStatsSubtaskDetailsHandlerTest.java |  31 +--
 .../metrics/AbstractMetricsHandlerTest.java     |  20 +-
 .../metrics/JobManagerMetricsHandlerTest.java   |   6 +-
 .../metrics/JobMetricsHandlerTest.java          |   6 +-
 .../metrics/JobVertexMetricsHandlerTest.java    |   6 +-
 .../metrics/TaskManagerMetricsHandlerTest.java  |   6 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |   2 +-
 90 files changed, 1278 insertions(+), 901 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
index 9380959..3706257 100644
--- a/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
+++ b/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/util/RetryRejectedExecutionFailureHandler.java
@@ -36,7 +36,7 @@ public class RetryRejectedExecutionFailureHandler implements ActionRequestFailur
 
 	@Override
 	public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
-		if (ExceptionUtils.containsThrowable(failure, EsRejectedExecutionException.class)) {
+		if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
 			indexer.add(action);
 		} else {
 			// rethrow all other failures

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 9c8907b..d141ecb 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -276,27 +277,27 @@ public final class ExceptionUtils {
 	}
 
 	/**
-	 * Checks whether a throwable chain contains a specific type of exception.
+	 * Checks whether a throwable chain contains a specific type of exception and returns it.
 	 *
 	 * @param throwable the throwable chain to check.
 	 * @param searchType the type of exception to search for in the chain.
-	 * @return True, if the searched type is nested in the throwable, false otherwise.
+	 * @return Optional throwable of the requested type if available, otherwise empty
 	 */
-	public static boolean containsThrowable(Throwable throwable, Class<?> searchType) {
+	public static Optional<Throwable> findThrowable(Throwable throwable, Class<?> searchType) {
 		if (throwable == null || searchType == null) {
-			return false;
+			return Optional.empty();
 		}
 
 		Throwable t = throwable;
 		while (t != null) {
 			if (searchType.isAssignableFrom(t.getClass())) {
-				return true;
+				return Optional.of(t);
 			} else {
 				t = t.getCause();
 			}
 		}
 
-		return false;
+		return Optional.empty();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
index 739b375..8a96969 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
@@ -31,7 +31,6 @@ import org.slf4j.LoggerFactory;
 import java.util.Optional;
 import java.util.WeakHashMap;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -65,26 +64,23 @@ public class ExecutionGraphHolder {
 	 *
 	 * @param jid jobID of the execution graph to be retrieved
 	 * @return Optional ExecutionGraph if it has been retrievable, empty if there has been no ExecutionGraph
-	 * @throws Exception if the ExecutionGraph retrieval failed.
 	 */
-	public Optional<AccessExecutionGraph> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) throws Exception {
+	public CompletableFuture<Optional<AccessExecutionGraph>> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) {
 		AccessExecutionGraph cached = cache.get(jid);
 		if (cached != null) {
 			if (cached.getState() == JobStatus.SUSPENDED) {
 				cache.remove(jid);
 			} else {
-				return Optional.of(cached);
+				return CompletableFuture.completedFuture(Optional.of(cached));
 			}
 		}
 
 		CompletableFuture<Optional<AccessExecutionGraph>> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout);
 
-		Optional<AccessExecutionGraph> result = executionGraphFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		executionGraphFuture.thenAcceptAsync(
+			optExecutionGraph ->
+				optExecutionGraph.ifPresent(executionGraph -> cache.put(jid, executionGraph)));
 
-		return result.map((executionGraph) -> {
-			cache.put(jid, executionGraph);
-
-			return executionGraph;
-		});
+		return executionGraphFuture;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 35d13dd..6305537 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.runtime.webmonitor.retriever.JobManagerRetriever;
@@ -45,6 +46,7 @@ import java.net.URLDecoder;
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -88,7 +90,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 
 	@Override
 	protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, JobManagerGateway jobManagerGateway) {
-		FullHttpResponse response;
+		CompletableFuture<FullHttpResponse> responseFuture;
 
 		try {
 			// we only pass the first element in the list to the handlers.
@@ -106,29 +108,41 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase {
 			queryParams.put(WEB_MONITOR_ADDRESS_KEY,
 				(httpsEnabled ? "https://" : "http://") + address.getHostName() + ":" + address.getPort());
 
-			response = handler.handleRequest(pathParams, queryParams, jobManagerGateway);
+			responseFuture = handler.handleRequest(pathParams, queryParams, jobManagerGateway);
+		} catch (Exception e) {
+			responseFuture = FutureUtils.completedExceptionally(e);
 		}
-		catch (NotFoundException e) {
-			// this should result in a 404 error code (not found)
-			ByteBuf message = e.getMessage() == null ? Unpooled.buffer(0)
-					: Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
-			response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
-			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
-			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-			LOG.debug("Error while handling request", e);
-		}
-		catch (Exception e) {
-			byte[] bytes = ExceptionUtils.stringifyException(e).getBytes(ENCODING);
-			response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
-					HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
-			response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
-			response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
-			LOG.debug("Error while handling request", e);
-		}
-
-		response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);
 
-		KeepAliveWrite.flush(ctx, routed.request(), response);
+		responseFuture.whenComplete(
+			(FullHttpResponse httpResponse, Throwable throwable) -> {
+				final FullHttpResponse finalResponse;
+
+				if (throwable != null) {
+					LOG.debug("Error while handling request.", throwable);
+
+					Optional<Throwable> optNotFound = ExceptionUtils.findThrowable(throwable, NotFoundException.class);
+
+					if (optNotFound.isPresent()) {
+						// this should result in a 404 error code (not found)
+						Throwable e = optNotFound.get();
+						ByteBuf message = e.getMessage() == null ? Unpooled.buffer(0)
+							: Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
+						finalResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
+						finalResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
+						finalResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, finalResponse.content().readableBytes());
+					} else {
+						byte[] bytes = ExceptionUtils.stringifyException(throwable).getBytes(ENCODING);
+						finalResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
+							HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
+						finalResponse.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=" + ENCODING.name());
+						finalResponse.headers().set(HttpHeaders.Names.CONTENT_LENGTH, finalResponse.content().readableBytes());
+					}
+				} else {
+					finalResponse = httpResponse;
+				}
+
+				finalResponse.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, allowOrigin);
+				KeepAliveWrite.flush(ctx, routed.request(), finalResponse);
+			});
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 17f02f0..e74541e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -232,41 +232,41 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 		Router router = new Router();
 		// config how to interact with this web server
-		get(router, new DashboardConfigHandler(cfg.getRefreshInterval()));
+		get(router, new DashboardConfigHandler(executor, cfg.getRefreshInterval()));
 
 		// the overview - how many task managers, slots, free slots, ...
-		get(router, new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT));
+		get(router, new ClusterOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT));
 
 		// job manager configuration
-		get(router, new JobManagerConfigHandler(config));
+		get(router, new JobManagerConfigHandler(executor, config));
 
 		// overview over jobs
-		get(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true));
-		get(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false));
-		get(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true));
-
-		get(router, new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT));
-
-		get(router, new JobDetailsHandler(currentGraphs, metricFetcher));
-
-		get(router, new JobVertexDetailsHandler(currentGraphs, metricFetcher));
-		get(router, new SubtasksTimesHandler(currentGraphs));
-		get(router, new JobVertexTaskManagersHandler(currentGraphs, metricFetcher));
-		get(router, new JobVertexAccumulatorsHandler(currentGraphs));
-		get(router, new JobVertexBackPressureHandler(currentGraphs,	backPressureStatsTracker, refreshInterval));
-		get(router, new JobVertexMetricsHandler(metricFetcher));
-		get(router, new SubtasksAllAccumulatorsHandler(currentGraphs));
-		get(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, metricFetcher));
-		get(router, new SubtaskExecutionAttemptDetailsHandler(currentGraphs, metricFetcher));
-		get(router, new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs));
-
-		get(router, new JobPlanHandler(currentGraphs));
-		get(router, new JobConfigHandler(currentGraphs));
-		get(router, new JobExceptionsHandler(currentGraphs));
-		get(router, new JobAccumulatorsHandler(currentGraphs));
-		get(router, new JobMetricsHandler(metricFetcher));
-
-		get(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, metricFetcher));
+		get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, true));
+		get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, false));
+		get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, false, true));
+
+		get(router, new CurrentJobIdsHandler(executor, DEFAULT_REQUEST_TIMEOUT));
+
+		get(router, new JobDetailsHandler(currentGraphs, executor, metricFetcher));
+
+		get(router, new JobVertexDetailsHandler(currentGraphs, executor, metricFetcher));
+		get(router, new SubtasksTimesHandler(currentGraphs, executor));
+		get(router, new JobVertexTaskManagersHandler(currentGraphs, executor, metricFetcher));
+		get(router, new JobVertexAccumulatorsHandler(currentGraphs, executor));
+		get(router, new JobVertexBackPressureHandler(currentGraphs, executor,	backPressureStatsTracker, refreshInterval));
+		get(router, new JobVertexMetricsHandler(executor, metricFetcher));
+		get(router, new SubtasksAllAccumulatorsHandler(currentGraphs, executor));
+		get(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, executor, metricFetcher));
+		get(router, new SubtaskExecutionAttemptDetailsHandler(currentGraphs, executor, metricFetcher));
+		get(router, new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs, executor));
+
+		get(router, new JobPlanHandler(currentGraphs, executor));
+		get(router, new JobConfigHandler(currentGraphs, executor));
+		get(router, new JobExceptionsHandler(currentGraphs, executor));
+		get(router, new JobAccumulatorsHandler(currentGraphs, executor));
+		get(router, new JobMetricsHandler(executor, metricFetcher));
+
+		get(router, new TaskManagersHandler(executor, DEFAULT_REQUEST_TIMEOUT, metricFetcher));
 		get(router,
 			new TaskManagerLogHandler(
 				retriever,
@@ -287,7 +287,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 				config,
 				enableSSL,
 				blobView));
-		get(router, new TaskManagerMetricsHandler(metricFetcher));
+		get(router, new TaskManagerMetricsHandler(executor, metricFetcher));
 
 		router
 			// log and stdout
@@ -299,48 +299,48 @@ public class WebRuntimeMonitor implements WebMonitor {
 				new StaticFileServerHandler(retriever, jobManagerAddressFuture, timeout, logFiles.stdOutFile,
 					enableSSL));
 
-		get(router, new JobManagerMetricsHandler(metricFetcher));
+		get(router, new JobManagerMetricsHandler(executor, metricFetcher));
 
 		// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
-		get(router, new JobCancellationHandler(timeout));
+		get(router, new JobCancellationHandler(executor, timeout));
 		// DELETE is the preferred way of canceling a job (Rest-conform)
-		delete(router, new JobCancellationHandler(timeout));
+		delete(router, new JobCancellationHandler(executor, timeout));
 
 		get(router, triggerHandler);
 		get(router, inProgressHandler);
 
 		// stop a job via GET (for proper integration with YARN this has to be performed via GET)
-		get(router, new JobStoppingHandler(timeout));
+		get(router, new JobStoppingHandler(executor, timeout));
 		// DELETE is the preferred way of stopping a job (Rest-conform)
-		delete(router, new JobStoppingHandler(timeout));
+		delete(router, new JobStoppingHandler(executor, timeout));
 
 		int maxCachedEntries = config.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
 		CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);
 
 		// Register the checkpoint stats handlers
-		get(router, new CheckpointStatsHandler(currentGraphs));
-		get(router, new CheckpointConfigHandler(currentGraphs));
-		get(router, new CheckpointStatsDetailsHandler(currentGraphs, cache));
-		get(router, new CheckpointStatsDetailsSubtasksHandler(currentGraphs, cache));
+		get(router, new CheckpointStatsHandler(currentGraphs, executor));
+		get(router, new CheckpointConfigHandler(currentGraphs, executor));
+		get(router, new CheckpointStatsDetailsHandler(currentGraphs, executor, cache));
+		get(router, new CheckpointStatsDetailsSubtasksHandler(currentGraphs, executor, cache));
 
 		if (webSubmitAllow) {
 			// fetch the list of uploaded jars.
-			get(router, new JarListHandler(uploadDir));
+			get(router, new JarListHandler(executor, uploadDir));
 
 			// get plan for an uploaded jar
-			get(router, new JarPlanHandler(uploadDir));
+			get(router, new JarPlanHandler(executor, uploadDir));
 
 			// run a jar
-			post(router, new JarRunHandler(uploadDir, timeout, config));
+			post(router, new JarRunHandler(executor, uploadDir, timeout, config));
 
 			// upload a jar
-			post(router, new JarUploadHandler(uploadDir));
+			post(router, new JarUploadHandler(executor, uploadDir));
 
 			// delete an uploaded jar from submission interface
-			delete(router, new JarDeleteHandler(uploadDir));
+			delete(router, new JarDeleteHandler(executor, uploadDir));
 		} else {
 			// send an Access Denied message
-			JarAccessDeniedHandler jad = new JarAccessDeniedHandler();
+			JarAccessDeniedHandler jad = new JarAccessDeniedHandler(executor);
 			get(router, jad);
 			post(router, jad);
 			delete(router, jad);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index 89108db..053d3f7 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -28,6 +30,8 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Base class for request handlers whose response depends on an ExecutionGraph
@@ -37,12 +41,16 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
 
 	private final ExecutionGraphHolder executionGraphHolder;
 
-	public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder) {
+	public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executor);
 		this.executionGraphHolder = Preconditions.checkNotNull(executionGraphHolder);
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
+	public CompletableFuture<String> handleJsonRequest(
+			Map<String, String> pathParams,
+			Map<String, String> queryParams,
+			JobManagerGateway jobManagerGateway) {
 		String jidString = pathParams.get("jobid");
 		if (jidString == null) {
 			throw new RuntimeException("JobId parameter missing");
@@ -53,21 +61,20 @@ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonR
 			jid = JobID.fromHexString(jidString);
 		}
 		catch (Exception e) {
-			throw new RuntimeException("Invalid JobID string '" + jidString + "': " + e.getMessage());
+			return FutureUtils.completedExceptionally(new FlinkException("Invalid JobID string '" + jidString + "'", e));
 		}
 
-		final Optional<AccessExecutionGraph> optGraph;
+		final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);
 
-		try {
-			optGraph = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);
-		} catch (Exception e) {
-			throw new FlinkException("Could not retrieve ExecutionGraph for job with jobId " + jid + " from the JobManager.", e);
-		}
-
-		final AccessExecutionGraph graph = optGraph.orElseThrow(() -> new NotFoundException("Could not find job with jobId " + jid + '.'));
-
-		return handleRequest(graph, pathParams);
+		return graphFuture.thenComposeAsync(
+			(Optional<AccessExecutionGraph> optGraph) -> {
+				if (optGraph.isPresent()) {
+					return handleRequest(optGraph.get(), pathParams);
+				} else {
+					throw new FlinkFutureException(new NotFoundException("Could not find job with jobId " + jid + '.'));
+				}
+			}, executor);
 	}
 
-	public abstract String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception;
+	public abstract CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
index af9fc6c..df09225 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
@@ -24,6 +24,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Base class for request handlers whose response depends on a specific job vertex (defined
@@ -31,12 +33,12 @@ import java.util.Map;
  */
 public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionGraphRequestHandler {
 
-	public AbstractJobVertexRequestHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public AbstractJobVertexRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
-	public final String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+	public final CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
 		final JobVertexID vid = parseJobVertexId(params);
 
 		final AccessExecutionJobVertex jobVertex = graph.getJobVertex(vid);
@@ -66,5 +68,5 @@ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionG
 		}
 	}
 
-	public abstract String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception;
+	public abstract CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
index 266ffb0..1ec3f9c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
@@ -29,6 +30,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
 
 import java.nio.charset.Charset;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Base class for most request handlers. The handlers must produce a JSON response.
@@ -37,18 +40,28 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
 
 	private static final Charset ENCODING = Charset.forName("UTF-8");
 
+	protected final Executor executor;
+
+	protected AbstractJsonRequestHandler(Executor executor) {
+		this.executor = Preconditions.checkNotNull(executor);
+	}
+
 	@Override
-	public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		String result = handleJsonRequest(pathParams, queryParams, jobManagerGateway);
-		byte[] bytes = result.getBytes(ENCODING);
+	public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		CompletableFuture<String> resultFuture = handleJsonRequest(pathParams, queryParams, jobManagerGateway);
+
+		return resultFuture.thenApplyAsync(
+			(String result) -> {
+				byte[] bytes = result.getBytes(ENCODING);
 
-		DefaultFullHttpResponse response = new DefaultFullHttpResponse(
-				HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
+				DefaultFullHttpResponse response = new DefaultFullHttpResponse(
+					HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
 
-		response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
-		response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
+				response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
+				response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
 
-		return response;
+				return response;
+			});
 	}
 
 	/**
@@ -66,9 +79,9 @@ public abstract class AbstractJsonRequestHandler implements RequestHandler {
 	 *         response with the exception message, other exceptions will cause a HTTP 500 response
 	 *         with the exception stack trace.
 	 */
-	public abstract String handleJsonRequest(
+	public abstract CompletableFuture<String> handleJsonRequest(
 			Map<String, String> pathParams,
 			Map<String, String> queryParams,
-			JobManagerGateway jobManagerGateway) throws Exception;
+			JobManagerGateway jobManagerGateway);
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
index 2792008..1b20673 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.util.FlinkException;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Base class for request handlers whose response depends on a specific subtask execution attempt
@@ -32,15 +36,15 @@ import java.util.Map;
  */
 public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubtaskRequestHandler {
 
-	public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception {
+	public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) {
 		final String attemptNumberString = params.get("attempt");
 		if (attemptNumberString == null) {
-			throw new RuntimeException("Attempt number parameter missing");
+			return FutureUtils.completedExceptionally(new FlinkException("Attempt number parameter missing"));
 		}
 
 		final int attempt;
@@ -48,7 +52,7 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
 			attempt = Integer.parseInt(attemptNumberString);
 		}
 		catch (NumberFormatException e) {
-			throw new RuntimeException("Invalid attempt number parameter");
+			return FutureUtils.completedExceptionally(new FlinkException("Invalid attempt number parameter"));
 		}
 
 		final AccessExecution currentAttempt = vertex.getCurrentExecutionAttempt();
@@ -61,14 +65,14 @@ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubta
 			if (exec != null) {
 				return handleRequest(exec, params);
 			} else {
-				throw new RequestHandlerException("Execution for attempt " + attempt +
-					" has already been deleted.");
+				return FutureUtils.completedExceptionally(new RequestHandlerException("Execution for attempt " + attempt +
+					" has already been deleted."));
 			}
 		}
 		else {
-			throw new RuntimeException("Attempt does not exist: " + attempt);
+			return FutureUtils.completedExceptionally(new FlinkException("Attempt does not exist: " + attempt));
 		}
 	}
 
-	public abstract String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception;
+	public abstract CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
index b977228..ab85034 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
@@ -18,11 +18,15 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.util.FlinkException;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Base class for request handlers whose response depends on a specific subtask (defined via the
@@ -31,15 +35,15 @@ import java.util.Map;
  */
 public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexRequestHandler {
 
-	public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
-	public final String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+	public final CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
 		final String subtaskNumberString = params.get("subtasknum");
 		if (subtaskNumberString == null) {
-			throw new RuntimeException("Subtask number parameter missing");
+			return FutureUtils.completedExceptionally(new FlinkException("Subtask number parameter missing"));
 		}
 
 		final int subtask;
@@ -47,16 +51,16 @@ public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexReq
 			subtask = Integer.parseInt(subtaskNumberString);
 		}
 		catch (NumberFormatException e) {
-			throw new RuntimeException("Invalid subtask number parameter");
+			return FutureUtils.completedExceptionally(new FlinkException("Invalid subtask number parameter", e));
 		}
 
 		if (subtask < 0 || subtask >= jobVertex.getParallelism()) {
-			throw new RuntimeException("subtask does not exist: " + subtask);
+			return FutureUtils.completedExceptionally(new FlinkException("subtask does not exist: " + subtask));
 		}
 
 		final AccessExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
 		return handleRequest(vertex, params);
 	}
 
-	public abstract String handleRequest(AccessExecutionVertex vertex, Map<String, String> params) throws Exception;
+	public abstract CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index 4ebc4e7..17db2e8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -19,16 +19,20 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
 import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.FlinkException;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -46,7 +50,8 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
 
 	private final Time timeout;
 
-	public ClusterOverviewHandler(Time timeout) {
+	public ClusterOverviewHandler(Executor executor, Time timeout) {
+		super(executor);
 		this.timeout = checkNotNull(timeout);
 	}
 
@@ -56,39 +61,45 @@ public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
 		// we need no parameters, get all requests
 		try {
 			if (jobManagerGateway != null) {
 				CompletableFuture<StatusOverview> overviewFuture = jobManagerGateway.requestStatusOverview(timeout);
 
-				StatusOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
-				StringWriter writer = new StringWriter();
-				JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-				gen.writeStartObject();
-				gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
-				gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
-				gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
-				gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
-				gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
-				gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
-				gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
-				gen.writeStringField("flink-version", version);
-				if (!commitID.equals(EnvironmentInformation.UNKNOWN)) {
-					gen.writeStringField("flink-commit", commitID);
-				}
-				gen.writeEndObject();
-
-				gen.close();
-				return writer.toString();
+				return overviewFuture.thenApplyAsync(
+					(StatusOverview overview) -> {
+						StringWriter writer = new StringWriter();
+						try {
+							JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+							gen.writeStartObject();
+							gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
+							gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
+							gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
+							gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
+							gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
+							gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
+							gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
+							gen.writeStringField("flink-version", version);
+							if (!commitID.equals(EnvironmentInformation.UNKNOWN)) {
+								gen.writeStringField("flink-commit", commitID);
+							}
+							gen.writeEndObject();
+
+							gen.close();
+							return writer.toString();
+						} catch (IOException exception) {
+							throw new FlinkFutureException("Could not write cluster overview.", exception);
+						}
+					},
+					executor);
 			} else {
 				throw new Exception("No connection to the leading JobManager.");
 			}
 		}
 		catch (Exception e) {
-			throw new RuntimeException("Failed to fetch list of all running jobs: " + e.getMessage(), e);
+			return FutureUtils.completedExceptionally(new FlinkException("Failed to fetch list of all running jobs: ", e));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index 778a300..acf1cd0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 
@@ -28,6 +29,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
 import static java.util.Objects.requireNonNull;
@@ -43,7 +45,8 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 
 	private final Time timeout;
 
-	public CurrentJobIdsHandler(Time timeout) {
+	public CurrentJobIdsHandler(Executor executor, Time timeout) {
+		super(executor);
 		this.timeout = requireNonNull(timeout);
 	}
 
@@ -53,53 +56,57 @@ public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		// we need no parameters, get all requests
-		try {
-			if (jobManagerGateway != null) {
-				CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
-				JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
-				StringWriter writer = new StringWriter();
-				JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-				gen.writeStartObject();
-
-				gen.writeArrayFieldStart("jobs-running");
-				for (JobID jid : overview.getJobsRunningOrPending()) {
-					gen.writeString(jid.toString());
-				}
-				gen.writeEndArray();
-
-				gen.writeArrayFieldStart("jobs-finished");
-				for (JobID jid : overview.getJobsFinished()) {
-					gen.writeString(jid.toString());
-				}
-				gen.writeEndArray();
-
-				gen.writeArrayFieldStart("jobs-cancelled");
-				for (JobID jid : overview.getJobsCancelled()) {
-					gen.writeString(jid.toString());
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				// we need no parameters, get all requests
+				try {
+					if (jobManagerGateway != null) {
+						CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
+						JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+						StringWriter writer = new StringWriter();
+						JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+						gen.writeStartObject();
+
+						gen.writeArrayFieldStart("jobs-running");
+						for (JobID jid : overview.getJobsRunningOrPending()) {
+							gen.writeString(jid.toString());
+						}
+						gen.writeEndArray();
+
+						gen.writeArrayFieldStart("jobs-finished");
+						for (JobID jid : overview.getJobsFinished()) {
+							gen.writeString(jid.toString());
+						}
+						gen.writeEndArray();
+
+						gen.writeArrayFieldStart("jobs-cancelled");
+						for (JobID jid : overview.getJobsCancelled()) {
+							gen.writeString(jid.toString());
+						}
+						gen.writeEndArray();
+
+						gen.writeArrayFieldStart("jobs-failed");
+						for (JobID jid : overview.getJobsFailed()) {
+							gen.writeString(jid.toString());
+						}
+						gen.writeEndArray();
+
+						gen.writeEndObject();
+
+						gen.close();
+						return writer.toString();
+					}
+					else {
+						throw new Exception("No connection to the leading JobManager.");
+					}
 				}
-				gen.writeEndArray();
-
-				gen.writeArrayFieldStart("jobs-failed");
-				for (JobID jid : overview.getJobsFailed()) {
-					gen.writeString(jid.toString());
+				catch (Exception e) {
+					throw new FlinkFutureException("Failed to fetch list of all running jobs.", e);
 				}
-				gen.writeEndArray();
-
-				gen.writeEndObject();
-
-				gen.close();
-				return writer.toString();
-			}
-			else {
-				throw new Exception("No connection to the leading JobManager.");
-			}
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Failed to fetch list of all running jobs: " + e.getMessage(), e);
-		}
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index b324426..a5b116c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -36,7 +38,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -55,10 +57,12 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 	private final boolean includeFinishedJobs;
 
 	public CurrentJobsOverviewHandler(
+			Executor executor,
 			Time timeout,
 			boolean includeRunningJobs,
 			boolean includeFinishedJobs) {
 
+		super(executor);
 		this.timeout = checkNotNull(timeout);
 		this.includeRunningJobs = includeRunningJobs;
 		this.includeFinishedJobs = includeFinishedJobs;
@@ -77,49 +81,50 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		try {
-			if (jobManagerGateway != null) {
-				CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);
-				MultipleJobsDetails result = jobDetailsFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
-				final long now = System.currentTimeMillis();
-
-				StringWriter writer = new StringWriter();
-				JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-				gen.writeStartObject();
-
-				if (includeRunningJobs && includeFinishedJobs) {
-					gen.writeArrayFieldStart("running");
-					for (JobDetails detail : result.getRunningJobs()) {
-						writeJobDetailOverviewAsJson(detail, gen, now);
-					}
-					gen.writeEndArray();
-
-					gen.writeArrayFieldStart("finished");
-					for (JobDetails detail : result.getFinishedJobs()) {
-						writeJobDetailOverviewAsJson(detail, gen, now);
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		if (jobManagerGateway != null) {
+			CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);
+
+			return jobDetailsFuture.thenApplyAsync(
+				(MultipleJobsDetails result) -> {
+					final long now = System.currentTimeMillis();
+
+					StringWriter writer = new StringWriter();
+					try {
+						JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+						gen.writeStartObject();
+
+						if (includeRunningJobs && includeFinishedJobs) {
+							gen.writeArrayFieldStart("running");
+							for (JobDetails detail : result.getRunningJobs()) {
+								writeJobDetailOverviewAsJson(detail, gen, now);
+							}
+							gen.writeEndArray();
+
+							gen.writeArrayFieldStart("finished");
+							for (JobDetails detail : result.getFinishedJobs()) {
+								writeJobDetailOverviewAsJson(detail, gen, now);
+							}
+							gen.writeEndArray();
+						} else {
+							gen.writeArrayFieldStart("jobs");
+							for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
+								writeJobDetailOverviewAsJson(detail, gen, now);
+							}
+							gen.writeEndArray();
+						}
+
+						gen.writeEndObject();
+						gen.close();
+						return writer.toString();
+					} catch (IOException e) {
+						throw new FlinkFutureException("Could not write current jobs overview json.", e);
 					}
-					gen.writeEndArray();
-				}
-				else {
-					gen.writeArrayFieldStart("jobs");
-					for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
-						writeJobDetailOverviewAsJson(detail, gen, now);
-					}
-					gen.writeEndArray();
-				}
-
-				gen.writeEndObject();
-				gen.close();
-				return writer.toString();
-			}
-			else {
-				throw new Exception("No connection to the leading JobManager.");
-			}
+				},
+				executor);
 		}
-		catch (Exception e) {
-			throw new Exception("Failed to fetch the status overview: " + e.getMessage(), e);
+		else {
+			return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index fe1d06b..39984b1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.TimeZone;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Responder that returns the parameters that define how the asynchronous requests
@@ -39,7 +41,8 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
 
 	private final String configString;
 
-	public DashboardConfigHandler(long refreshInterval) {
+	public DashboardConfigHandler(Executor executor, long refreshInterval) {
+		super(executor);
 		try {
 			this.configString = createConfigJson(refreshInterval);
 		}
@@ -55,8 +58,8 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		return this.configString;
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.completedFuture(configString);
 	}
 
 	public static String createConfigJson(long refreshInterval) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index db55169..978432b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Handler to deny access to jar-related REST calls.
@@ -30,6 +32,10 @@ public class JarAccessDeniedHandler extends AbstractJsonRequestHandler {
 	private static final String ERROR_MESSAGE = "{\"error\": \"Web submission interface is not " +
 			"available for this cluster. To enable it, set the configuration key ' jobmanager.web.submit.enable.'\"}";
 
+	public JarAccessDeniedHandler(Executor executor) {
+		super(executor);
+	}
+
 	@Override
 	public String[] getPaths() {
 		return new String[]{
@@ -42,7 +48,7 @@ public class JarAccessDeniedHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		return ERROR_MESSAGE;
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.completedFuture(ERROR_MESSAGE);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
index d86a21b..0b0d32e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
@@ -46,6 +46,7 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Abstract handler for fetching plan for a jar or running a jar.
@@ -54,7 +55,8 @@ public abstract class JarActionHandler extends AbstractJsonRequestHandler {
 
 	private final File jarDir;
 
-	public JarActionHandler(File jarDirectory) {
+	public JarActionHandler(Executor executor, File jarDirectory) {
+		super(executor);
 		jarDir = jarDirectory;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index 73771bd..d9df1d4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 
 import com.fasterxml.jackson.core.JsonGenerator;
@@ -26,6 +27,8 @@ import java.io.File;
 import java.io.FilenameFilter;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Handles requests for deletion of jars.
@@ -36,7 +39,8 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler {
 
 	private final File jarDir;
 
-	public JarDeleteHandler(File jarDirectory) {
+	public JarDeleteHandler(Executor executor, File jarDirectory) {
+		super(executor);
 		jarDir = jarDirectory;
 	}
 
@@ -46,33 +50,37 @@ public class JarDeleteHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
 		final String file = pathParams.get("jarid");
-		try {
-			File[] list = jarDir.listFiles(new FilenameFilter() {
-				@Override
-				public boolean accept(File dir, String name) {
-					return name.equals(file);
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					File[] list = jarDir.listFiles(new FilenameFilter() {
+						@Override
+						public boolean accept(File dir, String name) {
+							return name.equals(file);
+						}
+					});
+					boolean success = false;
+					for (File f: list) {
+						// although next to impossible for multiple files, we still delete them.
+						success = success || f.delete();
+					}
+					StringWriter writer = new StringWriter();
+					JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+					gen.writeStartObject();
+					if (!success) {
+						// this seems to always fail on Windows.
+						gen.writeStringField("error", "The requested jar couldn't be deleted. Please try again.");
+					}
+					gen.writeEndObject();
+					gen.close();
+					return writer.toString();
 				}
-			});
-			boolean success = false;
-			for (File f: list) {
-				// although next to impossible for multiple files, we still delete them.
-				success = success || f.delete();
-			}
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-			gen.writeStartObject();
-			if (!success) {
-				// this seems to always fail on Windows.
-				gen.writeStringField("error", "The requested jar couldn't be deleted. Please try again.");
-			}
-			gen.writeEndObject();
-			gen.close();
-			return writer.toString();
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Failed to delete jar id " + pathParams.get("jarid") + ": " + e.getMessage(), e);
-		}
+				catch (Exception e) {
+					throw new FlinkFutureException("Failed to delete jar id " + pathParams.get("jarid") + '.', e);
+				}
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index 4f9b188..95281a4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
 
@@ -29,6 +30,8 @@ import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
 
@@ -41,7 +44,8 @@ public class JarListHandler extends AbstractJsonRequestHandler {
 
 	private final File jarDir;
 
-	public  JarListHandler(File jarDirectory) {
+	public  JarListHandler(Executor executor, File jarDirectory) {
+		super(executor);
 		jarDir = jarDirectory;
 	}
 
@@ -51,88 +55,93 @@ public class JarListHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		try {
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-			gen.writeStartObject();
-			gen.writeStringField("address", queryParams.get(RuntimeMonitorHandler.WEB_MONITOR_ADDRESS_KEY));
-			gen.writeArrayFieldStart("files");
-
-			File[] list = jarDir.listFiles(new FilenameFilter() {
-				@Override
-				public boolean accept(File dir, String name) {
-					return name.endsWith(".jar");
-				}
-			});
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					StringWriter writer = new StringWriter();
+					JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
 
-			for (File f : list) {
-				// separate the uuid and the name parts.
-				String id = f.getName();
+					gen.writeStartObject();
+					gen.writeStringField("address", queryParams.get(RuntimeMonitorHandler.WEB_MONITOR_ADDRESS_KEY));
+					gen.writeArrayFieldStart("files");
 
-				int startIndex = id.indexOf("_");
-				if (startIndex < 0) {
-					continue;
-				}
-				String name = id.substring(startIndex + 1);
-				if (name.length() < 5 || !name.endsWith(".jar")) {
-					continue;
-				}
+					File[] list = jarDir.listFiles(new FilenameFilter() {
+						@Override
+						public boolean accept(File dir, String name) {
+							return name.endsWith(".jar");
+						}
+					});
 
-				gen.writeStartObject();
-				gen.writeStringField("id", id);
-				gen.writeStringField("name", name);
-				gen.writeNumberField("uploaded", f.lastModified());
-				gen.writeArrayFieldStart("entry");
+					for (File f : list) {
+						// separate the uuid and the name parts.
+						String id = f.getName();
 
-				String[] classes = new String[0];
-				try {
-					JarFile jar = new JarFile(f);
-					Manifest manifest = jar.getManifest();
-					String assemblerClass = null;
-
-					if (manifest != null) {
-						assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
-						if (assemblerClass == null) {
-							assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
+						int startIndex = id.indexOf("_");
+						if (startIndex < 0) {
+							continue;
+						}
+						String name = id.substring(startIndex + 1);
+						if (name.length() < 5 || !name.endsWith(".jar")) {
+							continue;
 						}
-					}
-					if (assemblerClass != null) {
-						classes = assemblerClass.split(",");
-					}
-				} catch (IOException ignored) {
-					// we simply show no entries here
-				}
-
-				// show every entry class that can be loaded later on.
-				for (String clazz : classes) {
-					clazz = clazz.trim();
 
-					PackagedProgram program = null;
-					try {
-						program = new PackagedProgram(f, clazz, new String[0]);
-					} catch (Exception ignored) {
-						// ignore jar files which throw an error upon creating a PackagedProgram
-					}
-					if (program != null) {
 						gen.writeStartObject();
-						gen.writeStringField("name", clazz);
-						String desc = program.getDescription();
-						gen.writeStringField("description", desc == null ? "No description provided" : desc);
+						gen.writeStringField("id", id);
+						gen.writeStringField("name", name);
+						gen.writeNumberField("uploaded", f.lastModified());
+						gen.writeArrayFieldStart("entry");
+
+						String[] classes = new String[0];
+						try {
+							JarFile jar = new JarFile(f);
+							Manifest manifest = jar.getManifest();
+							String assemblerClass = null;
+
+							if (manifest != null) {
+								assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_ASSEMBLER_CLASS);
+								if (assemblerClass == null) {
+									assemblerClass = manifest.getMainAttributes().getValue(PackagedProgram.MANIFEST_ATTRIBUTE_MAIN_CLASS);
+								}
+							}
+							if (assemblerClass != null) {
+								classes = assemblerClass.split(",");
+							}
+						} catch (IOException ignored) {
+							// we simply show no entries here
+						}
+
+						// show every entry class that can be loaded later on.
+						for (String clazz : classes) {
+							clazz = clazz.trim();
+
+							PackagedProgram program = null;
+							try {
+								program = new PackagedProgram(f, clazz, new String[0]);
+							} catch (Exception ignored) {
+								// ignore jar files which throw an error upon creating a PackagedProgram
+							}
+							if (program != null) {
+								gen.writeStartObject();
+								gen.writeStringField("name", clazz);
+								String desc = program.getDescription();
+								gen.writeStringField("description", desc == null ? "No description provided" : desc);
+								gen.writeEndObject();
+							}
+						}
+						gen.writeEndArray();
 						gen.writeEndObject();
 					}
+					gen.writeEndArray();
+					gen.writeEndObject();
+					gen.close();
+					return writer.toString();
 				}
-				gen.writeEndArray();
-				gen.writeEndObject();
-			}
-			gen.writeEndArray();
-			gen.writeEndObject();
-			gen.close();
-			return writer.toString();
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Failed to fetch jar list: " + e.getMessage(), e);
-		}
+				catch (Exception e) {
+					throw new FlinkFutureException("Failed to fetch jar list.", e);
+				}
+			},
+			executor);
+
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index b239160..b117b3d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
@@ -27,6 +28,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import java.io.File;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * This handler handles requests to fetch plan for a jar.
@@ -35,8 +38,8 @@ public class JarPlanHandler extends JarActionHandler {
 
 	static final String JAR_PLAN_REST_PATH = "/jars/:jarid/plan";
 
-	public JarPlanHandler(File jarDirectory) {
-		super(jarDirectory);
+	public JarPlanHandler(Executor executor, File jarDirectory) {
+		super(executor, jarDirectory);
 	}
 
 	@Override
@@ -45,21 +48,25 @@ public class JarPlanHandler extends JarActionHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		try {
-			JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
-			JobGraph graph = getJobGraphAndClassLoader(config).f0;
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-			gen.writeStartObject();
-			gen.writeFieldName("plan");
-			gen.writeRawValue(JsonPlanGenerator.generatePlan(graph));
-			gen.writeEndObject();
-			gen.close();
-			return writer.toString();
-		}
-		catch (Exception e) {
-			return sendError(e);
-		}
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+					JobGraph graph = getJobGraphAndClassLoader(config).f0;
+					StringWriter writer = new StringWriter();
+					JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+					gen.writeStartObject();
+					gen.writeFieldName("plan");
+					gen.writeRawValue(JsonPlanGenerator.generatePlan(graph));
+					gen.writeEndObject();
+					gen.close();
+					return writer.toString();
+				}
+				catch (Exception e) {
+					throw new FlinkFutureException(e);
+				}
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 12ffa4f..7ada0b4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobClient;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.util.Preconditions;
@@ -33,6 +34,8 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import java.io.File;
 import java.io.StringWriter;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * This handler handles requests to fetch plan for a jar.
@@ -44,8 +47,8 @@ public class JarRunHandler extends JarActionHandler {
 	private final Time timeout;
 	private final Configuration clientConfig;
 
-	public JarRunHandler(File jarDirectory, Time timeout, Configuration clientConfig) {
-		super(jarDirectory);
+	public JarRunHandler(Executor executor, File jarDirectory, Time timeout, Configuration clientConfig) {
+		super(executor, jarDirectory);
 		this.timeout = Preconditions.checkNotNull(timeout);
 		this.clientConfig = Preconditions.checkNotNull(clientConfig);
 	}
@@ -56,31 +59,35 @@ public class JarRunHandler extends JarActionHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) throws Exception {
-		try {
-			JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
-			Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(config);
+	public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					JarActionHandlerConfig config = JarActionHandlerConfig.fromParams(pathParams, queryParams);
+					Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(config);
 
-			try {
-				JobClient.submitJobDetached(
-					jobManagerGateway,
-					clientConfig,
-					graph.f0,
-					timeout,
-					graph.f1);
-			} catch (JobExecutionException e) {
-				throw new ProgramInvocationException("Failed to submit the job to the job manager", e);
-			}
+					try {
+						JobClient.submitJobDetached(
+							jobManagerGateway,
+							clientConfig,
+							graph.f0,
+							timeout,
+							graph.f1);
+					} catch (JobExecutionException e) {
+						throw new ProgramInvocationException("Failed to submit the job to the job manager", e);
+					}
 
-			StringWriter writer = new StringWriter();
-			JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-			gen.writeStartObject();
-			gen.writeStringField("jobid", graph.f0.getJobID().toString());
-			gen.writeEndObject();
-			gen.close();
-			return writer.toString();
-		} catch (Exception e) {
-			return sendError(e);
-		}
+					StringWriter writer = new StringWriter();
+					JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+					gen.writeStartObject();
+					gen.writeStringField("jobid", graph.f0.getJobID().toString());
+					gen.writeEndObject();
+					gen.close();
+					return writer.toString();
+				} catch (Exception e) {
+					throw new FlinkFutureException("Could not run the jar.", e);
+				}
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 705c321..61b3f58 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -23,6 +23,8 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import java.io.File;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Handles requests for uploading of jars.
@@ -33,7 +35,8 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
 
 	private final File jarDir;
 
-	public JarUploadHandler(File jarDir) {
+	public JarUploadHandler(Executor executor, File jarDir) {
+		super(executor);
 		this.jarDir = jarDir;
 	}
 
@@ -43,34 +46,38 @@ public class JarUploadHandler extends AbstractJsonRequestHandler {
 	}
 
 	@Override
-	public String handleJsonRequest(
+	public CompletableFuture<String> handleJsonRequest(
 			Map<String, String> pathParams,
 			Map<String, String> queryParams,
-			JobManagerGateway jobManagerGateway) throws Exception {
+			JobManagerGateway jobManagerGateway) {
 
 		String tempFilePath = queryParams.get("filepath");
 		String filename = queryParams.get("filename");
 
-		File tempFile;
-		if (tempFilePath != null && (tempFile = new File(tempFilePath)).exists()) {
-			if (!tempFile.getName().endsWith(".jar")) {
-				//noinspection ResultOfMethodCallIgnored
-				tempFile.delete();
-				return "{\"error\": \"Only Jar files are allowed.\"}";
-			}
+		return CompletableFuture.supplyAsync(
+			() -> {
+				File tempFile;
+				if (tempFilePath != null && (tempFile = new File(tempFilePath)).exists()) {
+					if (!tempFile.getName().endsWith(".jar")) {
+						//noinspection ResultOfMethodCallIgnored
+						tempFile.delete();
+						return "{\"error\": \"Only Jar files are allowed.\"}";
+					}
 
-			String filenameWithUUID = UUID.randomUUID() + "_" + filename;
-			File newFile = new File(jarDir, filenameWithUUID);
-			if (tempFile.renameTo(newFile)) {
-				// all went well
-				return "{\"status\": \"success\", \"filename\": \"" + filenameWithUUID + "\"}";
-			}
-			else {
-				//noinspection ResultOfMethodCallIgnored
-				tempFile.delete();
-			}
-		}
+					String filenameWithUUID = UUID.randomUUID() + "_" + filename;
+					File newFile = new File(jarDir, filenameWithUUID);
+					if (tempFile.renameTo(newFile)) {
+						// all went well
+						return "{\"status\": \"success\", \"filename\": \"" + filenameWithUUID + "\"}";
+					}
+					else {
+						//noinspection ResultOfMethodCallIgnored
+						tempFile.delete();
+					}
+				}
 
-		return "{\"error\": \"Failed to upload the file.\"}";
+				return "{\"error\": \"Failed to upload the file.\"}";
+			},
+			executor);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
index 163e583..4dede3a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -31,6 +32,8 @@ import java.io.StringWriter;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns the aggregated user accumulators of a job.
@@ -39,8 +42,8 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
 
 	private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators";
 
-	public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder) {
-		super(executionGraphHolder);
+	public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+		super(executionGraphHolder, executor);
 	}
 
 	@Override
@@ -49,8 +52,16 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler
 	}
 
 	@Override
-	public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-		return createJobAccumulatorsJson(graph);
+	public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+		return CompletableFuture.supplyAsync(
+			() -> {
+				try {
+					return createJobAccumulatorsJson(graph);
+				} catch (IOException e) {
+					throw new FlinkFutureException("Could not create job accumulators json.", e);
+				}
+			},
+			executor);
 	}
 
 	/**


Mime
View raw message