From: trohrmann@apache.org
To: commits@flink.apache.org
Date: Tue, 19 Sep 2017 22:44:26 -0000
Subject: [16/16] flink git commit: [FLINK-7531] Move Flink legacy rest handler to flink-runtime

[FLINK-7531] Move Flink legacy rest handler to flink-runtime

Move metrics handlers under o.a.f.runtime.webmonitor.handlers

Move StaticFileServerHandler under o.a.f.runtime.webmonitor.files

This closes #4600.
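For code that imported these handlers, the practical effect of this commit is a package rename: classes move from flink-runtime-web's org.apache.flink.runtime.webmonitor.* packages to org.apache.flink.runtime.rest.handler.legacy.* in flink-runtime. Below is a minimal sketch of an affected import site; the consumer class MyMonitorComponent is hypothetical, and only the package names are taken from the WebRuntimeMonitor import changes in the diff that follows.

    // Hypothetical consumer class; only the imports are taken from this commit's diff.

    // Before FLINK-7531 (handlers lived in flink-runtime-web):
    //   import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
    //   import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;

    // After FLINK-7531 (handlers live in flink-runtime under the legacy REST handler packages):
    import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
    import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;

    public class MyMonitorComponent {
        // The handler classes keep their names; only their packages move in this commit,
        // so call sites keep working once the imports are updated.
    }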
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4fc019a9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4fc019a9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4fc019a9 Branch: refs/heads/master Commit: 4fc019a96a08446d7ba5f57664904abcd585e31c Parents: 3277010 Author: Till Rohrmann Authored: Fri Aug 18 09:52:30 2017 +0200 Committer: Till Rohrmann Committed: Wed Sep 20 00:40:24 2017 +0200 ---------------------------------------------------------------------- .../webmonitor/BackPressureStatsTracker.java | 334 -------------- .../webmonitor/ExecutionGraphHolder.java | 86 ---- .../runtime/webmonitor/NotFoundException.java | 32 -- .../webmonitor/OperatorBackPressureStats.java | 126 ------ .../webmonitor/RuntimeMonitorHandler.java | 4 +- .../runtime/webmonitor/StackTraceSample.java | 119 ----- .../webmonitor/StackTraceSampleCoordinator.java | 392 ----------------- .../flink/runtime/webmonitor/WebHandler.java | 32 -- .../runtime/webmonitor/WebRuntimeMonitor.java | 82 ++-- .../files/StaticFileServerHandler.java | 363 --------------- .../AbstractExecutionGraphRequestHandler.java | 80 ---- .../AbstractJobVertexRequestHandler.java | 72 --- .../handlers/AbstractJsonRequestHandler.java | 87 ---- .../AbstractSubtaskAttemptRequestHandler.java | 78 ---- .../handlers/AbstractSubtaskRequestHandler.java | 66 --- .../handlers/ClusterOverviewHandler.java | 105 ----- .../handlers/ConstantTextHandler.java | 57 --- .../handlers/CurrentJobIdsHandler.java | 112 ----- .../handlers/CurrentJobsOverviewHandler.java | 182 -------- .../handlers/DashboardConfigHandler.java | 90 ---- .../handlers/JarAccessDeniedHandler.java | 1 + .../webmonitor/handlers/JarActionHandler.java | 2 + .../webmonitor/handlers/JarDeleteHandler.java | 2 + .../webmonitor/handlers/JarListHandler.java | 2 + .../webmonitor/handlers/JarPlanHandler.java | 1 + .../webmonitor/handlers/JarRunHandler.java | 1 + .../webmonitor/handlers/JarUploadHandler.java | 1 + .../handlers/JobAccumulatorsHandler.java | 107 ----- .../handlers/JobCancellationHandler.java | 72 --- .../JobCancellationWithSavepointHandlers.java | 428 ------------------ .../webmonitor/handlers/JobConfigHandler.java | 119 ----- .../webmonitor/handlers/JobDetailsHandler.java | 225 ---------- .../handlers/JobExceptionsHandler.java | 137 ------ .../handlers/JobManagerConfigHandler.java | 87 ---- .../webmonitor/handlers/JobPlanHandler.java | 67 --- .../webmonitor/handlers/JobStoppingHandler.java | 72 --- .../handlers/JobVertexAccumulatorsHandler.java | 113 ----- .../handlers/JobVertexBackPressureHandler.java | 147 ------- .../handlers/JobVertexDetailsHandler.java | 160 ------- .../handlers/JobVertexTaskManagersHandler.java | 211 --------- .../webmonitor/handlers/JsonFactory.java | 35 -- .../webmonitor/handlers/RequestHandler.java | 56 --- .../handlers/RequestHandlerException.java | 31 -- .../SubtaskCurrentAttemptDetailsHandler.java | 49 --- ...taskExecutionAttemptAccumulatorsHandler.java | 134 ------ .../SubtaskExecutionAttemptDetailsHandler.java | 167 ------- .../SubtasksAllAccumulatorsHandler.java | 131 ------ .../handlers/SubtasksTimesHandler.java | 141 ------ .../handlers/TaskManagerLogHandler.java | 335 -------------- .../handlers/TaskManagersHandler.java | 205 --------- .../checkpoints/CheckpointConfigHandler.java | 120 ----- .../checkpoints/CheckpointStatsCache.java | 81 ---- .../CheckpointStatsDetailsHandler.java | 203 --------- .../CheckpointStatsDetailsSubtasksHandler.java | 234 ---------- 
.../checkpoints/CheckpointStatsHandler.java | 277 ------------ .../webmonitor/history/HistoryServer.java | 2 +- .../history/HistoryServerArchiveFetcher.java | 2 +- .../HistoryServerStaticFileServerHandler.java | 2 +- .../metrics/AbstractMetricsHandler.java | 139 ------ .../metrics/JobManagerMetricsHandler.java | 57 --- .../webmonitor/metrics/JobMetricsHandler.java | 55 --- .../metrics/JobVertexMetricsHandler.java | 57 --- .../webmonitor/metrics/MetricFetcher.java | 211 --------- .../runtime/webmonitor/metrics/MetricStore.java | 305 ------------- .../metrics/TaskManagerMetricsHandler.java | 59 --- .../webmonitor/utils/MutableIOMetrics.java | 109 ----- .../BackPressureStatsTrackerITCase.java | 332 -------------- .../BackPressureStatsTrackerTest.java | 192 -------- .../StackTraceSampleCoordinatorITCase.java | 203 --------- .../StackTraceSampleCoordinatorTest.java | 441 ------------------- .../runtime/webmonitor/files/MimeTypesTest.java | 75 ---- .../handlers/ClusterOverviewHandlerTest.java | 38 -- .../handlers/CurrentJobIdsHandlerTest.java | 38 -- .../CurrentJobsOverviewHandlerTest.java | 121 ----- .../handlers/DashboardConfigHandlerTest.java | 59 --- .../handlers/HandlerRedirectUtilsTest.java | 74 ---- .../handlers/JarActionHandlerTest.java | 13 +- .../handlers/JobAccumulatorsHandlerTest.java | 83 ---- .../handlers/JobCancellationHandlerTest.java | 44 -- ...obCancellationWithSavepointHandlersTest.java | 334 -------------- .../handlers/JobConfigHandlerTest.java | 92 ---- .../handlers/JobDetailsHandlerTest.java | 169 ------- .../handlers/JobExceptionsHandlerTest.java | 101 ----- .../handlers/JobManagerConfigHandlerTest.java | 37 -- .../webmonitor/handlers/JobPlanHandlerTest.java | 60 --- .../handlers/JobStoppingHandlerTest.java | 45 -- .../JobVertexAccumulatorsHandlerTest.java | 85 ---- .../JobVertexBackPressureHandlerTest.java | 211 --------- .../handlers/JobVertexDetailsHandlerTest.java | 109 ----- .../JobVertexTaskManagersHandlerTest.java | 132 ------ ...SubtaskCurrentAttemptDetailsHandlerTest.java | 40 -- ...ExecutionAttemptAccumulatorsHandlerTest.java | 91 ---- ...btaskExecutionAttemptDetailsHandlerTest.java | 109 ----- .../SubtasksAllAccumulatorsHandlerTest.java | 97 ---- .../handlers/SubtasksTimesHandlerTest.java | 103 ----- .../handlers/TaskManagerLogHandlerTest.java | 149 ------- .../handlers/TaskManagersHandlerTest.java | 44 -- .../CheckpointConfigHandlerTest.java | 195 -------- .../checkpoints/CheckpointStatsCacheTest.java | 71 --- .../CheckpointStatsDetailsHandlerTest.java | 358 --------------- .../checkpoints/CheckpointStatsHandlerTest.java | 432 ------------------ ...heckpointStatsSubtaskDetailsHandlerTest.java | 389 ---------------- .../webmonitor/history/FsJobArchivistTest.java | 2 +- .../webmonitor/history/HistoryServerTest.java | 2 +- .../metrics/AbstractMetricsHandlerTest.java | 172 -------- .../metrics/JobManagerMetricsHandlerTest.java | 84 ---- .../metrics/JobMetricsHandlerTest.java | 86 ---- .../metrics/JobVertexMetricsHandlerTest.java | 90 ---- .../webmonitor/metrics/MetricFetcherTest.java | 195 -------- .../webmonitor/metrics/MetricStoreTest.java | 88 ---- .../metrics/TaskManagerMetricsHandlerTest.java | 86 ---- .../utils/ArchivedExecutionBuilder.java | 150 ------- .../utils/ArchivedExecutionConfigBuilder.java | 71 --- .../utils/ArchivedExecutionGraphBuilder.java | 140 ------ .../ArchivedExecutionJobVertexBuilder.java | 84 ---- .../utils/ArchivedExecutionVertexBuilder.java | 73 --- .../utils/ArchivedJobGenerationUtils.java | 164 ------- 
.../flink/runtime/rest/NotFoundException.java | 32 ++ .../flink/runtime/rest/handler/WebHandler.java | 32 ++ .../AbstractExecutionGraphRequestHandler.java | 79 ++++ .../legacy/AbstractJobVertexRequestHandler.java | 71 +++ .../legacy/AbstractJsonRequestHandler.java | 88 ++++ .../AbstractSubtaskAttemptRequestHandler.java | 77 ++++ .../legacy/AbstractSubtaskRequestHandler.java | 65 +++ .../handler/legacy/ClusterOverviewHandler.java | 105 +++++ .../handler/legacy/ConstantTextHandler.java | 57 +++ .../handler/legacy/CurrentJobIdsHandler.java | 112 +++++ .../legacy/CurrentJobsOverviewHandler.java | 182 ++++++++ .../handler/legacy/DashboardConfigHandler.java | 90 ++++ .../handler/legacy/ExecutionGraphHolder.java | 82 ++++ .../handler/legacy/JobAccumulatorsHandler.java | 106 +++++ .../handler/legacy/JobCancellationHandler.java | 72 +++ .../JobCancellationWithSavepointHandlers.java | 427 ++++++++++++++++++ .../rest/handler/legacy/JobConfigHandler.java | 118 +++++ .../rest/handler/legacy/JobDetailsHandler.java | 224 ++++++++++ .../handler/legacy/JobExceptionsHandler.java | 136 ++++++ .../handler/legacy/JobManagerConfigHandler.java | 87 ++++ .../rest/handler/legacy/JobPlanHandler.java | 66 +++ .../rest/handler/legacy/JobStoppingHandler.java | 72 +++ .../legacy/JobVertexAccumulatorsHandler.java | 112 +++++ .../legacy/JobVertexBackPressureHandler.java | 145 ++++++ .../handler/legacy/JobVertexDetailsHandler.java | 159 +++++++ .../legacy/JobVertexTaskManagersHandler.java | 210 +++++++++ .../rest/handler/legacy/JsonFactory.java | 35 ++ .../rest/handler/legacy/RequestHandler.java | 56 +++ .../handler/legacy/RequestHandlerException.java | 31 ++ .../SubtaskCurrentAttemptDetailsHandler.java | 48 ++ ...taskExecutionAttemptAccumulatorsHandler.java | 133 ++++++ .../SubtaskExecutionAttemptDetailsHandler.java | 166 +++++++ .../legacy/SubtasksAllAccumulatorsHandler.java | 130 ++++++ .../handler/legacy/SubtasksTimesHandler.java | 140 ++++++ .../handler/legacy/TaskManagerLogHandler.java | 335 ++++++++++++++ .../handler/legacy/TaskManagersHandler.java | 205 +++++++++ .../backpressure/BackPressureStatsTracker.java | 333 ++++++++++++++ .../backpressure/OperatorBackPressureStats.java | 126 ++++++ .../legacy/backpressure/StackTraceSample.java | 119 +++++ .../StackTraceSampleCoordinator.java | 392 +++++++++++++++++ .../checkpoints/CheckpointConfigHandler.java | 120 +++++ .../checkpoints/CheckpointStatsCache.java | 81 ++++ .../CheckpointStatsDetailsHandler.java | 203 +++++++++ .../CheckpointStatsDetailsSubtasksHandler.java | 233 ++++++++++ .../checkpoints/CheckpointStatsHandler.java | 277 ++++++++++++ .../legacy/files/StaticFileServerHandler.java | 363 +++++++++++++++ .../legacy/metrics/AbstractMetricsHandler.java | 139 ++++++ .../metrics/JobManagerMetricsHandler.java | 57 +++ .../legacy/metrics/JobMetricsHandler.java | 55 +++ .../legacy/metrics/JobVertexMetricsHandler.java | 57 +++ .../handler/legacy/metrics/MetricFetcher.java | 211 +++++++++ .../handler/legacy/metrics/MetricStore.java | 305 +++++++++++++ .../metrics/TaskManagerMetricsHandler.java | 59 +++ .../rest/handler/util/MutableIOMetrics.java | 109 +++++ .../legacy/ClusterOverviewHandlerTest.java | 38 ++ .../legacy/CurrentJobIdsHandlerTest.java | 38 ++ .../legacy/CurrentJobsOverviewHandlerTest.java | 121 +++++ .../legacy/DashboardConfigHandlerTest.java | 59 +++ .../legacy/HandlerRedirectUtilsTest.java | 74 ++++ .../legacy/JobAccumulatorsHandlerTest.java | 82 ++++ .../legacy/JobCancellationHandlerTest.java | 44 ++ ...obCancellationWithSavepointHandlersTest.java | 333 
++++++++++++++ .../handler/legacy/JobConfigHandlerTest.java | 91 ++++ .../handler/legacy/JobDetailsHandlerTest.java | 168 +++++++ .../legacy/JobExceptionsHandlerTest.java | 100 +++++ .../legacy/JobManagerConfigHandlerTest.java | 37 ++ .../rest/handler/legacy/JobPlanHandlerTest.java | 59 +++ .../handler/legacy/JobStoppingHandlerTest.java | 45 ++ .../JobVertexAccumulatorsHandlerTest.java | 84 ++++ .../JobVertexBackPressureHandlerTest.java | 209 +++++++++ .../legacy/JobVertexDetailsHandlerTest.java | 108 +++++ .../JobVertexTaskManagersHandlerTest.java | 132 ++++++ ...SubtaskCurrentAttemptDetailsHandlerTest.java | 40 ++ ...ExecutionAttemptAccumulatorsHandlerTest.java | 91 ++++ ...btaskExecutionAttemptDetailsHandlerTest.java | 109 +++++ .../SubtasksAllAccumulatorsHandlerTest.java | 97 ++++ .../legacy/SubtasksTimesHandlerTest.java | 103 +++++ .../legacy/TaskManagerLogHandlerTest.java | 149 +++++++ .../handler/legacy/TaskManagersHandlerTest.java | 44 ++ .../BackPressureStatsTrackerITCase.java | 329 ++++++++++++++ .../BackPressureStatsTrackerTest.java | 185 ++++++++ .../StackTraceSampleCoordinatorITCase.java | 203 +++++++++ .../StackTraceSampleCoordinatorTest.java | 432 ++++++++++++++++++ .../CheckpointConfigHandlerTest.java | 195 ++++++++ .../checkpoints/CheckpointStatsCacheTest.java | 71 +++ .../CheckpointStatsDetailsHandlerTest.java | 358 +++++++++++++++ .../checkpoints/CheckpointStatsHandlerTest.java | 432 ++++++++++++++++++ ...heckpointStatsSubtaskDetailsHandlerTest.java | 389 ++++++++++++++++ .../handler/legacy/files/MimeTypesTest.java | 75 ++++ .../metrics/AbstractMetricsHandlerTest.java | 172 ++++++++ .../metrics/JobManagerMetricsHandlerTest.java | 84 ++++ .../legacy/metrics/JobMetricsHandlerTest.java | 86 ++++ .../metrics/JobVertexMetricsHandlerTest.java | 90 ++++ .../legacy/metrics/MetricFetcherTest.java | 195 ++++++++ .../handler/legacy/metrics/MetricStoreTest.java | 88 ++++ .../metrics/TaskManagerMetricsHandlerTest.java | 86 ++++ .../legacy/utils/ArchivedExecutionBuilder.java | 150 +++++++ .../utils/ArchivedExecutionConfigBuilder.java | 71 +++ .../utils/ArchivedExecutionGraphBuilder.java | 140 ++++++ .../ArchivedExecutionJobVertexBuilder.java | 84 ++++ .../utils/ArchivedExecutionVertexBuilder.java | 73 +++ .../utils/ArchivedJobGenerationUtils.java | 164 +++++++ 219 files changed, 14198 insertions(+), 14237 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java deleted file mode 100644 index 5e4e63a..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java +++ /dev/null @@ -1,334 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobStatus; - -import org.apache.flink.shaded.guava18.com.google.common.cache.Cache; -import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; -import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; - -import scala.Option; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Back pressure statistics tracker. - * - *
<p>
Back pressure is determined by sampling running tasks. If a task is - * slowed down by back pressure it will be stuck in memory requests to a - * {@link org.apache.flink.runtime.io.network.buffer.LocalBufferPool}. - * - *
<p>
The back pressured stack traces look like this: - * - *
<pre>
- * java.lang.Object.wait(Native Method)
- * o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
- * o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING
- * request
- * [...]
- * </pre>
- */ -public class BackPressureStatsTracker { - - private static final Logger LOG = LoggerFactory.getLogger(BackPressureStatsTracker.class); - - /** Maximum stack trace depth for samples. */ - static final int MAX_STACK_TRACE_DEPTH = 3; - - /** Expected class name for back pressure indicating stack trace element. */ - static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool"; - - /** Expected method name for back pressure indicating stack trace element. */ - static final String EXPECTED_METHOD_NAME = "requestBufferBlocking"; - - /** Lock guarding trigger operations. */ - private final Object lock = new Object(); - - /* Stack trace sample coordinator. */ - private final StackTraceSampleCoordinator coordinator; - - /** - * Completed stats. Important: Job vertex IDs need to be scoped by job ID, - * because they are potentially constant across runs messing up the cached - * data. - */ - private final Cache operatorStatsCache; - - /** Pending in progress stats. Important: Job vertex IDs need to be scoped - * by job ID, because they are potentially constant across runs messing up - * the cached data.*/ - private final Set pendingStats = new HashSet<>(); - - /** Cleanup interval for completed stats cache. */ - private final int cleanUpInterval; - - private final int numSamples; - - private final Time delayBetweenSamples; - - /** Flag indicating whether the stats tracker has been shut down. */ - private boolean shutDown; - - /** - * Creates a back pressure statistics tracker. - * - * @param cleanUpInterval Clean up interval for completed stats. - * @param numSamples Number of stack trace samples when determining back pressure. - * @param delayBetweenSamples Delay between samples when determining back pressure. - */ - public BackPressureStatsTracker( - StackTraceSampleCoordinator coordinator, - int cleanUpInterval, - int numSamples, - Time delayBetweenSamples) { - - this.coordinator = checkNotNull(coordinator, "Stack trace sample coordinator"); - - checkArgument(cleanUpInterval >= 0, "Clean up interval"); - this.cleanUpInterval = cleanUpInterval; - - checkArgument(numSamples >= 1, "Number of samples"); - this.numSamples = numSamples; - - this.delayBetweenSamples = checkNotNull(delayBetweenSamples, "Delay between samples"); - - this.operatorStatsCache = CacheBuilder.newBuilder() - .concurrencyLevel(1) - .expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS) - .build(); - } - - /** Cleanup interval for completed stats cache. */ - public long getCleanUpInterval() { - return cleanUpInterval; - } - - /** - * Returns back pressure statistics for a operator. - * - * @param vertex Operator to get the stats for. - * - * @return Back pressure statistics for an operator - */ - public Option getOperatorBackPressureStats(ExecutionJobVertex vertex) { - return Option.apply(operatorStatsCache.getIfPresent(vertex)); - } - - /** - * Triggers a stack trace sample for a operator to gather the back pressure - * statistics. If there is a sample in progress for the operator, the call - * is ignored. - * - * @param vertex Operator to get the stats for. - * @return Flag indicating whether a sample with triggered. 
- */ - @SuppressWarnings("unchecked") - public boolean triggerStackTraceSample(ExecutionJobVertex vertex) { - synchronized (lock) { - if (shutDown) { - return false; - } - - if (!pendingStats.contains(vertex) && - !vertex.getGraph().getState().isGloballyTerminalState()) { - - Executor executor = vertex.getGraph().getFutureExecutor(); - - // Only trigger if still active job - if (executor != null) { - pendingStats.add(vertex); - - if (LOG.isDebugEnabled()) { - LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices())); - } - - CompletableFuture sample = coordinator.triggerStackTraceSample( - vertex.getTaskVertices(), - numSamples, - delayBetweenSamples, - MAX_STACK_TRACE_DEPTH); - - sample.handleAsync(new StackTraceSampleCompletionCallback(vertex), executor); - - return true; - } - } - - return false; - } - } - - /** - * Cleans up the operator stats cache if it contains timed out entries. - * - *
<p>
The Guava cache only evicts as maintenance during normal operations. - * If this handler is inactive, it will never be cleaned. - */ - public void cleanUpOperatorStatsCache() { - operatorStatsCache.cleanUp(); - } - - /** - * Shuts down the stats tracker. - * - *
<p>
Invalidates the cache and clears all pending stats. - */ - public void shutDown() { - synchronized (lock) { - if (!shutDown) { - operatorStatsCache.invalidateAll(); - pendingStats.clear(); - - shutDown = true; - } - } - } - - /** - * Invalidates the cache (irrespective of clean up interval). - */ - void invalidateOperatorStatsCache() { - operatorStatsCache.invalidateAll(); - } - - /** - * Callback on completed stack trace sample. - */ - class StackTraceSampleCompletionCallback implements BiFunction { - - private final ExecutionJobVertex vertex; - - public StackTraceSampleCompletionCallback(ExecutionJobVertex vertex) { - this.vertex = vertex; - } - - @Override - public Void apply(StackTraceSample stackTraceSample, Throwable throwable) { - synchronized (lock) { - try { - if (shutDown) { - return null; - } - - // Job finished, ignore. - JobStatus jobState = vertex.getGraph().getState(); - if (jobState.isGloballyTerminalState()) { - LOG.debug("Ignoring sample, because job is in state " + jobState + "."); - } else if (stackTraceSample != null) { - OperatorBackPressureStats stats = createStatsFromSample(stackTraceSample); - operatorStatsCache.put(vertex, stats); - } else { - LOG.debug("Failed to gather stack trace sample.", throwable); - } - } catch (Throwable t) { - LOG.error("Error during stats completion.", t); - } finally { - pendingStats.remove(vertex); - } - - return null; - } - } - - /** - * Creates the back pressure stats from a stack trace sample. - * - * @param sample Stack trace sample to base stats on. - * - * @return Back pressure stats - */ - private OperatorBackPressureStats createStatsFromSample(StackTraceSample sample) { - Map> traces = sample.getStackTraces(); - - // Map task ID to subtask index, because the web interface expects - // it like that. - Map subtaskIndexMap = Maps - .newHashMapWithExpectedSize(traces.size()); - - Set sampledTasks = sample.getStackTraces().keySet(); - - for (ExecutionVertex task : vertex.getTaskVertices()) { - ExecutionAttemptID taskId = task.getCurrentExecutionAttempt().getAttemptId(); - if (sampledTasks.contains(taskId)) { - subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex()); - } else { - LOG.debug("Outdated sample. A task, which is part of the " + - "sample has been reset."); - } - } - - // Ratio of blocked samples to total samples per sub task. Array - // position corresponds to sub task index. - double[] backPressureRatio = new double[traces.size()]; - - for (Entry> entry : traces.entrySet()) { - int backPressureSamples = 0; - - List taskTraces = entry.getValue(); - - for (StackTraceElement[] trace : taskTraces) { - for (int i = trace.length - 1; i >= 0; i--) { - StackTraceElement elem = trace[i]; - - if (elem.getClassName().equals(EXPECTED_CLASS_NAME) && - elem.getMethodName().equals(EXPECTED_METHOD_NAME)) { - - backPressureSamples++; - break; // Continue with next stack trace - } - } - } - - int subtaskIndex = subtaskIndexMap.get(entry.getKey()); - - int size = taskTraces.size(); - double ratio = (size > 0) - ? 
((double) backPressureSamples) / size - : 0; - - backPressureRatio[subtaskIndex] = ratio; - } - - return new OperatorBackPressureStats( - sample.getSampleId(), - sample.getEndTime(), - backPressureRatio); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java deleted file mode 100644 index 8a96969..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/ExecutionGraphHolder.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Optional; -import java.util.WeakHashMap; -import java.util.concurrent.CompletableFuture; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archive. - * - *
<p>
The holder will cache the ExecutionGraph behind a weak reference, which will be cleared - * at some point once no one else is pointing to the ExecutionGraph. - * Note that while the holder runs in the same JVM as the JobManager or Archive, the reference should - * stay valid. - */ -public class ExecutionGraphHolder { - - private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class); - - private final Time timeout; - - private final WeakHashMap cache = new WeakHashMap<>(); - - public ExecutionGraphHolder() { - this(WebRuntimeMonitor.DEFAULT_REQUEST_TIMEOUT); - } - - public ExecutionGraphHolder(Time timeout) { - this.timeout = checkNotNull(timeout); - } - - /** - * Retrieves the execution graph with {@link JobID} jid wrapped in {@link Optional} or - * {@link Optional#empty()} if it cannot be found. - * - * @param jid jobID of the execution graph to be retrieved - * @return Optional ExecutionGraph if it has been retrievable, empty if there has been no ExecutionGraph - */ - public CompletableFuture> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) { - AccessExecutionGraph cached = cache.get(jid); - if (cached != null) { - if (cached.getState() == JobStatus.SUSPENDED) { - cache.remove(jid); - } else { - return CompletableFuture.completedFuture(Optional.of(cached)); - } - } - - CompletableFuture> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout); - - executionGraphFuture.thenAcceptAsync( - optExecutionGraph -> - optExecutionGraph.ifPresent(executionGraph -> cache.put(jid, executionGraph))); - - return executionGraphFuture; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/NotFoundException.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/NotFoundException.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/NotFoundException.java deleted file mode 100644 index 71125c9..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/NotFoundException.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor; - -/** - * A special exception that indicates that an element was not found and that the - * request should be answered with a {@code 404} return code. 
- */ -public class NotFoundException extends Exception { - - private static final long serialVersionUID = -4036006746423754639L; - - public NotFoundException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java deleted file mode 100644 index bfd5be2..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor; - -import java.util.Arrays; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Back pressure statistics of multiple tasks. - * - *
<p>
Statistics are gathered by sampling stack traces of running tasks. The - * back pressure ratio denotes the ratio of traces indicating back pressure - * to the total number of sampled traces. - */ -public class OperatorBackPressureStats { - - /** ID of the corresponding sample. */ - private final int sampleId; - - /** End time stamp of the corresponding sample. */ - private final long endTimestamp; - - /** Back pressure ratio per subtask. */ - private final double[] subTaskBackPressureRatio; - - /** Maximum back pressure ratio. */ - private final double maxSubTaskBackPressureRatio; - - public OperatorBackPressureStats( - int sampleId, - long endTimestamp, - double[] subTaskBackPressureRatio) { - - this.sampleId = sampleId; - this.endTimestamp = endTimestamp; - this.subTaskBackPressureRatio = checkNotNull(subTaskBackPressureRatio, "Sub task back pressure ratio"); - checkArgument(subTaskBackPressureRatio.length >= 1, "No Sub task back pressure ratio specified"); - - double max = 0; - for (double ratio : subTaskBackPressureRatio) { - if (ratio > max) { - max = ratio; - } - } - - maxSubTaskBackPressureRatio = max; - } - - /** - * Returns the ID of the sample. - * - * @return ID of the sample - */ - public int getSampleId() { - return sampleId; - } - - /** - * Returns the time stamp, when all stack traces were collected at the - * JobManager. - * - * @return Time stamp, when all stack traces were collected at the - * JobManager - */ - public long getEndTimestamp() { - return endTimestamp; - } - - /** - * Returns the number of sub tasks. - * - * @return Number of sub tasks. - */ - public int getNumberOfSubTasks() { - return subTaskBackPressureRatio.length; - } - - /** - * Returns the ratio of stack traces indicating back pressure to total - * number of sampled stack traces. - * - * @param index Subtask index. - * - * @return Ratio of stack traces indicating back pressure to total number - * of sampled stack traces. - */ - public double getBackPressureRatio(int index) { - return subTaskBackPressureRatio[index]; - } - - /** - * Returns the maximum back pressure ratio of all sub tasks. - * - * @return Maximum back pressure ratio of all sub tasks. 
- */ - public double getMaxBackPressureRatio() { - return maxSubTaskBackPressureRatio; - } - - @Override - public String toString() { - return "OperatorBackPressureStats{" + - "sampleId=" + sampleId + - ", endTimestamp=" + endTimestamp + - ", subTaskBackPressureRatio=" + Arrays.toString(subTaskBackPressureRatio) + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java index b393021..993a225 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java @@ -21,9 +21,11 @@ package org.apache.flink.runtime.webmonitor; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.rest.NotFoundException; import org.apache.flink.runtime.rest.handler.RedirectHandler; +import org.apache.flink.runtime.rest.handler.WebHandler; +import org.apache.flink.runtime.rest.handler.legacy.RequestHandler; import org.apache.flink.runtime.rest.handler.util.HandlerRedirectUtils; -import org.apache.flink.runtime.webmonitor.handlers.RequestHandler; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.ExceptionUtils; http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java deleted file mode 100644 index d60f8a4..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor; - -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; - -import java.util.Collections; -import java.util.List; -import java.util.Map; - -import static org.apache.flink.util.Preconditions.checkArgument; - -/** - * A sample of stack traces for one or more tasks. - * - *
<p>
The sampling is triggered in {@link StackTraceSampleCoordinator}. - */ -public class StackTraceSample { - - /** ID of this sample (unique per job). */ - private final int sampleId; - - /** Time stamp, when the sample was triggered. */ - private final long startTime; - - /** Time stamp, when all stack traces were collected at the JobManager. */ - private final long endTime; - - /** Map of stack traces by execution ID. */ - private final Map> stackTracesByTask; - - /** - * Creates a stack trace sample. - * - * @param sampleId ID of the sample. - * @param startTime Time stamp, when the sample was triggered. - * @param endTime Time stamp, when all stack traces were - * collected at the JobManager. - * @param stackTracesByTask Map of stack traces by execution ID. - */ - public StackTraceSample( - int sampleId, - long startTime, - long endTime, - Map> stackTracesByTask) { - - checkArgument(sampleId >= 0, "Negative sample ID"); - checkArgument(startTime >= 0, "Negative start time"); - checkArgument(endTime >= startTime, "End time before start time"); - - this.sampleId = sampleId; - this.startTime = startTime; - this.endTime = endTime; - this.stackTracesByTask = Collections.unmodifiableMap(stackTracesByTask); - } - - /** - * Returns the ID of the sample. - * - * @return ID of the sample - */ - public int getSampleId() { - return sampleId; - } - - /** - * Returns the time stamp, when the sample was triggered. - * - * @return Time stamp, when the sample was triggered - */ - public long getStartTime() { - return startTime; - } - - /** - * Returns the time stamp, when all stack traces were collected at the - * JobManager. - * - * @return Time stamp, when all stack traces were collected at the - * JobManager - */ - public long getEndTime() { - return endTime; - } - - /** - * Returns the a map of stack traces by execution ID. - * - * @return Map of stack traces by execution ID - */ - public Map> getStackTraces() { - return stackTracesByTask; - } - - @Override - public String toString() { - return "StackTraceSample{" + - "sampleId=" + sampleId + - ", startTime=" + startTime + - ", endTime=" + endTime + - '}'; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java deleted file mode 100644 index 534d2fa..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java +++ /dev/null @@ -1,392 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor; - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.Execution; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.messages.StackTraceSampleResponse; -import org.apache.flink.util.Preconditions; - -import org.apache.flink.shaded.guava18.com.google.common.collect.Maps; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayDeque; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -import static org.apache.flink.util.Preconditions.checkArgument; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * A coordinator for triggering and collecting stack traces of running tasks. - */ -public class StackTraceSampleCoordinator { - - private static final Logger LOG = LoggerFactory.getLogger(StackTraceSampleCoordinator.class); - - private static final int NUM_GHOST_SAMPLE_IDS = 10; - - private final Object lock = new Object(); - - /** Executor used to run the futures. */ - private final Executor executor; - - /** Time out after the expected sampling duration. */ - private final long sampleTimeout; - - /** In progress samples (guarded by lock). */ - private final Map pendingSamples = new HashMap<>(); - - /** A list of recent sample IDs to identify late messages vs. invalid ones. */ - private final ArrayDeque recentPendingSamples = new ArrayDeque<>(NUM_GHOST_SAMPLE_IDS); - - /** Sample ID counter (guarded by lock). */ - private int sampleIdCounter; - - /** - * Flag indicating whether the coordinator is still running (guarded by - * lock). - */ - private boolean isShutDown; - - /** - * Creates a new coordinator for the job. - * - * @param executor to use to execute the futures - * @param sampleTimeout Time out after the expected sampling duration. - * This is added to the expected duration of a - * sample, which is determined by the number of - * samples and the delay between each sample. - */ - public StackTraceSampleCoordinator(Executor executor, long sampleTimeout) { - checkArgument(sampleTimeout >= 0L); - this.executor = Preconditions.checkNotNull(executor); - this.sampleTimeout = sampleTimeout; - } - - /** - * Triggers a stack trace sample to all tasks. - * - * @param tasksToSample Tasks to sample. - * @param numSamples Number of stack trace samples to collect. - * @param delayBetweenSamples Delay between consecutive samples. - * @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates - * no maximum and keeps the complete stack trace. 
- * @return A future of the completed stack trace sample - */ - @SuppressWarnings("unchecked") - public CompletableFuture triggerStackTraceSample( - ExecutionVertex[] tasksToSample, - int numSamples, - Time delayBetweenSamples, - int maxStackTraceDepth) { - - checkNotNull(tasksToSample, "Tasks to sample"); - checkArgument(tasksToSample.length >= 1, "No tasks to sample"); - checkArgument(numSamples >= 1, "No number of samples"); - checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack trace depth"); - - // Execution IDs of running tasks - ExecutionAttemptID[] triggerIds = new ExecutionAttemptID[tasksToSample.length]; - Execution[] executions = new Execution[tasksToSample.length]; - - // Check that all tasks are RUNNING before triggering anything. The - // triggering can still fail. - for (int i = 0; i < triggerIds.length; i++) { - Execution execution = tasksToSample[i].getCurrentExecutionAttempt(); - if (execution != null && execution.getState() == ExecutionState.RUNNING) { - executions[i] = execution; - triggerIds[i] = execution.getAttemptId(); - } else { - return FutureUtils.completedExceptionally(new IllegalStateException("Task " + tasksToSample[i] - .getTaskNameWithSubtaskIndex() + " is not running.")); - } - } - - synchronized (lock) { - if (isShutDown) { - return FutureUtils.completedExceptionally(new IllegalStateException("Shut down")); - } - - final int sampleId = sampleIdCounter++; - - LOG.debug("Triggering stack trace sample {}", sampleId); - - final PendingStackTraceSample pending = new PendingStackTraceSample( - sampleId, triggerIds); - - // Discard the sample if it takes too long. We don't send cancel - // messages to the task managers, but only wait for the responses - // and then ignore them. - long expectedDuration = numSamples * delayBetweenSamples.toMilliseconds(); - Time timeout = Time.milliseconds(expectedDuration + sampleTimeout); - - // Add the pending sample before scheduling the discard task to - // prevent races with removing it again. - pendingSamples.put(sampleId, pending); - - // Trigger all samples - for (Execution execution: executions) { - final CompletableFuture stackTraceSampleFuture = execution.requestStackTraceSample( - sampleId, - numSamples, - delayBetweenSamples, - maxStackTraceDepth, - timeout); - - stackTraceSampleFuture.handleAsync( - (StackTraceSampleResponse stackTraceSampleResponse, Throwable throwable) -> { - if (stackTraceSampleResponse != null) { - collectStackTraces( - stackTraceSampleResponse.getSampleId(), - stackTraceSampleResponse.getExecutionAttemptID(), - stackTraceSampleResponse.getSamples()); - } else { - cancelStackTraceSample(sampleId, throwable); - } - - return null; - }, - executor); - } - - return pending.getStackTraceSampleFuture(); - } - } - - /** - * Cancels a pending sample. - * - * @param sampleId ID of the sample to cancel. - * @param cause Cause of the cancelling (can be null). - */ - public void cancelStackTraceSample(int sampleId, Throwable cause) { - synchronized (lock) { - if (isShutDown) { - return; - } - - PendingStackTraceSample sample = pendingSamples.remove(sampleId); - if (sample != null) { - if (cause != null) { - LOG.info("Cancelling sample " + sampleId, cause); - } else { - LOG.info("Cancelling sample {}", sampleId); - } - - sample.discard(cause); - rememberRecentSampleId(sampleId); - } - } - } - - /** - * Shuts down the coordinator. - * - *
<p>
After shut down, no further operations are executed. - */ - public void shutDown() { - synchronized (lock) { - if (!isShutDown) { - LOG.info("Shutting down stack trace sample coordinator."); - - for (PendingStackTraceSample pending : pendingSamples.values()) { - pending.discard(new RuntimeException("Shut down")); - } - - pendingSamples.clear(); - - isShutDown = true; - } - } - } - - /** - * Collects stack traces of a task. - * - * @param sampleId ID of the sample. - * @param executionId ID of the sampled task. - * @param stackTraces Stack traces of the sampled task. - * - * @throws IllegalStateException If unknown sample ID and not recently - * finished or cancelled sample. - */ - public void collectStackTraces( - int sampleId, - ExecutionAttemptID executionId, - List stackTraces) { - - synchronized (lock) { - if (isShutDown) { - return; - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Collecting stack trace sample {} of task {}", sampleId, executionId); - } - - PendingStackTraceSample pending = pendingSamples.get(sampleId); - - if (pending != null) { - pending.collectStackTraces(executionId, stackTraces); - - // Publish the sample - if (pending.isComplete()) { - pendingSamples.remove(sampleId); - rememberRecentSampleId(sampleId); - - pending.completePromiseAndDiscard(); - } - } else if (recentPendingSamples.contains(sampleId)) { - if (LOG.isDebugEnabled()) { - LOG.debug("Received late stack trace sample {} of task {}", - sampleId, executionId); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Unknown sample ID " + sampleId); - } - } - } - } - - private void rememberRecentSampleId(int sampleId) { - if (recentPendingSamples.size() >= NUM_GHOST_SAMPLE_IDS) { - recentPendingSamples.removeFirst(); - } - recentPendingSamples.addLast(sampleId); - } - - int getNumberOfPendingSamples() { - synchronized (lock) { - return pendingSamples.size(); - } - } - - // ------------------------------------------------------------------------ - - /** - * A pending stack trace sample, which collects stack traces and owns a - * {@link StackTraceSample} promise. - * - *
<p>
Access pending sample in lock scope. - */ - private static class PendingStackTraceSample { - - private final int sampleId; - private final long startTime; - private final Set pendingTasks; - private final Map> stackTracesByTask; - private final CompletableFuture stackTraceFuture; - - private boolean isDiscarded; - - PendingStackTraceSample( - int sampleId, - ExecutionAttemptID[] tasksToCollect) { - - this.sampleId = sampleId; - this.startTime = System.currentTimeMillis(); - this.pendingTasks = new HashSet<>(Arrays.asList(tasksToCollect)); - this.stackTracesByTask = Maps.newHashMapWithExpectedSize(tasksToCollect.length); - this.stackTraceFuture = new CompletableFuture<>(); - } - - int getSampleId() { - return sampleId; - } - - long getStartTime() { - return startTime; - } - - boolean isDiscarded() { - return isDiscarded; - } - - boolean isComplete() { - if (isDiscarded) { - throw new IllegalStateException("Discarded"); - } - - return pendingTasks.isEmpty(); - } - - void discard(Throwable cause) { - if (!isDiscarded) { - pendingTasks.clear(); - stackTracesByTask.clear(); - - stackTraceFuture.completeExceptionally(new RuntimeException("Discarded", cause)); - - isDiscarded = true; - } - } - - void collectStackTraces(ExecutionAttemptID executionId, List stackTraces) { - if (isDiscarded) { - throw new IllegalStateException("Discarded"); - } - - if (pendingTasks.remove(executionId)) { - stackTracesByTask.put(executionId, Collections.unmodifiableList(stackTraces)); - } else if (isComplete()) { - throw new IllegalStateException("Completed"); - } else { - throw new IllegalArgumentException("Unknown task " + executionId); - } - } - - void completePromiseAndDiscard() { - if (isComplete()) { - isDiscarded = true; - - long endTime = System.currentTimeMillis(); - - StackTraceSample stackTraceSample = new StackTraceSample( - sampleId, - startTime, - endTime, - stackTracesByTask); - - stackTraceFuture.complete(stackTraceSample); - } else { - throw new IllegalStateException("Not completed yet"); - } - } - - @SuppressWarnings("unchecked") - CompletableFuture getStackTraceSampleFuture() { - return stackTraceFuture; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java deleted file mode 100644 index 9839abd..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebHandler.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.webmonitor; - -/** - * Marker interface for web handlers which can describe their paths. - */ -public interface WebHandler { - - /** - * Returns an array of REST URL's under which this handler can be registered. - * - * @return array containing REST URL's under which this handler can be registered. - */ - String[] getPaths(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 71e1593..cd128de 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -27,50 +27,54 @@ import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.net.SSLUtils; -import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler; -import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler; -import org.apache.flink.runtime.webmonitor.handlers.ConstantTextHandler; -import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler; -import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler; -import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler; +import org.apache.flink.runtime.rest.handler.WebHandler; +import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler; +import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler; +import org.apache.flink.runtime.rest.handler.legacy.CurrentJobIdsHandler; +import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler; +import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.JobAccumulatorsHandler; +import org.apache.flink.runtime.rest.handler.legacy.JobCancellationHandler; +import org.apache.flink.runtime.rest.handler.legacy.JobCancellationWithSavepointHandlers; +import org.apache.flink.runtime.rest.handler.legacy.JobConfigHandler; +import org.apache.flink.runtime.rest.handler.legacy.JobDetailsHandler; +import org.apache.flink.runtime.rest.handler.legacy.JobExceptionsHandler; +import org.apache.flink.runtime.rest.handler.legacy.JobManagerConfigHandler; +import org.apache.flink.runtime.rest.handler.legacy.JobPlanHandler; +import org.apache.flink.runtime.rest.handler.legacy.JobStoppingHandler; +import org.apache.flink.runtime.rest.handler.legacy.JobVertexAccumulatorsHandler; +import org.apache.flink.runtime.rest.handler.legacy.JobVertexBackPressureHandler; +import org.apache.flink.runtime.rest.handler.legacy.JobVertexDetailsHandler; +import org.apache.flink.runtime.rest.handler.legacy.JobVertexTaskManagersHandler; +import org.apache.flink.runtime.rest.handler.legacy.RequestHandler; +import org.apache.flink.runtime.rest.handler.legacy.SubtaskCurrentAttemptDetailsHandler; +import org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptAccumulatorsHandler; +import 
org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler; +import org.apache.flink.runtime.rest.handler.legacy.SubtasksAllAccumulatorsHandler; +import org.apache.flink.runtime.rest.handler.legacy.SubtasksTimesHandler; +import org.apache.flink.runtime.rest.handler.legacy.TaskManagerLogHandler; +import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker; +import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator; +import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointConfigHandler; +import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsCache; +import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsHandler; +import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsSubtasksHandler; +import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsHandler; +import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler; +import org.apache.flink.runtime.rest.handler.legacy.metrics.JobManagerMetricsHandler; +import org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler; +import org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler; import org.apache.flink.runtime.webmonitor.handlers.JarAccessDeniedHandler; import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler; import org.apache.flink.runtime.webmonitor.handlers.JarListHandler; import org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler; import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler; import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler; -import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler; -import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler; -import org.apache.flink.runtime.webmonitor.handlers.JobCancellationWithSavepointHandlers; -import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler; -import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler; -import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler; -import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler; -import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler; -import org.apache.flink.runtime.webmonitor.handlers.JobStoppingHandler; -import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler; -import org.apache.flink.runtime.webmonitor.handlers.JobVertexBackPressureHandler; -import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler; -import org.apache.flink.runtime.webmonitor.handlers.JobVertexTaskManagersHandler; -import org.apache.flink.runtime.webmonitor.handlers.RequestHandler; -import org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler; -import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptAccumulatorsHandler; -import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptDetailsHandler; -import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandler; -import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler; -import 
org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler; -import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler; -import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointConfigHandler; -import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsCache; -import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsHandler; -import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsSubtasksHandler; -import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; -import org.apache.flink.runtime.webmonitor.metrics.JobManagerMetricsHandler; -import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler; -import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler; -import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; -import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap; @@ -189,7 +193,7 @@ public class WebRuntimeMonitor implements WebMonitor { this.uploadDir = null; } - ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(); + ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(timeout); // - Back pressure stats ---------------------------------------------- @@ -255,7 +259,7 @@ public class WebRuntimeMonitor implements WebMonitor { get(router, new SubtasksTimesHandler(currentGraphs, executor)); get(router, new JobVertexTaskManagersHandler(currentGraphs, executor, metricFetcher)); get(router, new JobVertexAccumulatorsHandler(currentGraphs, executor)); - get(router, new JobVertexBackPressureHandler(currentGraphs, executor, backPressureStatsTracker, refreshInterval)); + get(router, new JobVertexBackPressureHandler(currentGraphs, executor, backPressureStatsTracker, refreshInterval)); get(router, new JobVertexMetricsHandler(executor, metricFetcher)); get(router, new SubtasksAllAccumulatorsHandler(currentGraphs, executor)); get(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, executor, metricFetcher)); http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java deleted file mode 100644 index 2445d3f..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/files/StaticFileServerHandler.java +++ /dev/null @@ -1,363 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.files; - -/***************************************************************************** - * This code is based on the "HttpStaticFileServerHandler" from the - * Netty project's HTTP server example. - * - * See http://netty.io and - * https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java - *****************************************************************************/ - -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.rest.handler.RedirectHandler; -import org.apache.flink.runtime.rest.handler.util.MimeTypes; -import org.apache.flink.runtime.webmonitor.RestfulGateway; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; - -import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed; -import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler; -import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile; -import org.apache.flink.shaded.netty4.io.netty.util.CharsetUtil; - -import java.io.File; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.io.RandomAccessFile; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; -import java.nio.file.Files; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.Date; -import java.util.GregorianCalendar; -import java.util.Locale; -import java.util.TimeZone; -import java.util.concurrent.CompletableFuture; - -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL; -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; -import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.DATE; -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.EXPIRES; -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.IF_MODIFIED_SINCE; -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED; -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.NOT_MODIFIED; -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK; -import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Simple file server handler that serves requests to web frontend's static files, such as - * HTML, CSS, or JS files. - * - *
This code is based on the "HttpStaticFileServerHandler" from the Netty project's HTTP server - * example.
- */ -@ChannelHandler.Sharable -public class StaticFileServerHandler extends RedirectHandler { - - /** Timezone in which this server answers its "if-modified" requests. */ - private static final TimeZone GMT_TIMEZONE = TimeZone.getTimeZone("GMT"); - - /** Date format for HTTP. */ - public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz"; - - /** Be default, we allow files to be cached for 5 minutes. */ - private static final int HTTP_CACHE_SECONDS = 300; - - // ------------------------------------------------------------------------ - - /** The path in which the static documents are. */ - private final File rootPath; - - public StaticFileServerHandler( - GatewayRetriever retriever, - CompletableFuture localJobManagerAddressFuture, - Time timeout, - File rootPath) throws IOException { - - super(localJobManagerAddressFuture, retriever, timeout); - - this.rootPath = checkNotNull(rootPath).getCanonicalFile(); - } - - // ------------------------------------------------------------------------ - // Responses to requests - // ------------------------------------------------------------------------ - - @Override - protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, Routed routed, T gateway) throws Exception { - final HttpRequest request = routed.request(); - final String requestPath; - - // make sure we request the "index.html" in case there is a directory request - if (routed.path().endsWith("/")) { - requestPath = routed.path() + "index.html"; - } - // in case the files being accessed are logs or stdout files, find appropriate paths. - else if (routed.path().equals("/jobmanager/log") || routed.path().equals("/jobmanager/stdout")) { - requestPath = ""; - } else { - requestPath = routed.path(); - } - - respondToRequest(channelHandlerContext, request, requestPath); - } - - /** - * Response when running with leading JobManager. - */ - private void respondToRequest(ChannelHandlerContext ctx, HttpRequest request, String requestPath) - throws IOException, ParseException, URISyntaxException { - - // convert to absolute path - final File file = new File(rootPath, requestPath); - - if (!file.exists()) { - // file does not exist. Try to load it with the classloader - ClassLoader cl = StaticFileServerHandler.class.getClassLoader(); - - try (InputStream resourceStream = cl.getResourceAsStream("web" + requestPath)) { - boolean success = false; - try { - if (resourceStream != null) { - URL root = cl.getResource("web"); - URL requested = cl.getResource("web" + requestPath); - - if (root != null && requested != null) { - URI rootURI = new URI(root.getPath()).normalize(); - URI requestedURI = new URI(requested.getPath()).normalize(); - - // Check that we don't load anything from outside of the - // expected scope. - if (!rootURI.relativize(requestedURI).equals(requestedURI)) { - logger.debug("Loading missing file from classloader: {}", requestPath); - // ensure that directory to file exists. 
- file.getParentFile().mkdirs(); - Files.copy(resourceStream, file.toPath()); - - success = true; - } - } - } - } catch (Throwable t) { - logger.error("error while responding", t); - } finally { - if (!success) { - logger.debug("Unable to load requested file {} from classloader", requestPath); - sendError(ctx, NOT_FOUND); - return; - } - } - } - } - - if (!file.exists() || file.isHidden() || file.isDirectory() || !file.isFile()) { - sendError(ctx, NOT_FOUND); - return; - } - - if (!file.getCanonicalFile().toPath().startsWith(rootPath.toPath())) { - sendError(ctx, NOT_FOUND); - return; - } - - // cache validation - final String ifModifiedSince = request.headers().get(IF_MODIFIED_SINCE); - if (ifModifiedSince != null && !ifModifiedSince.isEmpty()) { - SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US); - Date ifModifiedSinceDate = dateFormatter.parse(ifModifiedSince); - - // Only compare up to the second because the datetime format we send to the client - // does not have milliseconds - long ifModifiedSinceDateSeconds = ifModifiedSinceDate.getTime() / 1000; - long fileLastModifiedSeconds = file.lastModified() / 1000; - if (ifModifiedSinceDateSeconds == fileLastModifiedSeconds) { - if (logger.isDebugEnabled()) { - logger.debug("Responding 'NOT MODIFIED' for file '" + file.getAbsolutePath() + '\''); - } - - sendNotModified(ctx); - return; - } - } - - if (logger.isDebugEnabled()) { - logger.debug("Responding with file '" + file.getAbsolutePath() + '\''); - } - - // Don't need to close this manually. Netty's DefaultFileRegion will take care of it. - final RandomAccessFile raf; - try { - raf = new RandomAccessFile(file, "r"); - } - catch (FileNotFoundException e) { - sendError(ctx, NOT_FOUND); - return; - } - - try { - long fileLength = raf.length(); - - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); - setContentTypeHeader(response, file); - - // since the log and out files are rapidly changing, we don't want to browser to cache them - if (!(requestPath.contains("log") || requestPath.contains("out"))) { - setDateAndCacheHeaders(response, file); - } - if (HttpHeaders.isKeepAlive(request)) { - response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); - } - HttpHeaders.setContentLength(response, fileLength); - - // write the initial line and the header. - ctx.write(response); - - // write the content. - ChannelFuture lastContentFuture; - if (ctx.pipeline().get(SslHandler.class) == null) { - ctx.write(new DefaultFileRegion(raf.getChannel(), 0, fileLength), ctx.newProgressivePromise()); - lastContentFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT); - } else { - lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), - ctx.newProgressivePromise()); - // HttpChunkedInput will write the end marker (LastHttpContent) for us. 
- } - - // close the connection, if no keep-alive is needed - if (!HttpHeaders.isKeepAlive(request)) { - lastContentFuture.addListener(ChannelFutureListener.CLOSE); - } - } catch (Exception e) { - raf.close(); - logger.error("Failed to serve file.", e); - sendError(ctx, INTERNAL_SERVER_ERROR); - } - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - if (ctx.channel().isActive()) { - logger.error("Caught exception", cause); - sendError(ctx, INTERNAL_SERVER_ERROR); - } - } - - // ------------------------------------------------------------------------ - // Utilities to encode headers and responses - // ------------------------------------------------------------------------ - - /** - * Writes a simple error response message. - * - * @param ctx The channel context to write the response to. - * @param status The response status. - */ - public static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) { - FullHttpResponse response = new DefaultFullHttpResponse( - HTTP_1_1, status, Unpooled.copiedBuffer("Failure: " + status + "\r\n", CharsetUtil.UTF_8)); - response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8"); - - // close the connection as soon as the error message is sent. - ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); - } - - /** - * Send the "304 Not Modified" response. This response can be used when the - * file timestamp is the same as what the browser is sending up. - * - * @param ctx The channel context to write the response to. - */ - public static void sendNotModified(ChannelHandlerContext ctx) { - FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, NOT_MODIFIED); - setDateHeader(response); - - // close the connection as soon as the error message is sent. - ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); - } - - /** - * Sets the "date" header for the HTTP response. - * - * @param response HTTP response - */ - public static void setDateHeader(FullHttpResponse response) { - SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US); - dateFormatter.setTimeZone(GMT_TIMEZONE); - - Calendar time = new GregorianCalendar(); - response.headers().set(DATE, dateFormatter.format(time.getTime())); - } - - /** - * Sets the "date" and "cache" headers for the HTTP Response. - * - * @param response The HTTP response object. - * @param fileToCache File to extract the modification timestamp from. - */ - public static void setDateAndCacheHeaders(HttpResponse response, File fileToCache) { - SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US); - dateFormatter.setTimeZone(GMT_TIMEZONE); - - // date header - Calendar time = new GregorianCalendar(); - response.headers().set(DATE, dateFormatter.format(time.getTime())); - - // cache headers - time.add(Calendar.SECOND, HTTP_CACHE_SECONDS); - response.headers().set(EXPIRES, dateFormatter.format(time.getTime())); - response.headers().set(CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS); - response.headers().set(LAST_MODIFIED, dateFormatter.format(new Date(fileToCache.lastModified()))); - } - - /** - * Sets the content type header for the HTTP Response. - * - * @param response HTTP response - * @param file file to extract content type - */ - public static void setContentTypeHeader(HttpResponse response, File file) { - String mimeType = MimeTypes.getMimeTypeForFileName(file.getName()); - String mimeFinal = mimeType != null ? 
mimeType : MimeTypes.getDefaultMimeType(); - response.headers().set(CONTENT_TYPE, mimeFinal); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java deleted file mode 100644 index 053d3f7..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.concurrent.FlinkFutureException; -import org.apache.flink.runtime.concurrent.FutureUtils; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; -import org.apache.flink.runtime.webmonitor.NotFoundException; -import org.apache.flink.util.FlinkException; -import org.apache.flink.util.Preconditions; - -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Base class for request handlers whose response depends on an ExecutionGraph - * that can be retrieved via "jobid" parameter. 
- */ -public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonRequestHandler { - - private final ExecutionGraphHolder executionGraphHolder; - - public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { - super(executor); - this.executionGraphHolder = Preconditions.checkNotNull(executionGraphHolder); - } - - @Override - public CompletableFuture handleJsonRequest( - Map pathParams, - Map queryParams, - JobManagerGateway jobManagerGateway) { - String jidString = pathParams.get("jobid"); - if (jidString == null) { - throw new RuntimeException("JobId parameter missing"); - } - - JobID jid; - try { - jid = JobID.fromHexString(jidString); - } - catch (Exception e) { - return FutureUtils.completedExceptionally(new FlinkException("Invalid JobID string '" + jidString + "'", e)); - } - - final CompletableFuture> graphFuture = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway); - - return graphFuture.thenComposeAsync( - (Optional optGraph) -> { - if (optGraph.isPresent()) { - return handleRequest(optGraph.get(), pathParams); - } else { - throw new FlinkFutureException(new NotFoundException("Could not find job with jobId " + jid + '.')); - } - }, executor); - } - - public abstract CompletableFuture handleRequest(AccessExecutionGraph graph, Map params); -} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java deleted file mode 100644 index df09225..0000000 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJobVertexRequestHandler.java +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.webmonitor.handlers; - -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; - -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -/** - * Base class for request handlers whose response depends on a specific job vertex (defined - * via the "vertexid" parameter) in a specific job, defined via (defined voa the "jobid" parameter). 
- */ -public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionGraphRequestHandler { - - public AbstractJobVertexRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { - super(executionGraphHolder, executor); - } - - @Override - public final CompletableFuture handleRequest(AccessExecutionGraph graph, Map params) { - final JobVertexID vid = parseJobVertexId(params); - - final AccessExecutionJobVertex jobVertex = graph.getJobVertex(vid); - if (jobVertex == null) { - throw new IllegalArgumentException("No vertex with ID '" + vid + "' exists."); - } - - return handleRequest(jobVertex, params); - } - - /** - * Returns the job vertex ID parsed from the provided parameters. - * - * @param params Path parameters - * @return Parsed job vertex ID or null if not available. - */ - public static JobVertexID parseJobVertexId(Map params) { - String jobVertexIdParam = params.get("vertexid"); - if (jobVertexIdParam == null) { - return null; - } - - try { - return JobVertexID.fromHexString(jobVertexIdParam); - } catch (RuntimeException ignored) { - return null; - } - } - - public abstract CompletableFuture handleRequest(AccessExecutionJobVertex jobVertex, Map params); -}
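For orientation, the handler base classes deleted above live on under org.apache.flink.runtime.rest.handler.legacy, as the rewritten WebRuntimeMonitor imports show. Below is a minimal sketch of a vertex-scoped handler built on the relocated AbstractJobVertexRequestHandler. It is illustrative only: the class name, REST path, and JSON payload are hypothetical, the generic signatures (CompletableFuture<String>, Map<String, String>) are assumed because the archived diff shows only raw types, and the abstract base classes are assumed to move with the rest of the handlers into the legacy package.

package org.apache.flink.runtime.rest.handler.legacy;

import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/**
 * Hypothetical example: returns the parallelism of the job vertex addressed by the
 * "jobid" and "vertexid" path parameters. The base classes resolve the execution
 * graph and the vertex before this handler is invoked.
 */
public class JobVertexParallelismHandler extends AbstractJobVertexRequestHandler {

	// Hypothetical REST path, following the ":jobid"/":vertexid" placeholder convention
	// used by the other vertex handlers. getPaths() is the registration hook declared
	// by the handler interfaces shown in the diff above.
	private static final String PATH = "/jobs/:jobid/vertices/:vertexid/parallelism";

	public JobVertexParallelismHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
		super(executionGraphHolder, executor);
	}

	public String[] getPaths() {
		return new String[]{PATH};
	}

	@Override
	public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
		// Render a small JSON document; the real handlers build their responses with
		// Jackson generators rather than string concatenation.
		String json = "{\"id\":\"" + jobVertex.getJobVertexId() + "\",\"parallelism\":" + jobVertex.getParallelism() + "}";
		return CompletableFuture.completedFuture(json);
	}
}

Such a handler would be registered in WebRuntimeMonitor alongside the existing ones, e.g. get(router, new JobVertexParallelismHandler(currentGraphs, executor)), mirroring the get(router, ...) calls visible in the WebRuntimeMonitor hunk above.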