flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/3] flink git commit: [FLINK-7409] [web] Make WebRuntimeMonitor reactive
Date Sun, 03 Sep 2017 22:00:17 GMT
Repository: flink
Updated Branches:
  refs/heads/master 1804aa33d -> ab1fbfdfe


http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
index bcc62cb..fde16fc 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.webmonitor.metrics;
 import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
 
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 /**
  * Request handler that returns for a given task manager a list of all available metrics or the values for a set of metrics.
@@ -37,8 +38,8 @@ public class TaskManagerMetricsHandler extends AbstractMetricsHandler {
 
 	private static final String TASKMANAGER_METRICS_REST_PATH = "/taskmanagers/:taskmanagerid/metrics";
 
-	public TaskManagerMetricsHandler(MetricFetcher fetcher) {
-		super(fetcher);
+	public TaskManagerMetricsHandler(Executor executor, MetricFetcher fetcher) {
+		super(executor, fetcher);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
index 865385f..69ee762 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -29,7 +30,7 @@ import org.junit.Test;
 public class ClusterOverviewHandlerTest {
 	@Test
 	public void testGetPaths() {
-		ClusterOverviewHandler handler = new ClusterOverviewHandler(Time.seconds(0L));
+		ClusterOverviewHandler handler = new ClusterOverviewHandler(Executors.directExecutor(), Time.seconds(0L));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/overview", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
index ea26f5d..6061e4b 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -29,7 +30,7 @@ import org.junit.Test;
 public class CurrentJobIdsHandlerTest {
 	@Test
 	public void testGetPaths() {
-		CurrentJobIdsHandler handler = new CurrentJobIdsHandler(Time.seconds(0L));
+		CurrentJobIdsHandler handler = new CurrentJobIdsHandler(Executors.directExecutor(), Time.seconds(0L));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
index 64360d3..ccfafd4 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
@@ -66,17 +67,17 @@ public class CurrentJobsOverviewHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(Time.seconds(0L), true, true);
+		CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, true);
 		String[] pathsAll = handlerAll.getPaths();
 		Assert.assertEquals(1, pathsAll.length);
 		Assert.assertEquals("/joboverview", pathsAll[0]);
 
-		CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(Time.seconds(0L), true, false);
+		CurrentJobsOverviewHandler handlerRunning = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), true, false);
 		String[] pathsRunning = handlerRunning.getPaths();
 		Assert.assertEquals(1, pathsRunning.length);
 		Assert.assertEquals("/joboverview/running", pathsRunning[0]);
 
-		CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(Time.seconds(0L), false, true);
+		CurrentJobsOverviewHandler handlerCompleted = new CurrentJobsOverviewHandler(Executors.directExecutor(), Time.seconds(0L), false, true);
 		String[] pathsCompleted = handlerCompleted.getPaths();
 		Assert.assertEquals(1, pathsCompleted.length);
 		Assert.assertEquals("/joboverview/completed", pathsCompleted[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
index d17b55f..22b3e5e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 
@@ -33,7 +34,7 @@ import java.util.TimeZone;
 public class DashboardConfigHandlerTest {
 	@Test
 	public void testGetPaths() {
-		DashboardConfigHandler handler = new DashboardConfigHandler(10000L);
+		DashboardConfigHandler handler = new DashboardConfigHandler(Executors.directExecutor(), 10000L);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/config", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
index a498cf2..97091cf 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandlerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
+
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
 import org.junit.Assert;
@@ -31,7 +33,7 @@ import java.util.List;
 public class JarAccessDeniedHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JarAccessDeniedHandler handler = new JarAccessDeniedHandler();
+		JarAccessDeniedHandler handler = new JarAccessDeniedHandler(Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(5, paths.length);
 		List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
index bcbb1ea..a067d8f 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandlerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -27,7 +29,7 @@ import org.junit.Test;
 public class JarDeleteHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JarDeleteHandler handler = new JarDeleteHandler(null);
+		JarDeleteHandler handler = new JarDeleteHandler(Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jars/:jarid", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java
index 863c248..5da4913 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandlerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -27,7 +29,7 @@ import org.junit.Test;
 public class JarListHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JarListHandler handler = new JarListHandler(null);
+		JarListHandler handler = new JarListHandler(Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jars", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java
index a3ded83..f5ed339 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -27,7 +29,7 @@ import org.junit.Test;
 public class JarPlanHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JarPlanHandler handler = new JarPlanHandler(null);
+		JarPlanHandler handler = new JarPlanHandler(Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jars/:jarid/plan", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index 82aa87a..67dad13 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.Executors;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -30,7 +31,7 @@ import org.junit.Test;
 public class JarRunHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JarRunHandler handler = new JarRunHandler(null, Time.seconds(0L), new Configuration());
+		JarRunHandler handler = new JarRunHandler(Executors.directExecutor(), null, Time.seconds(0L), new Configuration());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jars/:jarid/run", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
index e57ca34..ea8b524 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -27,7 +29,7 @@ import org.junit.Test;
 public class JarUploadHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JarUploadHandler handler = new JarUploadHandler(null);
+		JarUploadHandler handler = new JarUploadHandler(Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jars/upload", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
index fe55f51..5510fed 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -54,7 +55,7 @@ public class JobAccumulatorsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphHolder.class));
+		JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
index a436b2d..86c5295 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -33,7 +34,7 @@ import java.util.List;
 public class JobCancellationHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JobCancellationHandler handler = new JobCancellationHandler(TestingUtils.TIMEOUT());
+		JobCancellationHandler handler = new JobCancellationHandler(Executors.directExecutor(), TestingUtils.TIMEOUT());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(2, paths.length);
 		List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
index b48ee66..529d130 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlersTest.java
@@ -92,7 +92,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
 		ExecutionGraph graph = mock(ExecutionGraph.class);
 		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
+		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
 		when(graph.getCheckpointCoordinator()).thenReturn(coord);
 		when(coord.getCheckpointTimeout()).thenReturn(timeout);
 
@@ -121,7 +121,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
 		ExecutionGraph graph = mock(ExecutionGraph.class);
 		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
+		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
 		when(graph.getCheckpointCoordinator()).thenReturn(coord);
 		when(coord.getCheckpointTimeout()).thenReturn(timeout);
 
@@ -152,7 +152,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		handler = handlers.getTriggerHandler();
 
 		try {
-			handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+			handler.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
 			fail("Did not throw expected test Exception");
 		} catch (Exception e) {
 			IllegalStateException cause = (IllegalStateException) e.getCause();
@@ -169,7 +169,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
 		ExecutionGraph graph = mock(ExecutionGraph.class);
 		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
+		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
 		when(graph.getCheckpointCoordinator()).thenReturn(coord);
 
 		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
@@ -187,7 +187,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		when(jobManager.cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class))).thenReturn(successfulCancelWithSavepoint);
 
 		// Trigger
-		FullHttpResponse response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		FullHttpResponse response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
 
 		verify(jobManager).cancelJobWithSavepoint(eq(jobId), eq("custom-directory"), any(Time.class));
 
@@ -206,7 +206,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		assertEquals(location, root.get("location").asText());
 
 		// Trigger again
-		response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		response = trigger.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
 		assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
 		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
 		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
@@ -225,7 +225,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		// Query progress
 		params.put("requestId", "1");
 
-		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
 		assertEquals(HttpResponseStatus.ACCEPTED, response.getStatus());
 		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
 		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
@@ -239,7 +239,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		// Complete
 		successfulCancelWithSavepoint.complete("_path-savepoint_");
 
-		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
 
 		assertEquals(HttpResponseStatus.CREATED, response.getStatus());
 		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
@@ -255,7 +255,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 
 		// Query again, keep recent history
 
-		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
 
 		assertEquals(HttpResponseStatus.CREATED, response.getStatus());
 		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
@@ -272,7 +272,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		// Query for unknown request
 		params.put("requestId", "9929");
 
-		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
 		assertEquals(HttpResponseStatus.BAD_REQUEST, response.getStatus());
 		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
 		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));
@@ -295,7 +295,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class);
 		ExecutionGraph graph = mock(ExecutionGraph.class);
 		CheckpointCoordinator coord = mock(CheckpointCoordinator.class);
-		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(Optional.of(graph));
+		when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph)));
 		when(graph.getCheckpointCoordinator()).thenReturn(coord);
 
 		JobCancellationWithSavepointHandlers handlers = new JobCancellationWithSavepointHandlers(holder, executor);
@@ -319,7 +319,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger {
 		// Query progress
 		params.put("requestId", "1");
 
-		FullHttpResponse response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager);
+		FullHttpResponse response = progress.handleRequest(params, Collections.<String, String>emptyMap(), jobManager).get();
 		assertEquals(HttpResponseStatus.INTERNAL_SERVER_ERROR, response.getStatus());
 		assertEquals("application/json; charset=UTF-8", response.headers().get(HttpHeaders.Names.CONTENT_TYPE));
 		assertEquals(Integer.toString(response.content().readableBytes()), response.headers().get(HttpHeaders.Names.CONTENT_LENGTH));

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
index 104b0a3..1c08ae8 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -55,7 +56,7 @@ public class JobConfigHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphHolder.class));
+		JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/config", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
index bfeb40a..ee0498e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -68,7 +69,7 @@ public class JobDetailsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphHolder.class), null);
+		JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(2, paths.length);
 		List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
index f54ab06..6e0f918 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -58,7 +59,7 @@ public class JobExceptionsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class));
+		JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
index 8e16e8a..94fd5a8 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandlerTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
+
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -27,7 +29,7 @@ import org.junit.Test;
 public class JobManagerConfigHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JobManagerConfigHandler handler = new JobManagerConfigHandler(null);
+		JobManagerConfigHandler handler = new JobManagerConfigHandler(Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobmanager/config", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
index 17b4c44..4a934ec 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -51,7 +52,7 @@ public class JobPlanHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphHolder.class));
+		JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/plan", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
index 89bf426..8c05c83 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 
@@ -34,7 +35,7 @@ import java.util.List;
 public class JobStoppingHandlerTest extends TestLogger {
 	@Test
 	public void testGetPaths() {
-		JobStoppingHandler handler = new JobStoppingHandler(TestingUtils.TIMEOUT());
+		JobStoppingHandler handler = new JobStoppingHandler(Executors.directExecutor(), TestingUtils.TIMEOUT());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(2, paths.length);
 		List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
index b7af323..5af9aa6 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -57,7 +58,7 @@ public class JobVertexAccumulatorsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class));
+		JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
index d2ac0d6..0d15e08 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -46,7 +47,7 @@ import static org.mockito.Mockito.when;
 public class JobVertexBackPressureHandlerTest {
 	@Test
 	public void testGetPaths() {
-		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), mock(BackPressureStatsTracker.class), 0);
+		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), mock(BackPressureStatsTracker.class), 0);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]);
@@ -63,10 +64,11 @@ public class JobVertexBackPressureHandlerTest {
 
 		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
 				mock(ExecutionGraphHolder.class),
+				Executors.directExecutor(),
 				statsTracker,
 				9999);
 
-		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
+		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode rootNode = mapper.readTree(response);
@@ -96,10 +98,11 @@ public class JobVertexBackPressureHandlerTest {
 
 		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
 				mock(ExecutionGraphHolder.class),
+				Executors.directExecutor(),
 				statsTracker,
 				9999);
 
-		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
+		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode rootNode = mapper.readTree(response);
@@ -157,10 +160,11 @@ public class JobVertexBackPressureHandlerTest {
 
 		JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(
 				mock(ExecutionGraphHolder.class),
+				Executors.directExecutor(),
 				statsTracker,
 				0); // <----- refresh interval should fire immediately
 
-		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap());
+		String response = handler.handleRequest(jobVertex, Collections.<String, String>emptyMap()).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode rootNode = mapper.readTree(response);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
index bc4fe9c..1b8d9aa 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -59,7 +60,7 @@ public class JobVertexDetailsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), null);
+		JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
index d5d877a..badb952 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -61,7 +62,7 @@ public class JobVertexTaskManagersHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), null);
+		JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
index d992b85..a80bac9 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
 import org.junit.Assert;
@@ -31,7 +32,7 @@ import static org.mockito.Mockito.mock;
 public class SubtaskCurrentAttemptDetailsHandlerTest {
 	@Test
 	public void testGetPaths() {
-		SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), null);
+		SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index ce8e72f..6773fd4 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
@@ -64,7 +65,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class));
+		SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
index e1fbf92..7777d2d 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -73,7 +74,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), null);
+		SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(),  null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
index f33da80..7b400da 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
@@ -58,7 +59,7 @@ public class SubtasksAllAccumulatorsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class));
+		SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
index 548efaf..31c2212 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
@@ -59,7 +60,7 @@ public class SubtasksTimesHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		SubtasksTimesHandler handler = new SubtasksTimesHandler(mock(ExecutionGraphHolder.class));
+		SubtasksTimesHandler handler = new SubtasksTimesHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
index afc2764..e3a71a1 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -33,7 +34,7 @@ import java.util.List;
 public class TaskManagersHandlerTest {
 	@Test
 	public void testGetPaths() {
-		TaskManagersHandler handler = new TaskManagersHandler(Time.seconds(0L), null);
+		TaskManagersHandler handler = new TaskManagersHandler(Executors.directExecutor(), Time.seconds(0L), null);
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(2, paths.length);
 		List<String> pathsList = Lists.newArrayList(paths);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
index ce943b1..47298be 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
@@ -79,7 +80,7 @@ public class CheckpointConfigHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/checkpoints/config", paths[0]);
@@ -95,8 +96,8 @@ public class CheckpointConfigHandlerTest {
 		AccessExecutionGraph graph = graphAndSettings.graph;
 		JobCheckpointingSettings settings = graphAndSettings.snapshottingSettings;
 
-		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
-		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode rootNode = mapper.readTree(json);
@@ -121,8 +122,8 @@ public class CheckpointConfigHandlerTest {
 
 		AccessExecutionGraph graph = graphAndSettings.graph;
 
-		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
-		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode rootNode = mapper.readTree(json);
@@ -140,8 +141,8 @@ public class CheckpointConfigHandlerTest {
 		AccessExecutionGraph graph = graphAndSettings.graph;
 		ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
 
-		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
-		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode externalizedNode = mapper.readTree(json).get("externalization");

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
index 0259aa5..f16d623 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -101,7 +102,7 @@ public class CheckpointStatsDetailsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid", paths[0]);
@@ -113,10 +114,10 @@ public class CheckpointStatsDetailsHandlerTest {
 	@Test
 	public void testIllegalCheckpointId() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "illegal checkpoint");
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		assertEquals("{}", json);
 	}
@@ -127,8 +128,8 @@ public class CheckpointStatsDetailsHandlerTest {
 	@Test
 	public void testNoCheckpointIdParam() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
-		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
 
 		assertEquals("{}", json);
 	}
@@ -147,10 +148,10 @@ public class CheckpointStatsDetailsHandlerTest {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
-		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "123");
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		assertEquals("{}", json);
 		verify(history, times(1)).getCheckpointById(anyLong());
@@ -318,10 +319,10 @@ public class CheckpointStatsDetailsHandlerTest {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
-		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "123");
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		return mapper.readTree(json);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
index 9425a4c..ed73a62 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
 import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
@@ -86,7 +87,7 @@ public class CheckpointStatsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
+		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/checkpoints", paths[0]);
@@ -99,8 +100,8 @@ public class CheckpointStatsHandlerTest {
 	public void testCheckpointStatsRequest() throws Exception {
 		TestCheckpointStats testCheckpointStats = createTestCheckpointStats();
 
-		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
-		String json = handler.handleRequest(testCheckpointStats.graph, Collections.<String, String>emptyMap());
+		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		String json = handler.handleRequest(testCheckpointStats.graph, Collections.<String, String>emptyMap()).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		JsonNode rootNode = mapper.readTree(json);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
index b8eb715..9c5e168 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
 import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
@@ -100,7 +101,7 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 
 	@Test
 	public void testGetPaths() {
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid", paths[0]);
@@ -149,10 +150,10 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 	@Test
 	public void testIllegalCheckpointId() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "illegal checkpoint");
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		assertEquals("{}", json);
 	}
@@ -163,8 +164,8 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 	@Test
 	public void testNoCheckpointIdParam() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
-		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		String json = handler.handleRequest(graph, Collections.<String, String>emptyMap()).get();
 
 		assertEquals("{}", json);
 	}
@@ -183,11 +184,11 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "123");
 		params.put("vertexid", new JobVertexID().toString());
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		assertEquals("{}", json);
 		verify(history, times(1)).getCheckpointById(anyLong());
@@ -199,11 +200,11 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 	@Test
 	public void testIllegalJobVertexIdParam() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "1");
 		params.put("vertexid", "illegal vertex id");
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		assertEquals("{}", json);
 	}
@@ -214,10 +215,10 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 	@Test
 	public void testNoJobVertexIdParam() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "1");
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		assertEquals("{}", json);
 	}
@@ -238,11 +239,11 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "123");
 		params.put("vertexid", new JobVertexID().toString());
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		assertEquals("{}", json);
 		verify(inProgress, times(1)).getTaskStateStats(any(JobVertexID.class));
@@ -259,11 +260,11 @@ public class CheckpointStatsSubtaskDetailsHandlerTest {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "123");
 		params.put("vertexid", new JobVertexID().toString());
-		String json = handler.handleRequest(graph, params);
+		String json = handler.handleRequest(graph, params).get();
 
 		ObjectMapper mapper = new ObjectMapper();
 		return mapper.readTree(json);

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
index 90e032d..5296d33 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
@@ -49,7 +49,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStoreTest.setupStore(fetcher.getMetricStore());
 
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 		Map<String, String> queryParams = new HashMap<>();
@@ -58,7 +58,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		pathParams.put("vertexid", "taskid");
 
 		// get list of available metrics
-		String availableList = handler.handleJsonRequest(pathParams, queryParams, null);
+		String availableList = handler.handleJsonRequest(pathParams, queryParams, null).get();
 
 		assertEquals("[" +
 				"{\"id\":\"8.opname.abc.metric5\"}," +
@@ -69,7 +69,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		// get value for a single metric
 		queryParams.put("get", "8.opname.abc.metric5");
 
-		String metricValue = handler.handleJsonRequest(pathParams, queryParams, null);
+		String metricValue = handler.handleJsonRequest(pathParams, queryParams, null).get();
 
 		assertEquals("[" +
 				"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}" +
@@ -80,7 +80,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		// get values for multiple metrics
 		queryParams.put("get", "8.opname.abc.metric5,8.abc.metric4");
 
-		String metricValues = handler.handleJsonRequest(pathParams, queryParams, null);
+		String metricValues = handler.handleJsonRequest(pathParams, queryParams, null).get();
 
 		assertEquals("[" +
 				"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}," +
@@ -102,7 +102,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStoreTest.setupStore(fetcher.getMetricStore());
 
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 		Map<String, String> queryParams = new HashMap<>();
@@ -114,7 +114,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		pathParams.put("jobid", "nonexistent");
 
 		try {
-			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null));
+			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
 		} catch (Exception e) {
 			fail();
 		}
@@ -132,7 +132,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStoreTest.setupStore(fetcher.getMetricStore());
 
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 		Map<String, String> queryParams = new HashMap<>();
@@ -144,7 +144,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		queryParams.put("get", "");
 
 		try {
-			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null));
+			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
 		} catch (Exception e) {
 			fail(e.getMessage());
 		}
@@ -154,7 +154,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		queryParams.put("get", "subindex.opname.abc.metric5");
 
 		try {
-			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null));
+			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
 		} catch (Exception e) {
 			fail(e.getMessage());
 		}
@@ -164,7 +164,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
 		queryParams.put("get", "subindex.opname.abc.nonexistant");
 
 		try {
-			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null));
+			assertEquals("", handler.handleJsonRequest(pathParams, queryParams, null).get());
 		} catch (Exception e) {
 			fail(e.getMessage());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
index 994fc5e..b02949a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandlerTest.java
@@ -40,7 +40,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
 public class JobManagerMetricsHandlerTest extends TestLogger {
 	@Test
 	public void testGetPaths() {
-		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(mock(MetricFetcher.class));
+		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobmanager/metrics", paths[0]);
@@ -55,7 +55,7 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
 
-		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(fetcher);
+		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 
@@ -73,7 +73,7 @@ public class JobManagerMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStore store = fetcher.getMetricStore();
 
-		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(fetcher);
+		JobManagerMetricsHandler handler = new JobManagerMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
index a35af22..569f772 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandlerTest.java
@@ -41,7 +41,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
 public class JobMetricsHandlerTest extends TestLogger {
 	@Test
 	public void testGetPaths() {
-		JobMetricsHandler handler = new JobMetricsHandler(mock(MetricFetcher.class));
+		JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/metrics", paths[0]);
@@ -56,7 +56,7 @@ public class JobMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
 
-		JobMetricsHandler handler = new JobMetricsHandler(fetcher);
+		JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 		pathParams.put(PARAMETER_JOB_ID, "jobid");
@@ -75,7 +75,7 @@ public class JobMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStore store = fetcher.getMetricStore();
 
-		JobMetricsHandler handler = new JobMetricsHandler(fetcher);
+		JobMetricsHandler handler = new JobMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
index e84b11d..e6bbd2e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandlerTest.java
@@ -42,7 +42,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
 public class JobVertexMetricsHandlerTest extends TestLogger {
 	@Test
 	public void testGetPaths() {
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(mock(MetricFetcher.class));
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/metrics", paths[0]);
@@ -57,7 +57,7 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
 
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 		pathParams.put(PARAMETER_JOB_ID, "jobid");
@@ -79,7 +79,7 @@ public class JobVertexMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStore store = fetcher.getMetricStore();
 
-		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(fetcher);
+		JobVertexMetricsHandler handler = new JobVertexMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
index c20ea98..c4c1c7a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/TaskManagerMetricsHandlerTest.java
@@ -41,7 +41,7 @@ import static org.powermock.api.mockito.PowerMockito.mock;
 public class TaskManagerMetricsHandlerTest extends TestLogger {
 	@Test
 	public void testGetPaths() {
-		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(mock(MetricFetcher.class));
+		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), mock(MetricFetcher.class));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/taskmanagers/:taskmanagerid/metrics", paths[0]);
@@ -56,7 +56,7 @@ public class TaskManagerMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStore store = MetricStoreTest.setupStore(fetcher.getMetricStore());
 
-		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(fetcher);
+		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 		pathParams.put(TASK_MANAGER_ID_KEY, "tmid");
@@ -75,7 +75,7 @@ public class TaskManagerMetricsHandlerTest extends TestLogger {
 			TestingUtils.TIMEOUT());
 		MetricStore store = fetcher.getMetricStore();
 
-		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(fetcher);
+		TaskManagerMetricsHandler handler = new TaskManagerMetricsHandler(Executors.directExecutor(), fetcher);
 
 		Map<String, String> pathParams = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8c551a7..f0073db 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -2232,7 +2232,7 @@ object JobManager {
           new AkkaJobManagerRetriever(jobManagerSystem, timeout),
           new AkkaQueryServiceRetriever(jobManagerSystem, timeout),
           timeout,
-          jobManagerSystem.dispatcher)
+          futureExecutor)
 
         Option(webServer)
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab1fbfdf/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index e622130..41e5629 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -1094,7 +1094,7 @@ public class TaskExecutorTest extends TestLogger {
 
 				fail("The slot request should have failed.");
 			} catch (Exception e) {
-				assertTrue(ExceptionUtils.containsThrowable(e, SlotAllocationException.class));
+				assertTrue(ExceptionUtils.findThrowable(e, SlotAllocationException.class).isPresent());
 			}
 
 			// re-register


Mime
View raw message