flink-commits mailing list archives

From trohrm...@apache.org
Subject [16/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler to flink-runtime
Date Tue, 19 Sep 2017 22:44:26 GMT
[FLINK-7531] Move Flink legacy rest handler to flink-runtime

Move metrics handlers under o.a.f.runtime.rest.handler.legacy.metrics

Move StaticFileServerHandler under o.a.f.runtime.rest.handler.legacy.files

This closes #4600.
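
For code that compiled against the old packages, the move is essentially an import change. A minimal before/after sketch (class names taken from the file list below; any surrounding code is assumed):

    // Before: legacy handlers lived in flink-runtime-web
    import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
    import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
    import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;

    // After: they live in flink-runtime under the rest.handler.legacy package
    import org.apache.flink.runtime.rest.handler.legacy.RequestHandler;
    import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
    import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;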


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4fc019a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4fc019a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4fc019a9

Branch: refs/heads/master
Commit: 4fc019a96a08446d7ba5f57664904abcd585e31c
Parents: 3277010
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Fri Aug 18 09:52:30 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Wed Sep 20 00:40:24 2017 +0200

----------------------------------------------------------------------
 .../webmonitor/BackPressureStatsTracker.java    | 334 --------------
 .../webmonitor/ExecutionGraphHolder.java        |  86 ----
 .../runtime/webmonitor/NotFoundException.java   |  32 --
 .../webmonitor/OperatorBackPressureStats.java   | 126 ------
 .../webmonitor/RuntimeMonitorHandler.java       |   4 +-
 .../runtime/webmonitor/StackTraceSample.java    | 119 -----
 .../webmonitor/StackTraceSampleCoordinator.java | 392 -----------------
 .../flink/runtime/webmonitor/WebHandler.java    |  32 --
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  82 ++--
 .../files/StaticFileServerHandler.java          | 363 ---------------
 .../AbstractExecutionGraphRequestHandler.java   |  80 ----
 .../AbstractJobVertexRequestHandler.java        |  72 ---
 .../handlers/AbstractJsonRequestHandler.java    |  87 ----
 .../AbstractSubtaskAttemptRequestHandler.java   |  78 ----
 .../handlers/AbstractSubtaskRequestHandler.java |  66 ---
 .../handlers/ClusterOverviewHandler.java        | 105 -----
 .../handlers/ConstantTextHandler.java           |  57 ---
 .../handlers/CurrentJobIdsHandler.java          | 112 -----
 .../handlers/CurrentJobsOverviewHandler.java    | 182 --------
 .../handlers/DashboardConfigHandler.java        |  90 ----
 .../handlers/JarAccessDeniedHandler.java        |   1 +
 .../webmonitor/handlers/JarActionHandler.java   |   2 +
 .../webmonitor/handlers/JarDeleteHandler.java   |   2 +
 .../webmonitor/handlers/JarListHandler.java     |   2 +
 .../webmonitor/handlers/JarPlanHandler.java     |   1 +
 .../webmonitor/handlers/JarRunHandler.java      |   1 +
 .../webmonitor/handlers/JarUploadHandler.java   |   1 +
 .../handlers/JobAccumulatorsHandler.java        | 107 -----
 .../handlers/JobCancellationHandler.java        |  72 ---
 .../JobCancellationWithSavepointHandlers.java   | 428 ------------------
 .../webmonitor/handlers/JobConfigHandler.java   | 119 -----
 .../webmonitor/handlers/JobDetailsHandler.java  | 225 ----------
 .../handlers/JobExceptionsHandler.java          | 137 ------
 .../handlers/JobManagerConfigHandler.java       |  87 ----
 .../webmonitor/handlers/JobPlanHandler.java     |  67 ---
 .../webmonitor/handlers/JobStoppingHandler.java |  72 ---
 .../handlers/JobVertexAccumulatorsHandler.java  | 113 -----
 .../handlers/JobVertexBackPressureHandler.java  | 147 -------
 .../handlers/JobVertexDetailsHandler.java       | 160 -------
 .../handlers/JobVertexTaskManagersHandler.java  | 211 ---------
 .../webmonitor/handlers/JsonFactory.java        |  35 --
 .../webmonitor/handlers/RequestHandler.java     |  56 ---
 .../handlers/RequestHandlerException.java       |  31 --
 .../SubtaskCurrentAttemptDetailsHandler.java    |  49 ---
 ...taskExecutionAttemptAccumulatorsHandler.java | 134 ------
 .../SubtaskExecutionAttemptDetailsHandler.java  | 167 -------
 .../SubtasksAllAccumulatorsHandler.java         | 131 ------
 .../handlers/SubtasksTimesHandler.java          | 141 ------
 .../handlers/TaskManagerLogHandler.java         | 335 --------------
 .../handlers/TaskManagersHandler.java           | 205 ---------
 .../checkpoints/CheckpointConfigHandler.java    | 120 -----
 .../checkpoints/CheckpointStatsCache.java       |  81 ----
 .../CheckpointStatsDetailsHandler.java          | 203 ---------
 .../CheckpointStatsDetailsSubtasksHandler.java  | 234 ----------
 .../checkpoints/CheckpointStatsHandler.java     | 277 ------------
 .../webmonitor/history/HistoryServer.java       |   2 +-
 .../history/HistoryServerArchiveFetcher.java    |   2 +-
 .../HistoryServerStaticFileServerHandler.java   |   2 +-
 .../metrics/AbstractMetricsHandler.java         | 139 ------
 .../metrics/JobManagerMetricsHandler.java       |  57 ---
 .../webmonitor/metrics/JobMetricsHandler.java   |  55 ---
 .../metrics/JobVertexMetricsHandler.java        |  57 ---
 .../webmonitor/metrics/MetricFetcher.java       | 211 ---------
 .../runtime/webmonitor/metrics/MetricStore.java | 305 -------------
 .../metrics/TaskManagerMetricsHandler.java      |  59 ---
 .../webmonitor/utils/MutableIOMetrics.java      | 109 -----
 .../BackPressureStatsTrackerITCase.java         | 332 --------------
 .../BackPressureStatsTrackerTest.java           | 192 --------
 .../StackTraceSampleCoordinatorITCase.java      | 203 ---------
 .../StackTraceSampleCoordinatorTest.java        | 441 -------------------
 .../runtime/webmonitor/files/MimeTypesTest.java |  75 ----
 .../handlers/ClusterOverviewHandlerTest.java    |  38 --
 .../handlers/CurrentJobIdsHandlerTest.java      |  38 --
 .../CurrentJobsOverviewHandlerTest.java         | 121 -----
 .../handlers/DashboardConfigHandlerTest.java    |  59 ---
 .../handlers/HandlerRedirectUtilsTest.java      |  74 ----
 .../handlers/JarActionHandlerTest.java          |  13 +-
 .../handlers/JobAccumulatorsHandlerTest.java    |  83 ----
 .../handlers/JobCancellationHandlerTest.java    |  44 --
 ...obCancellationWithSavepointHandlersTest.java | 334 --------------
 .../handlers/JobConfigHandlerTest.java          |  92 ----
 .../handlers/JobDetailsHandlerTest.java         | 169 -------
 .../handlers/JobExceptionsHandlerTest.java      | 101 -----
 .../handlers/JobManagerConfigHandlerTest.java   |  37 --
 .../webmonitor/handlers/JobPlanHandlerTest.java |  60 ---
 .../handlers/JobStoppingHandlerTest.java        |  45 --
 .../JobVertexAccumulatorsHandlerTest.java       |  85 ----
 .../JobVertexBackPressureHandlerTest.java       | 211 ---------
 .../handlers/JobVertexDetailsHandlerTest.java   | 109 -----
 .../JobVertexTaskManagersHandlerTest.java       | 132 ------
 ...SubtaskCurrentAttemptDetailsHandlerTest.java |  40 --
 ...ExecutionAttemptAccumulatorsHandlerTest.java |  91 ----
 ...btaskExecutionAttemptDetailsHandlerTest.java | 109 -----
 .../SubtasksAllAccumulatorsHandlerTest.java     |  97 ----
 .../handlers/SubtasksTimesHandlerTest.java      | 103 -----
 .../handlers/TaskManagerLogHandlerTest.java     | 149 -------
 .../handlers/TaskManagersHandlerTest.java       |  44 --
 .../CheckpointConfigHandlerTest.java            | 195 --------
 .../checkpoints/CheckpointStatsCacheTest.java   |  71 ---
 .../CheckpointStatsDetailsHandlerTest.java      | 358 ---------------
 .../checkpoints/CheckpointStatsHandlerTest.java | 432 ------------------
 ...heckpointStatsSubtaskDetailsHandlerTest.java | 389 ----------------
 .../webmonitor/history/FsJobArchivistTest.java  |   2 +-
 .../webmonitor/history/HistoryServerTest.java   |   2 +-
 .../metrics/AbstractMetricsHandlerTest.java     | 172 --------
 .../metrics/JobManagerMetricsHandlerTest.java   |  84 ----
 .../metrics/JobMetricsHandlerTest.java          |  86 ----
 .../metrics/JobVertexMetricsHandlerTest.java    |  90 ----
 .../webmonitor/metrics/MetricFetcherTest.java   | 195 --------
 .../webmonitor/metrics/MetricStoreTest.java     |  88 ----
 .../metrics/TaskManagerMetricsHandlerTest.java  |  86 ----
 .../utils/ArchivedExecutionBuilder.java         | 150 -------
 .../utils/ArchivedExecutionConfigBuilder.java   |  71 ---
 .../utils/ArchivedExecutionGraphBuilder.java    | 140 ------
 .../ArchivedExecutionJobVertexBuilder.java      |  84 ----
 .../utils/ArchivedExecutionVertexBuilder.java   |  73 ---
 .../utils/ArchivedJobGenerationUtils.java       | 164 -------
 .../flink/runtime/rest/NotFoundException.java   |  32 ++
 .../flink/runtime/rest/handler/WebHandler.java  |  32 ++
 .../AbstractExecutionGraphRequestHandler.java   |  79 ++++
 .../legacy/AbstractJobVertexRequestHandler.java |  71 +++
 .../legacy/AbstractJsonRequestHandler.java      |  88 ++++
 .../AbstractSubtaskAttemptRequestHandler.java   |  77 ++++
 .../legacy/AbstractSubtaskRequestHandler.java   |  65 +++
 .../handler/legacy/ClusterOverviewHandler.java  | 105 +++++
 .../handler/legacy/ConstantTextHandler.java     |  57 +++
 .../handler/legacy/CurrentJobIdsHandler.java    | 112 +++++
 .../legacy/CurrentJobsOverviewHandler.java      | 182 ++++++++
 .../handler/legacy/DashboardConfigHandler.java  |  90 ++++
 .../handler/legacy/ExecutionGraphHolder.java    |  82 ++++
 .../handler/legacy/JobAccumulatorsHandler.java  | 106 +++++
 .../handler/legacy/JobCancellationHandler.java  |  72 +++
 .../JobCancellationWithSavepointHandlers.java   | 427 ++++++++++++++++++
 .../rest/handler/legacy/JobConfigHandler.java   | 118 +++++
 .../rest/handler/legacy/JobDetailsHandler.java  | 224 ++++++++++
 .../handler/legacy/JobExceptionsHandler.java    | 136 ++++++
 .../handler/legacy/JobManagerConfigHandler.java |  87 ++++
 .../rest/handler/legacy/JobPlanHandler.java     |  66 +++
 .../rest/handler/legacy/JobStoppingHandler.java |  72 +++
 .../legacy/JobVertexAccumulatorsHandler.java    | 112 +++++
 .../legacy/JobVertexBackPressureHandler.java    | 145 ++++++
 .../handler/legacy/JobVertexDetailsHandler.java | 159 +++++++
 .../legacy/JobVertexTaskManagersHandler.java    | 210 +++++++++
 .../rest/handler/legacy/JsonFactory.java        |  35 ++
 .../rest/handler/legacy/RequestHandler.java     |  56 +++
 .../handler/legacy/RequestHandlerException.java |  31 ++
 .../SubtaskCurrentAttemptDetailsHandler.java    |  48 ++
 ...taskExecutionAttemptAccumulatorsHandler.java | 133 ++++++
 .../SubtaskExecutionAttemptDetailsHandler.java  | 166 +++++++
 .../legacy/SubtasksAllAccumulatorsHandler.java  | 130 ++++++
 .../handler/legacy/SubtasksTimesHandler.java    | 140 ++++++
 .../handler/legacy/TaskManagerLogHandler.java   | 335 ++++++++++++++
 .../handler/legacy/TaskManagersHandler.java     | 205 +++++++++
 .../backpressure/BackPressureStatsTracker.java  | 333 ++++++++++++++
 .../backpressure/OperatorBackPressureStats.java | 126 ++++++
 .../legacy/backpressure/StackTraceSample.java   | 119 +++++
 .../StackTraceSampleCoordinator.java            | 392 +++++++++++++++++
 .../checkpoints/CheckpointConfigHandler.java    | 120 +++++
 .../checkpoints/CheckpointStatsCache.java       |  81 ++++
 .../CheckpointStatsDetailsHandler.java          | 203 +++++++++
 .../CheckpointStatsDetailsSubtasksHandler.java  | 233 ++++++++++
 .../checkpoints/CheckpointStatsHandler.java     | 277 ++++++++++++
 .../legacy/files/StaticFileServerHandler.java   | 363 +++++++++++++++
 .../legacy/metrics/AbstractMetricsHandler.java  | 139 ++++++
 .../metrics/JobManagerMetricsHandler.java       |  57 +++
 .../legacy/metrics/JobMetricsHandler.java       |  55 +++
 .../legacy/metrics/JobVertexMetricsHandler.java |  57 +++
 .../handler/legacy/metrics/MetricFetcher.java   | 211 +++++++++
 .../handler/legacy/metrics/MetricStore.java     | 305 +++++++++++++
 .../metrics/TaskManagerMetricsHandler.java      |  59 +++
 .../rest/handler/util/MutableIOMetrics.java     | 109 +++++
 .../legacy/ClusterOverviewHandlerTest.java      |  38 ++
 .../legacy/CurrentJobIdsHandlerTest.java        |  38 ++
 .../legacy/CurrentJobsOverviewHandlerTest.java  | 121 +++++
 .../legacy/DashboardConfigHandlerTest.java      |  59 +++
 .../legacy/HandlerRedirectUtilsTest.java        |  74 ++++
 .../legacy/JobAccumulatorsHandlerTest.java      |  82 ++++
 .../legacy/JobCancellationHandlerTest.java      |  44 ++
 ...obCancellationWithSavepointHandlersTest.java | 333 ++++++++++++++
 .../handler/legacy/JobConfigHandlerTest.java    |  91 ++++
 .../handler/legacy/JobDetailsHandlerTest.java   | 168 +++++++
 .../legacy/JobExceptionsHandlerTest.java        | 100 +++++
 .../legacy/JobManagerConfigHandlerTest.java     |  37 ++
 .../rest/handler/legacy/JobPlanHandlerTest.java |  59 +++
 .../handler/legacy/JobStoppingHandlerTest.java  |  45 ++
 .../JobVertexAccumulatorsHandlerTest.java       |  84 ++++
 .../JobVertexBackPressureHandlerTest.java       | 209 +++++++++
 .../legacy/JobVertexDetailsHandlerTest.java     | 108 +++++
 .../JobVertexTaskManagersHandlerTest.java       | 132 ++++++
 ...SubtaskCurrentAttemptDetailsHandlerTest.java |  40 ++
 ...ExecutionAttemptAccumulatorsHandlerTest.java |  91 ++++
 ...btaskExecutionAttemptDetailsHandlerTest.java | 109 +++++
 .../SubtasksAllAccumulatorsHandlerTest.java     |  97 ++++
 .../legacy/SubtasksTimesHandlerTest.java        | 103 +++++
 .../legacy/TaskManagerLogHandlerTest.java       | 149 +++++++
 .../handler/legacy/TaskManagersHandlerTest.java |  44 ++
 .../BackPressureStatsTrackerITCase.java         | 329 ++++++++++++++
 .../BackPressureStatsTrackerTest.java           | 185 ++++++++
 .../StackTraceSampleCoordinatorITCase.java      | 203 +++++++++
 .../StackTraceSampleCoordinatorTest.java        | 432 ++++++++++++++++++
 .../CheckpointConfigHandlerTest.java            | 195 ++++++++
 .../checkpoints/CheckpointStatsCacheTest.java   |  71 +++
 .../CheckpointStatsDetailsHandlerTest.java      | 358 +++++++++++++++
 .../checkpoints/CheckpointStatsHandlerTest.java | 432 ++++++++++++++++++
 ...heckpointStatsSubtaskDetailsHandlerTest.java | 389 ++++++++++++++++
 .../handler/legacy/files/MimeTypesTest.java     |  75 ++++
 .../metrics/AbstractMetricsHandlerTest.java     | 172 ++++++++
 .../metrics/JobManagerMetricsHandlerTest.java   |  84 ++++
 .../legacy/metrics/JobMetricsHandlerTest.java   |  86 ++++
 .../metrics/JobVertexMetricsHandlerTest.java    |  90 ++++
 .../legacy/metrics/MetricFetcherTest.java       | 195 ++++++++
 .../handler/legacy/metrics/MetricStoreTest.java |  88 ++++
 .../metrics/TaskManagerMetricsHandlerTest.java  |  86 ++++
 .../legacy/utils/ArchivedExecutionBuilder.java  | 150 +++++++
 .../utils/ArchivedExecutionConfigBuilder.java   |  71 +++
 .../utils/ArchivedExecutionGraphBuilder.java    | 140 ++++++
 .../ArchivedExecutionJobVertexBuilder.java      |  84 ++++
 .../utils/ArchivedExecutionVertexBuilder.java   |  73 +++
 .../utils/ArchivedJobGenerationUtils.java       | 164 +++++++
 219 files changed, 14198 insertions(+), 14237 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
deleted file mode 100644
index 5e4e63a..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
+++ /dev/null
@@ -1,334 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-
-import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
-
-import scala.Option;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Back pressure statistics tracker.
- *
- * <p>Back pressure is determined by sampling running tasks. If a task is
- * slowed down by back pressure it will be stuck in memory requests to a
- * {@link org.apache.flink.runtime.io.network.buffer.LocalBufferPool}.
- *
- * <p>The back pressured stack traces look like this:
- *
- * <pre>
- * java.lang.Object.wait(Native Method)
- * o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
- * o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING
- * request
- * [...]
- * </pre>
- */
-public class BackPressureStatsTracker {
-
-	private static final Logger LOG = LoggerFactory.getLogger(BackPressureStatsTracker.class);
-
-	/** Maximum stack trace depth for samples. */
-	static final int MAX_STACK_TRACE_DEPTH = 3;
-
-	/** Expected class name for back pressure indicating stack trace element. */
-	static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
-
-	/** Expected method name for back pressure indicating stack trace element. */
-	static final String EXPECTED_METHOD_NAME = "requestBufferBlocking";
-
-	/** Lock guarding trigger operations. */
-	private final Object lock = new Object();
-
-	/* Stack trace sample coordinator. */
-	private final StackTraceSampleCoordinator coordinator;
-
-	/**
-	 * Completed stats. Important: Job vertex IDs need to be scoped by job ID,
-	 * because they are potentially constant across runs messing up the cached
-	 * data.
-	 */
-	private final Cache<ExecutionJobVertex, OperatorBackPressureStats> operatorStatsCache;
-
-	/** Pending in progress stats. Important: Job vertex IDs need to be scoped
-	 * by job ID, because they are potentially constant across runs messing up
-	 * the cached data.*/
-	private final Set<ExecutionJobVertex> pendingStats = new HashSet<>();
-
-	/** Cleanup interval for completed stats cache. */
-	private final int cleanUpInterval;
-
-	private final int numSamples;
-
-	private final Time delayBetweenSamples;
-
-	/** Flag indicating whether the stats tracker has been shut down. */
-	private boolean shutDown;
-
-	/**
-	 * Creates a back pressure statistics tracker.
-	 *
-	 * @param cleanUpInterval     Clean up interval for completed stats.
-	 * @param numSamples          Number of stack trace samples when determining back pressure.
-	 * @param delayBetweenSamples Delay between samples when determining back pressure.
-	 */
-	public BackPressureStatsTracker(
-			StackTraceSampleCoordinator coordinator,
-			int cleanUpInterval,
-			int numSamples,
-			Time delayBetweenSamples) {
-
-		this.coordinator = checkNotNull(coordinator, "Stack trace sample coordinator");
-
-		checkArgument(cleanUpInterval >= 0, "Clean up interval");
-		this.cleanUpInterval = cleanUpInterval;
-
-		checkArgument(numSamples >= 1, "Number of samples");
-		this.numSamples = numSamples;
-
-		this.delayBetweenSamples = checkNotNull(delayBetweenSamples, "Delay between samples");
-
-		this.operatorStatsCache = CacheBuilder.newBuilder()
-				.concurrencyLevel(1)
-				.expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS)
-				.build();
-	}
-
-	/** Cleanup interval for completed stats cache. */
-	public long getCleanUpInterval() {
-		return cleanUpInterval;
-	}
-
-	/**
-	 * Returns back pressure statistics for an operator.
-	 *
-	 * @param vertex Operator to get the stats for.
-	 *
-	 * @return Back pressure statistics for an operator
-	 */
-	public Option<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex vertex) {
-		return Option.apply(operatorStatsCache.getIfPresent(vertex));
-	}
-
-	/**
-	 * Triggers a stack trace sample for an operator to gather the back pressure
-	 * statistics. If there is a sample in progress for the operator, the call
-	 * is ignored.
-	 *
-	 * @param vertex Operator to get the stats for.
-	 * @return Flag indicating whether a sample was triggered.
-	 */
-	@SuppressWarnings("unchecked")
-	public boolean triggerStackTraceSample(ExecutionJobVertex vertex) {
-		synchronized (lock) {
-			if (shutDown) {
-				return false;
-			}
-
-			if (!pendingStats.contains(vertex) &&
-					!vertex.getGraph().getState().isGloballyTerminalState()) {
-
-				Executor executor = vertex.getGraph().getFutureExecutor();
-
-				// Only trigger if still active job
-				if (executor != null) {
-					pendingStats.add(vertex);
-
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
-					}
-
-					CompletableFuture<StackTraceSample> sample = coordinator.triggerStackTraceSample(
-							vertex.getTaskVertices(),
-							numSamples,
-							delayBetweenSamples,
-							MAX_STACK_TRACE_DEPTH);
-
-					sample.handleAsync(new StackTraceSampleCompletionCallback(vertex), executor);
-
-					return true;
-				}
-			}
-
-			return false;
-		}
-	}
-
-	/**
-	 * Cleans up the operator stats cache if it contains timed out entries.
-	 *
-	 * <p>The Guava cache only evicts as maintenance during normal operations.
-	 * If this handler is inactive, it will never be cleaned.
-	 */
-	public void cleanUpOperatorStatsCache() {
-		operatorStatsCache.cleanUp();
-	}
-
-	/**
-	 * Shuts down the stats tracker.
-	 *
-	 * <p>Invalidates the cache and clears all pending stats.
-	 */
-	public void shutDown() {
-		synchronized (lock) {
-			if (!shutDown) {
-				operatorStatsCache.invalidateAll();
-				pendingStats.clear();
-
-				shutDown = true;
-			}
-		}
-	}
-
-	/**
-	 * Invalidates the cache (irrespective of clean up interval).
-	 */
-	void invalidateOperatorStatsCache() {
-		operatorStatsCache.invalidateAll();
-	}
-
-	/**
-	 * Callback on completed stack trace sample.
-	 */
-	class StackTraceSampleCompletionCallback implements BiFunction<StackTraceSample, Throwable, Void> {
-
-		private final ExecutionJobVertex vertex;
-
-		public StackTraceSampleCompletionCallback(ExecutionJobVertex vertex) {
-			this.vertex = vertex;
-		}
-
-		@Override
-		public Void apply(StackTraceSample stackTraceSample, Throwable throwable) {
-			synchronized (lock) {
-				try {
-					if (shutDown) {
-						return null;
-					}
-
-					// Job finished, ignore.
-					JobStatus jobState = vertex.getGraph().getState();
-					if (jobState.isGloballyTerminalState()) {
-						LOG.debug("Ignoring sample, because job is in state " + jobState + ".");
-					} else if (stackTraceSample != null) {
-						OperatorBackPressureStats stats = createStatsFromSample(stackTraceSample);
-						operatorStatsCache.put(vertex, stats);
-					} else {
-						LOG.debug("Failed to gather stack trace sample.", throwable);
-					}
-				} catch (Throwable t) {
-					LOG.error("Error during stats completion.", t);
-				} finally {
-					pendingStats.remove(vertex);
-				}
-
-				return null;
-			}
-		}
-
-		/**
-		 * Creates the back pressure stats from a stack trace sample.
-		 *
-		 * @param sample Stack trace sample to base stats on.
-		 *
-		 * @return Back pressure stats
-		 */
-		private OperatorBackPressureStats createStatsFromSample(StackTraceSample sample) {
-			Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = sample.getStackTraces();
-
-			// Map task ID to subtask index, because the web interface expects
-			// it like that.
-			Map<ExecutionAttemptID, Integer> subtaskIndexMap = Maps
-					.newHashMapWithExpectedSize(traces.size());
-
-			Set<ExecutionAttemptID> sampledTasks = sample.getStackTraces().keySet();
-
-			for (ExecutionVertex task : vertex.getTaskVertices()) {
-				ExecutionAttemptID taskId = task.getCurrentExecutionAttempt().getAttemptId();
-				if (sampledTasks.contains(taskId)) {
-					subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex());
-				} else {
-					LOG.debug("Outdated sample. A task, which is part of the " +
-							"sample has been reset.");
-				}
-			}
-
-			// Ratio of blocked samples to total samples per sub task. Array
-			// position corresponds to sub task index.
-			double[] backPressureRatio = new double[traces.size()];
-
-			for (Entry<ExecutionAttemptID, List<StackTraceElement[]>> entry : traces.entrySet()) {
-				int backPressureSamples = 0;
-
-				List<StackTraceElement[]> taskTraces = entry.getValue();
-
-				for (StackTraceElement[] trace : taskTraces) {
-					for (int i = trace.length - 1; i >= 0; i--) {
-						StackTraceElement elem = trace[i];
-
-						if (elem.getClassName().equals(EXPECTED_CLASS_NAME) &&
-								elem.getMethodName().equals(EXPECTED_METHOD_NAME)) {
-
-							backPressureSamples++;
-							break; // Continue with next stack trace
-						}
-					}
-				}
-
-				int subtaskIndex = subtaskIndexMap.get(entry.getKey());
-
-				int size = taskTraces.size();
-				double ratio = (size > 0)
-						? ((double) backPressureSamples) / size
-						: 0;
-
-				backPressureRatio[subtaskIndex] = ratio;
-			}
-
-			return new OperatorBackPressureStats(
-					sample.getSampleId(),
-					sample.getEndTime(),
-					backPressureRatio);
-		}
-	}
-}
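
The tracker deleted above (recreated under o.a.f.runtime.rest.handler.legacy.backpressure) is driven from the back pressure handler: trigger a sample, then poll the completed-stats cache. A minimal usage sketch, using the same imports as the file above and assuming an Executor and a running ExecutionJobVertex are already in scope (all numeric values are illustrative):

    // Sketch only: coordinator timeout, sample count and delay are example values.
    StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(executor, 30_000L);
    BackPressureStatsTracker tracker = new BackPressureStatsTracker(
            coordinator,
            60_000,                  // cleanUpInterval (ms) for the completed-stats cache
            100,                     // numSamples per back pressure measurement
            Time.milliseconds(50));  // delayBetweenSamples

    // Returns false if a sample for this vertex is already pending or the job is terminal.
    tracker.triggerStackTraceSample(vertex);

    // The result is written to the cache asynchronously; poll it on a later request.
    Option<OperatorBackPressureStats> stats = tracker.getOperatorBackPressureStats(vertex);
    if (stats.isDefined()) {
        System.out.println("Max back pressure ratio: " + stats.get().getMaxBackPressureRatio());
    }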

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
deleted file mode 100644
index 8a96969..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Optional;
-import java.util.WeakHashMap;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archive.
- *
- * <p>The holder will cache the ExecutionGraph behind a weak reference, which will be cleared
- * at some point once no one else is pointing to the ExecutionGraph.
- * Note that while the holder runs in the same JVM as the JobManager or Archive, the reference should
- * stay valid.
- */
-public class ExecutionGraphHolder {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class);
-
-	private final Time timeout;
-
-	private final WeakHashMap<JobID, AccessExecutionGraph> cache = new WeakHashMap<>();
-
-	public ExecutionGraphHolder() {
-		this(WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT);
-	}
-
-	public ExecutionGraphHolder(Time timeout) {
-		this.timeout = checkNotNull(timeout);
-	}
-
-	/**
-	 * Retrieves the execution graph with {@link JobID} jid wrapped in {@link Optional} or
-	 * {@link Optional#empty()} if it cannot be found.
-	 *
-	 * @param jid jobID of the execution graph to be retrieved
-	 * @return Optional containing the ExecutionGraph if it could be retrieved, empty if there was no ExecutionGraph
-	 */
-	public CompletableFuture<Optional<AccessExecutionGraph>> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) {
-		AccessExecutionGraph cached = cache.get(jid);
-		if (cached != null) {
-			if (cached.getState() == JobStatus.SUSPENDED) {
-				cache.remove(jid);
-			} else {
-				return CompletableFuture.completedFuture(Optional.of(cached));
-			}
-		}
-
-		CompletableFuture<Optional<AccessExecutionGraph>> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout);
-
-		executionGraphFuture.thenAcceptAsync(
-			optExecutionGraph ->
-				optExecutionGraph.ifPresent(executionGraph -> cache.put(jid, executionGraph)));
-
-		return executionGraphFuture;
-	}
-}
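
The legacy handlers obtain their AccessExecutionGraph through this holder. A rough usage sketch, using the same imports as the file above and assuming a JobID and a JobManagerGateway are available (names are illustrative):

    // Sketch: resolve the graph for a job; the legacy handlers answer the empty case with a 404.
    ExecutionGraphHolder holder = new ExecutionGraphHolder(Time.seconds(10));
    CompletableFuture<Optional<AccessExecutionGraph>> graphFuture =
            holder.getExecutionGraph(jobId, jobManagerGateway);

    graphFuture.thenAccept(optGraph -> {
        if (optGraph.isPresent()) {
            System.out.println("Job state: " + optGraph.get().getState());
        } else {
            System.out.println("No execution graph for job " + jobId);
        }
    });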

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/NotFoundException.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/NotFoundException.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/NotFoundException.java
deleted file mode 100644
index 71125c9..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/NotFoundException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-/**
- * A special exception that indicates that an element was not found and that the
- * request should be answered with a {@code 404} return code.
- */
-public class NotFoundException extends Exception {
-
-	private static final long serialVersionUID = -4036006746423754639L;
-
-	public NotFoundException(String message) {
-		super(message);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java
deleted file mode 100644
index bfd5be2..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import java.util.Arrays;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Back pressure statistics of multiple tasks.
- *
- * <p>Statistics are gathered by sampling stack traces of running tasks. The
- * back pressure ratio denotes the ratio of traces indicating back pressure
- * to the total number of sampled traces.
- */
-public class OperatorBackPressureStats {
-
-	/** ID of the corresponding sample. */
-	private final int sampleId;
-
-	/** End time stamp of the corresponding sample. */
-	private final long endTimestamp;
-
-	/** Back pressure ratio per subtask. */
-	private final double[] subTaskBackPressureRatio;
-
-	/** Maximum back pressure ratio. */
-	private final double maxSubTaskBackPressureRatio;
-
-	public OperatorBackPressureStats(
-			int sampleId,
-			long endTimestamp,
-			double[] subTaskBackPressureRatio) {
-
-		this.sampleId = sampleId;
-		this.endTimestamp = endTimestamp;
-		this.subTaskBackPressureRatio = checkNotNull(subTaskBackPressureRatio, "Sub task back pressure ratio");
-		checkArgument(subTaskBackPressureRatio.length >= 1, "No Sub task back pressure ratio specified");
-
-		double max = 0;
-		for (double ratio : subTaskBackPressureRatio) {
-			if (ratio > max) {
-				max = ratio;
-			}
-		}
-
-		maxSubTaskBackPressureRatio = max;
-	}
-
-	/**
-	 * Returns the ID of the sample.
-	 *
-	 * @return ID of the sample
-	 */
-	public int getSampleId() {
-		return sampleId;
-	}
-
-	/**
-	 * Returns the time stamp, when all stack traces were collected at the
-	 * JobManager.
-	 *
-	 * @return Time stamp, when all stack traces were collected at the
-	 * JobManager
-	 */
-	public long getEndTimestamp() {
-		return endTimestamp;
-	}
-
-	/**
-	 * Returns the number of sub tasks.
-	 *
-	 * @return Number of sub tasks.
-	 */
-	public int getNumberOfSubTasks() {
-		return subTaskBackPressureRatio.length;
-	}
-
-	/**
-	 * Returns the ratio of stack traces indicating back pressure to total
-	 * number of sampled stack traces.
-	 *
-	 * @param index Subtask index.
-	 *
-	 * @return Ratio of stack traces indicating back pressure to total number
-	 * of sampled stack traces.
-	 */
-	public double getBackPressureRatio(int index) {
-		return subTaskBackPressureRatio[index];
-	}
-
-	/**
-	 * Returns the maximum back pressure ratio of all sub tasks.
-	 *
-	 * @return Maximum back pressure ratio of all sub tasks.
-	 */
-	public double getMaxBackPressureRatio() {
-		return maxSubTaskBackPressureRatio;
-	}
-
-	@Override
-	public String toString() {
-		return "OperatorBackPressureStats{" +
-				"sampleId=" + sampleId +
-				", endTimestamp=" + endTimestamp +
-				", subTaskBackPressureRatio=" + Arrays.toString(subTaskBackPressureRatio) +
-				'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index b393021..993a225 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -21,9 +21,11 @@ package org.apache.flink.runtime.webmonitor;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.WebHandler;
+import org.apache.flink.runtime.rest.handler.legacy.RequestHandler;
 import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils;
-import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java
deleted file mode 100644
index d60f8a4..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-
-/**
- * A sample of stack traces for one or more tasks.
- *
- * <p>The sampling is triggered in {@link StackTraceSampleCoordinator}.
- */
-public class StackTraceSample {
-
-	/** ID of this sample (unique per job). */
-	private final int sampleId;
-
-	/** Time stamp, when the sample was triggered. */
-	private final long startTime;
-
-	/** Time stamp, when all stack traces were collected at the JobManager. */
-	private final long endTime;
-
-	/** Map of stack traces by execution ID. */
-	private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask;
-
-	/**
-	 * Creates a stack trace sample.
-	 *
-	 * @param sampleId          ID of the sample.
-	 * @param startTime         Time stamp, when the sample was triggered.
-	 * @param endTime           Time stamp, when all stack traces were
-	 *                          collected at the JobManager.
-	 * @param stackTracesByTask Map of stack traces by execution ID.
-	 */
-	public StackTraceSample(
-			int sampleId,
-			long startTime,
-			long endTime,
-			Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask) {
-
-		checkArgument(sampleId >= 0, "Negative sample ID");
-		checkArgument(startTime >= 0, "Negative start time");
-		checkArgument(endTime >= startTime, "End time before start time");
-
-		this.sampleId = sampleId;
-		this.startTime = startTime;
-		this.endTime = endTime;
-		this.stackTracesByTask = Collections.unmodifiableMap(stackTracesByTask);
-	}
-
-	/**
-	 * Returns the ID of the sample.
-	 *
-	 * @return ID of the sample
-	 */
-	public int getSampleId() {
-		return sampleId;
-	}
-
-	/**
-	 * Returns the time stamp, when the sample was triggered.
-	 *
-	 * @return Time stamp, when the sample was triggered
-	 */
-	public long getStartTime() {
-		return startTime;
-	}
-
-	/**
-	 * Returns the time stamp, when all stack traces were collected at the
-	 * JobManager.
-	 *
-	 * @return Time stamp, when all stack traces were collected at the
-	 * JobManager
-	 */
-	public long getEndTime() {
-		return endTime;
-	}
-
-	/**
-	 * Returns the map of stack traces by execution ID.
-	 *
-	 * @return Map of stack traces by execution ID
-	 */
-	public Map<ExecutionAttemptID, List<StackTraceElement[]>> getStackTraces() {
-		return stackTracesByTask;
-	}
-
-	@Override
-	public String toString() {
-		return "StackTraceSample{" +
-				"sampleId=" + sampleId +
-				", startTime=" + startTime +
-				", endTime=" + endTime +
-				'}';
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
deleted file mode 100644
index 534d2fa..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.Execution;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.messages.StackTraceSampleResponse;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayDeque;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A coordinator for triggering and collecting stack traces of running tasks.
- */
-public class StackTraceSampleCoordinator {
-
-	private static final Logger LOG = LoggerFactory.getLogger(StackTraceSampleCoordinator.class);
-
-	private static final int NUM_GHOST_SAMPLE_IDS = 10;
-
-	private final Object lock = new Object();
-
-	/** Executor used to run the futures. */
-	private final Executor executor;
-
-	/** Time out after the expected sampling duration. */
-	private final long sampleTimeout;
-
-	/** In progress samples (guarded by lock). */
-	private final Map<Integer, PendingStackTraceSample> pendingSamples = new HashMap<>();
-
-	/** A list of recent sample IDs to identify late messages vs. invalid ones. */
-	private final ArrayDeque<Integer> recentPendingSamples = new ArrayDeque<>(NUM_GHOST_SAMPLE_IDS);
-
-	/** Sample ID counter (guarded by lock). */
-	private int sampleIdCounter;
-
-	/**
-	 * Flag indicating whether the coordinator is still running (guarded by
-	 * lock).
-	 */
-	private boolean isShutDown;
-
-	/**
-	 * Creates a new coordinator for the job.
-	 *
-	 * @param executor to use to execute the futures
-	 * @param sampleTimeout Time out after the expected sampling duration.
-	 *                      This is added to the expected duration of a
-	 *                      sample, which is determined by the number of
-	 *                      samples and the delay between each sample.
-	 */
-	public StackTraceSampleCoordinator(Executor executor, long sampleTimeout) {
-		checkArgument(sampleTimeout >= 0L);
-		this.executor = Preconditions.checkNotNull(executor);
-		this.sampleTimeout = sampleTimeout;
-	}
-
-	/**
-	 * Triggers a stack trace sample to all tasks.
-	 *
-	 * @param tasksToSample       Tasks to sample.
-	 * @param numSamples          Number of stack trace samples to collect.
-	 * @param delayBetweenSamples Delay between consecutive samples.
-	 * @param maxStackTraceDepth  Maximum depth of the stack trace. 0 indicates
-	 *                            no maximum and keeps the complete stack trace.
-	 * @return A future of the completed stack trace sample
-	 */
-	@SuppressWarnings("unchecked")
-	public CompletableFuture<StackTraceSample> triggerStackTraceSample(
-			ExecutionVertex[] tasksToSample,
-			int numSamples,
-			Time delayBetweenSamples,
-			int maxStackTraceDepth) {
-
-		checkNotNull(tasksToSample, "Tasks to sample");
-		checkArgument(tasksToSample.length >= 1, "No tasks to sample");
-		checkArgument(numSamples >= 1, "No number of samples");
-		checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack trace depth");
-
-		// Execution IDs of running tasks
-		ExecutionAttemptID[] triggerIds = new ExecutionAttemptID[tasksToSample.length];
-		Execution[] executions = new Execution[tasksToSample.length];
-
-		// Check that all tasks are RUNNING before triggering anything. The
-		// triggering can still fail.
-		for (int i = 0; i < triggerIds.length; i++) {
-			Execution execution = tasksToSample[i].getCurrentExecutionAttempt();
-			if (execution != null && execution.getState() == ExecutionState.RUNNING) {
-				executions[i] = execution;
-				triggerIds[i] = execution.getAttemptId();
-			} else {
-				return FutureUtils.completedExceptionally(new IllegalStateException("Task " + tasksToSample[i]
-					.getTaskNameWithSubtaskIndex() + " is not running."));
-			}
-		}
-
-		synchronized (lock) {
-			if (isShutDown) {
-				return FutureUtils.completedExceptionally(new IllegalStateException("Shut down"));
-			}
-
-			final int sampleId = sampleIdCounter++;
-
-			LOG.debug("Triggering stack trace sample {}", sampleId);
-
-			final PendingStackTraceSample pending = new PendingStackTraceSample(
-					sampleId, triggerIds);
-
-			// Discard the sample if it takes too long. We don't send cancel
-			// messages to the task managers, but only wait for the responses
-			// and then ignore them.
-			long expectedDuration = numSamples * delayBetweenSamples.toMilliseconds();
-			Time timeout = Time.milliseconds(expectedDuration + sampleTimeout);
-
-			// Add the pending sample before scheduling the discard task to
-			// prevent races with removing it again.
-			pendingSamples.put(sampleId, pending);
-
-			// Trigger all samples
-			for (Execution execution: executions) {
-				final CompletableFuture<StackTraceSampleResponse> stackTraceSampleFuture = execution.requestStackTraceSample(
-					sampleId,
-					numSamples,
-					delayBetweenSamples,
-					maxStackTraceDepth,
-					timeout);
-
-				stackTraceSampleFuture.handleAsync(
-					(StackTraceSampleResponse stackTraceSampleResponse, Throwable throwable) -> {
-						if (stackTraceSampleResponse != null) {
-							collectStackTraces(
-								stackTraceSampleResponse.getSampleId(),
-								stackTraceSampleResponse.getExecutionAttemptID(),
-								stackTraceSampleResponse.getSamples());
-						} else {
-							cancelStackTraceSample(sampleId, throwable);
-						}
-
-						return null;
-					},
-					executor);
-			}
-
-			return pending.getStackTraceSampleFuture();
-		}
-	}
-
-	/**
-	 * Cancels a pending sample.
-	 *
-	 * @param sampleId ID of the sample to cancel.
-	 * @param cause Cause of the cancelling (can be <code>null</code>).
-	 */
-	public void cancelStackTraceSample(int sampleId, Throwable cause) {
-		synchronized (lock) {
-			if (isShutDown) {
-				return;
-			}
-
-			PendingStackTraceSample sample = pendingSamples.remove(sampleId);
-			if (sample != null) {
-				if (cause != null) {
-					LOG.info("Cancelling sample " + sampleId, cause);
-				} else {
-					LOG.info("Cancelling sample {}", sampleId);
-				}
-
-				sample.discard(cause);
-				rememberRecentSampleId(sampleId);
-			}
-		}
-	}
-
-	/**
-	 * Shuts down the coordinator.
-	 *
-	 * <p>After shut down, no further operations are executed.
-	 */
-	public void shutDown() {
-		synchronized (lock) {
-			if (!isShutDown) {
-				LOG.info("Shutting down stack trace sample coordinator.");
-
-				for (PendingStackTraceSample pending : pendingSamples.values()) {
-					pending.discard(new RuntimeException("Shut down"));
-				}
-
-				pendingSamples.clear();
-
-				isShutDown = true;
-			}
-		}
-	}
-
-	/**
-	 * Collects stack traces of a task.
-	 *
-	 * @param sampleId    ID of the sample.
-	 * @param executionId ID of the sampled task.
-	 * @param stackTraces Stack traces of the sampled task.
-	 *
-	 * @throws IllegalStateException If unknown sample ID and not recently
-	 *                               finished or cancelled sample.
-	 */
-	public void collectStackTraces(
-			int sampleId,
-			ExecutionAttemptID executionId,
-			List<StackTraceElement[]> stackTraces) {
-
-		synchronized (lock) {
-			if (isShutDown) {
-				return;
-			}
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Collecting stack trace sample {} of task {}", sampleId, executionId);
-			}
-
-			PendingStackTraceSample pending = pendingSamples.get(sampleId);
-
-			if (pending != null) {
-				pending.collectStackTraces(executionId, stackTraces);
-
-				// Publish the sample
-				if (pending.isComplete()) {
-					pendingSamples.remove(sampleId);
-					rememberRecentSampleId(sampleId);
-
-					pending.completePromiseAndDiscard();
-				}
-			} else if (recentPendingSamples.contains(sampleId)) {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Received late stack trace sample {} of task {}",
-							sampleId, executionId);
-				}
-			} else {
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Unknown sample ID " + sampleId);
-				}
-			}
-		}
-	}
-
-	private void rememberRecentSampleId(int sampleId) {
-		if (recentPendingSamples.size() >= NUM_GHOST_SAMPLE_IDS) {
-			recentPendingSamples.removeFirst();
-		}
-		recentPendingSamples.addLast(sampleId);
-	}
-
-	int getNumberOfPendingSamples() {
-		synchronized (lock) {
-			return pendingSamples.size();
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A pending stack trace sample, which collects stack traces and owns a
-	 * {@link StackTraceSample} promise.
-	 *
-	 * <p>Access pending sample in lock scope.
-	 */
-	private static class PendingStackTraceSample {
-
-		private final int sampleId;
-		private final long startTime;
-		private final Set<ExecutionAttemptID> pendingTasks;
-		private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask;
-		private final CompletableFuture<StackTraceSample> stackTraceFuture;
-
-		private boolean isDiscarded;
-
-		PendingStackTraceSample(
-				int sampleId,
-				ExecutionAttemptID[] tasksToCollect) {
-
-			this.sampleId = sampleId;
-			this.startTime = System.currentTimeMillis();
-			this.pendingTasks = new HashSet<>(Arrays.asList(tasksToCollect));
-			this.stackTracesByTask = Maps.newHashMapWithExpectedSize(tasksToCollect.length);
-			this.stackTraceFuture = new CompletableFuture<>();
-		}
-
-		int getSampleId() {
-			return sampleId;
-		}
-
-		long getStartTime() {
-			return startTime;
-		}
-
-		boolean isDiscarded() {
-			return isDiscarded;
-		}
-
-		boolean isComplete() {
-			if (isDiscarded) {
-				throw new IllegalStateException("Discarded");
-			}
-
-			return pendingTasks.isEmpty();
-		}
-
-		void discard(Throwable cause) {
-			if (!isDiscarded) {
-				pendingTasks.clear();
-				stackTracesByTask.clear();
-
-				stackTraceFuture.completeExceptionally(new RuntimeException("Discarded", cause));
-
-				isDiscarded = true;
-			}
-		}
-
-		void collectStackTraces(ExecutionAttemptID executionId, List<StackTraceElement[]> stackTraces) {
-			if (isDiscarded) {
-				throw new IllegalStateException("Discarded");
-			}
-
-			if (pendingTasks.remove(executionId)) {
-				stackTracesByTask.put(executionId, Collections.unmodifiableList(stackTraces));
-			} else if (isComplete()) {
-				throw new IllegalStateException("Completed");
-			} else {
-				throw new IllegalArgumentException("Unknown task " + executionId);
-			}
-		}
-
-		void completePromiseAndDiscard() {
-			if (isComplete()) {
-				isDiscarded = true;
-
-				long endTime = System.currentTimeMillis();
-
-				StackTraceSample stackTraceSample = new StackTraceSample(
-						sampleId,
-						startTime,
-						endTime,
-						stackTracesByTask);
-
-				stackTraceFuture.complete(stackTraceSample);
-			} else {
-				throw new IllegalStateException("Not completed yet");
-			}
-		}
-
-		@SuppressWarnings("unchecked")
-		CompletableFuture<StackTraceSample> getStackTraceSampleFuture() {
-			return stackTraceFuture;
-		}
-	}
-}
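
Seen from the tracker, the coordinator's contract is: trigger once per operator, get back a future for the completed sample. A minimal sketch, using the same imports as the file above and assuming a coordinator and an ExecutionJobVertex are already available:

    // Sketch: sample every task of the operator and react when the sample completes.
    CompletableFuture<StackTraceSample> sampleFuture = coordinator.triggerStackTraceSample(
            vertex.getTaskVertices(),   // tasks to sample (must all be RUNNING)
            100,                        // number of stack trace samples
            Time.milliseconds(50),      // delay between consecutive samples
            0);                         // 0 keeps the complete stack trace

    sampleFuture.whenComplete((sample, failure) -> {
        if (sample != null) {
            System.out.println("Sample " + sample.getSampleId() + " finished at " + sample.getEndTime());
        } else {
            failure.printStackTrace();  // e.g. a task was not running or the coordinator shut down
        }
    });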

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
deleted file mode 100644
index 9839abd..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor;
-
-/**
- * Marker interface for web handlers which can describe their paths.
- */
-public interface WebHandler {
-
-	/**
-	 * Returns an array of REST URLs under which this handler can be registered.
-	 *
-	 * @return array containing REST URLs under which this handler can be registered.
-	 */
-	String[] getPaths();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 71e1593..cd128de 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -27,50 +27,54 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.net.SSLUtils;
-import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
-import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
-import org.apache.flink.runtime.webmonitor.handlers.ConstantTextHandler;
-import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
-import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.WebHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler;
+import org.apache.flink.runtime.rest.handler.legacy.CurrentJobIdsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
+import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.JobAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobCancellationHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobCancellationWithSavepointHandlers;
+import org.apache.flink.runtime.rest.handler.legacy.JobConfigHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobDetailsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobExceptionsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobManagerConfigHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobPlanHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobStoppingHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobVertexBackPressureHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobVertexDetailsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JobVertexTaskManagersHandler;
+import org.apache.flink.runtime.rest.handler.legacy.RequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.SubtaskCurrentAttemptDetailsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.SubtasksAllAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.SubtasksTimesHandler;
+import org.apache.flink.runtime.rest.handler.legacy.TaskManagerLogHandler;
+import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
+import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointConfigHandler;
+import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsCache;
+import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsSubtasksHandler;
+import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.JobManagerMetricsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarAccessDeniedHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarListHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobCancellationWithSavepointHandlers;
-import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobStoppingHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobVertexBackPressureHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobVertexTaskManagersHandler;
-import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
-import org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptAccumulatorsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptDetailsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler;
-import org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler;
-import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
-import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointConfigHandler;
-import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsCache;
-import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsSubtasksHandler;
-import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.metrics.JobManagerMetricsHandler;
-import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
-import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
-import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
@@ -189,7 +193,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 			this.uploadDir = null;
 		}
 
-		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder();
+		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(timeout);
 
 		// - Back pressure stats ----------------------------------------------
 
@@ -255,7 +259,7 @@ public class WebRuntimeMonitor implements WebMonitor {
 		get(router, new SubtasksTimesHandler(currentGraphs, executor));
 		get(router, new JobVertexTaskManagersHandler(currentGraphs, executor, metricFetcher));
 		get(router, new JobVertexAccumulatorsHandler(currentGraphs, executor));
-		get(router, new JobVertexBackPressureHandler(currentGraphs, executor,	backPressureStatsTracker, refreshInterval));
+		get(router, new JobVertexBackPressureHandler(currentGraphs, executor, backPressureStatsTracker, refreshInterval));
 		get(router, new JobVertexMetricsHandler(executor, metricFetcher));
 		get(router, new SubtasksAllAccumulatorsHandler(currentGraphs, executor));
 		get(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, executor, metricFetcher));

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
deleted file mode 100644
index 2445d3f..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java
+++ /dev/null
@@ -1,363 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.files;
-
-/*****************************************************************************
- * This code is based on the "HttpStaticFileServerHandler" from the
- * Netty project's HTTP server example.
- *
- * See http://netty.io and
- * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
- *****************************************************************************/
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.rest.handler.RedirectHandler;
-import org.apache.flink.runtime.rest.handler.util.MimeTypes;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
-import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
-import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.RandomAccessFile;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Files;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.GregorianCalendar;
-import java.util.Locale;
-import java.util.TimeZone;
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.DATE;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Simple file server handler that serves requests to web frontend's static files, such as
- * HTML, CSS, or JS files.
- *
- * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server
- * example.</p>
- */
-@ChannelHandler.Sharable
-public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectHandler<T> {
-
-	/** Timezone in which this server answers its "if-modified" requests. */
-	private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT");
-
-	/** Date format for HTTP. */
-	public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";
-
-	/** By default, we allow files to be cached for 5 minutes. */
-	private static final int HTTP_CACHE_SECONDS = 300;
-
-	// ------------------------------------------------------------------------
-
-	/** The path in which the static documents are. */
-	private final File rootPath;
-
-	public StaticFileServerHandler(
-			GatewayRetriever<T> retriever,
-			CompletableFuture<String> localJobManagerAddressFuture,
-			Time timeout,
-			File rootPath) throws IOException {
-
-		super(localJobManagerAddressFuture, retriever, timeout);
-
-		this.rootPath = checkNotNull(rootPath).getCanonicalFile();
-	}
-
-	// ------------------------------------------------------------------------
-	//  Responses to requests
-	// ------------------------------------------------------------------------
-
-	@Override
-	protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, T gateway) throws Exception {
-		final HttpRequest request = routed.request();
-		final String requestPath;
-
-		// make sure we request the "index.html" in case there is a directory request
-		if (routed.path().endsWith("/")) {
-			requestPath = routed.path() + "index.html";
-		}
-		// in case the files being accessed are logs or stdout files, find appropriate paths.
-		else if (routed.path().equals("/jobmanager/log") || routed.path().equals("/jobmanager/stdout")) {
-			requestPath = "";
-		} else {
-			requestPath = routed.path();
-		}
-
-		respondToRequest(channelHandlerContext, request, requestPath);
-	}
-
-	/**
-	 * Responds to a request when running with the leading JobManager.
-	 */
-	private void respondToRequest(ChannelHandlerContext ctx, HttpRequest request, String requestPath)
-			throws IOException, ParseException, URISyntaxException {
-
-		// convert to absolute path
-		final File file = new File(rootPath, requestPath);
-
-		if (!file.exists()) {
-			// file does not exist. Try to load it with the classloader
-			ClassLoader cl = StaticFileServerHandler.class.getClassLoader();
-
-			try (InputStream resourceStream = cl.getResourceAsStream("web" + requestPath)) {
-				boolean success = false;
-				try {
-					if (resourceStream != null) {
-						URL root = cl.getResource("web");
-						URL requested = cl.getResource("web" + requestPath);
-
-						if (root != null && requested != null) {
-							URI rootURI = new URI(root.getPath()).normalize();
-							URI requestedURI = new URI(requested.getPath()).normalize();
-
-							// Check that we don't load anything from outside of the
-							// expected scope.
-							if (!rootURI.relativize(requestedURI).equals(requestedURI)) {
-								logger.debug("Loading missing file from classloader: {}", requestPath);
-								// ensure that directory to file exists.
-								file.getParentFile().mkdirs();
-								Files.copy(resourceStream, file.toPath());
-
-								success = true;
-							}
-						}
-					}
-				} catch (Throwable t) {
-					logger.error("error while responding", t);
-				} finally {
-					if (!success) {
-						logger.debug("Unable to load requested file {} from classloader", requestPath);
-						sendError(ctx, NOT_FOUND);
-						return;
-					}
-				}
-			}
-		}
-
-		if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) {
-			sendError(ctx, NOT_FOUND);
-			return;
-		}
-
-		if (!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) {
-			sendError(ctx, NOT_FOUND);
-			return;
-		}
-
-		// cache validation
-		final String ifModifiedSince = request.headers().get(IF_MODIFIED_SINCE);
-		if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) {
-			SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
-			Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince);
-
-			// Only compare up to the second because the datetime format we send to the client
-			// does not have milliseconds
-			long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000;
-			long fileLastModifiedSeconds = file.lastModified() / 1000;
-			if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) {
-				if (logger.isDebugEnabled()) {
-					logger.debug("Responding 'NOT MODIFIED' for file '" + file.getAbsolutePath() + '\'');
-				}
-
-				sendNotModified(ctx);
-				return;
-			}
-		}
-
-		if (logger.isDebugEnabled()) {
-			logger.debug("Responding with file '" + file.getAbsolutePath() + '\'');
-		}
-
-		// Don't need to close this manually. Netty's DefaultFileRegion will take care of it.
-		final RandomAccessFile raf;
-		try {
-			raf = new RandomAccessFile(file, "r");
-		}
-		catch (FileNotFoundException e) {
-			sendError(ctx, NOT_FOUND);
-			return;
-		}
-
-		try {
-			long fileLength = raf.length();
-
-			HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-			setContentTypeHeader(response, file);
-
-			// since the log and out files are rapidly changing, we don't want the browser to cache them
-			if (!(requestPath.contains("log") || requestPath.contains("out"))) {
-				setDateAndCacheHeaders(response, file);
-			}
-			if (HttpHeaders.isKeepAlive(request)) {
-				response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
-			}
-			HttpHeaders.setContentLength(response, fileLength);
-
-			// write the initial line and the header.
-			ctx.write(response);
-
-			// write the content.
-			ChannelFuture lastContentFuture;
-			if (ctx.pipeline().get(SslHandler.class) == null) {
-				ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise());
-				lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
-			} else {
-				lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
-					ctx.newProgressivePromise());
-				// HttpChunkedInput will write the end marker (LastHttpContent) for us.
-			}
-
-			// close the connection, if no keep-alive is needed
-			if (!HttpHeaders.isKeepAlive(request)) {
-				lastContentFuture.addListener(ChannelFutureListener.CLOSE);
-			}
-		} catch (Exception e) {
-			raf.close();
-			logger.error("Failed to serve file.", e);
-			sendError(ctx, INTERNAL_SERVER_ERROR);
-		}
-	}
-
-	@Override
-	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
-		if (ctx.channel().isActive()) {
-			logger.error("Caught exception", cause);
-			sendError(ctx, INTERNAL_SERVER_ERROR);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-	//  Utilities to encode headers and responses
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Writes a simple error response message.
-	 *
-	 * @param ctx    The channel context to write the response to.
-	 * @param status The response status.
-	 */
-	public static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
-		FullHttpResponse response = new DefaultFullHttpResponse(
-				HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8));
-		response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
-
-		// close the connection as soon as the error message is sent.
-		ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
-	}
-
-	/**
-	 * Send the "304 Not Modified" response. This response can be used when the
-	 * file timestamp is the same as what the browser is sending up.
-	 *
-	 * @param ctx The channel context to write the response to.
-	 */
-	public static void sendNotModified(ChannelHandlerContext ctx) {
-		FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED);
-		setDateHeader(response);
-
-		// close the connection as soon as the "not modified" response is sent.
-		ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
-	}
-
-	/**
-	 * Sets the "date" header for the HTTP response.
-	 *
-	 * @param response HTTP response
-	 */
-	public static void setDateHeader(FullHttpResponse response) {
-		SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
-		dateFormatter.setTimeZone(GMT_TIMEZONE);
-
-		Calendar time = new GregorianCalendar();
-		response.headers().set(DATE, dateFormatter.format(time.getTime()));
-	}
-
-	/**
-	 * Sets the "date" and "cache" headers for the HTTP Response.
-	 *
-	 * @param response    The HTTP response object.
-	 * @param fileToCache File to extract the modification timestamp from.
-	 */
-	public static void setDateAndCacheHeaders(HttpResponse response, File fileToCache) {
-		SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
-		dateFormatter.setTimeZone(GMT_TIMEZONE);
-
-		// date header
-		Calendar time = new GregorianCalendar();
-		response.headers().set(DATE, dateFormatter.format(time.getTime()));
-
-		// cache headers
-		time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);
-		response.headers().set(EXPIRES, dateFormatter.format(time.getTime()));
-		response.headers().set(CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS);
-		response.headers().set(LAST_MODIFIED, dateFormatter.format(new Date(fileToCache.lastModified())));
-	}
-
-	/**
-	 * Sets the content type header for the HTTP Response.
-	 *
-	 * @param response HTTP response
-	 * @param file     file to extract content type
-	 */
-	public static void setContentTypeHeader(HttpResponse response, File file) {
-		String mimeType = MimeTypes.getMimeTypeForFileName(file.getName());
-		String mimeFinal = mimeType != null ? mimeType : MimeTypes.getDefaultMimeType();
-		response.headers().set(CONTENT_TYPE, mimeFinal);
-	}
-}
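
The cache-validation branch in respondToRequest above compares the If-Modified-Since header and the file's lastModified() at second granularity, because the HTTP date format carries no milliseconds. A self-contained sketch of just that rule, using only JDK classes (the class name, method name, and sample timestamp are made up for illustration):

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Locale;
    import java.util.TimeZone;

    public final class IfModifiedSinceSketch {

        private static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";

        // True when the header and the file timestamp match once truncated to seconds,
        // i.e. the handler above would answer with "304 Not Modified".
        static boolean isNotModified(String ifModifiedSinceHeader, long fileLastModifiedMillis)
                throws ParseException {
            SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
            dateFormatter.setTimeZone(TimeZone.getTimeZone("GMT"));
            Date headerDate = dateFormatter.parse(ifModifiedSinceHeader);
            return headerDate.getTime() / 1000 == fileLastModifiedMillis / 1000;
        }

        public static void main(String[] args) throws ParseException {
            long lastModified = 1505858400_000L; // hypothetical file modification time
            SimpleDateFormat f = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);
            f.setTimeZone(TimeZone.getTimeZone("GMT"));
            String header = f.format(new Date(lastModified));
            System.out.println(isNotModified(header, lastModified)); // prints "true"
        }
    }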

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
deleted file mode 100644
index 053d3f7..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.NotFoundException;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Base class for request handlers whose response depends on an ExecutionGraph
- * that can be retrieved via the "jobid" parameter.
- */
-public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonRequestHandler {
-
-	private final ExecutionGraphHolder executionGraphHolder;
-
-	public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
-		super(executor);
-		this.executionGraphHolder = Preconditions.checkNotNull(executionGraphHolder);
-	}
-
-	@Override
-	public CompletableFuture<String> handleJsonRequest(
-			Map<String, String> pathParams,
-			Map<String, String> queryParams,
-			JobManagerGateway jobManagerGateway) {
-		String jidString = pathParams.get("jobid");
-		if (jidString == null) {
-			throw new RuntimeException("JobId parameter missing");
-		}
-
-		JobID jid;
-		try {
-			jid = JobID.fromHexString(jidString);
-		}
-		catch (Exception e) {
-			return FutureUtils.completedExceptionally(new FlinkException("Invalid JobID string '" + jidString + "'", e));
-		}
-
-		final CompletableFuture<Optional<AccessExecutionGraph>> graphFuture = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);
-
-		return graphFuture.thenComposeAsync(
-			(Optional<AccessExecutionGraph> optGraph) -> {
-				if (optGraph.isPresent()) {
-					return handleRequest(optGraph.get(), pathParams);
-				} else {
-					throw new FlinkFutureException(new NotFoundException("Could not find job with jobId " + jid + '.'));
-				}
-			}, executor);
-	}
-
-	public abstract CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params);
-}
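
handleJsonRequest above resolves the "jobid" path parameter and then composes on a CompletableFuture of an Optional graph: a present value continues the request, an absent one fails the returned future (the NotFoundException branch). A minimal sketch of that composition pattern with plain JDK types, where a String stands in for the execution graph and all names are hypothetical:

    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;

    public final class OptionalComposeSketch {

        // Turns a lookup that may come back empty into either a result or a failed
        // future, mirroring the optGraph.isPresent() branch in handleJsonRequest above.
        static CompletableFuture<String> handle(CompletableFuture<Optional<String>> graphFuture) {
            return graphFuture.thenCompose(optGraph -> {
                if (optGraph.isPresent()) {
                    return CompletableFuture.completedFuture("found: " + optGraph.get());
                } else {
                    CompletableFuture<String> failed = new CompletableFuture<>();
                    failed.completeExceptionally(new IllegalStateException("Could not find job"));
                    return failed;
                }
            });
        }

        public static void main(String[] args) {
            System.out.println(handle(CompletableFuture.completedFuture(Optional.of("graph"))).join());
            System.out.println(handle(CompletableFuture.completedFuture(Optional.empty())).isCompletedExceptionally());
        }
    }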

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
deleted file mode 100644
index df09225..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Base class for request handlers whose response depends on a specific job vertex (defined
- * via the "vertexid" parameter) in a specific job (defined via the "jobid" parameter).
- */
-public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionGraphRequestHandler {
-
-	public AbstractJobVertexRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
-		super(executionGraphHolder, executor);
-	}
-
-	@Override
-	public final CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
-		final JobVertexID vid = parseJobVertexId(params);
-
-		final AccessExecutionJobVertex jobVertex = graph.getJobVertex(vid);
-		if (jobVertex == null) {
-			throw new IllegalArgumentException("No vertex with ID '" + vid + "' exists.");
-		}
-
-		return handleRequest(jobVertex, params);
-	}
-
-	/**
-	 * Returns the job vertex ID parsed from the provided parameters.
-	 *
-	 * @param params Path parameters
-	 * @return Parsed job vertex ID or <code>null</code> if not available.
-	 */
-	public static JobVertexID parseJobVertexId(Map<String, String> params) {
-		String jobVertexIdParam = params.get("vertexid");
-		if (jobVertexIdParam == null) {
-			return null;
-		}
-
-		try {
-			return JobVertexID.fromHexString(jobVertexIdParam);
-		} catch (RuntimeException ignored) {
-			return null;
-		}
-	}
-
-	public abstract CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params);
-}
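
parseJobVertexId above deliberately returns null for a missing or malformed "vertexid" instead of throwing, leaving the error report to the caller (handleRequest raises IllegalArgumentException when no vertex can be resolved). A small stand-alone sketch of that lenient-parsing convention, with a long hex value standing in for JobVertexID.fromHexString and all names purely illustrative:

    import java.util.Collections;
    import java.util.Map;

    public final class LenientParamParseSketch {

        // Missing or malformed parameters yield null rather than an exception.
        static Long parseVertexParam(Map<String, String> params) {
            String raw = params.get("vertexid");
            if (raw == null) {
                return null;
            }
            try {
                return Long.parseLong(raw, 16); // stand-in for JobVertexID.fromHexString
            } catch (RuntimeException ignored) {
                return null;
            }
        }

        public static void main(String[] args) {
            System.out.println(parseVertexParam(Collections.singletonMap("vertexid", "cafebabe"))); // 3405691582
            System.out.println(parseVertexParam(Collections.singletonMap("vertexid", "not-hex")));  // null
            System.out.println(parseVertexParam(Collections.emptyMap()));                           // null
        }
    }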

