[FLINK-3310] [runtime-web] Add back pressure statistics to web monitor (backend)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7e70da3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7e70da3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7e70da3
Branch: refs/heads/master
Commit: b7e70da351390df6b179cd0cedd11f311401e5a4
Parents: d69fe30
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon Feb 1 20:51:23 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Feb 8 15:04:44 2016 +0100
----------------------------------------------------------------------
docs/setup/config.md | 4 +
.../flink/configuration/ConfigConstants.java | 25 +-
flink-runtime-web/pom.xml | 6 +
.../webmonitor/BackPressureStatsTracker.java | 307 ++++++++++++
.../webmonitor/OperatorBackPressureStats.java | 126 +++++
.../runtime/webmonitor/StackTraceSample.java | 111 +++++
.../webmonitor/StackTraceSampleCoordinator.java | 477 +++++++++++++++++++
.../runtime/webmonitor/WebRuntimeMonitor.java | 80 +++-
.../handlers/JobVertexBackPressureHandler.java | 126 +++++
.../BackPressureStatsTrackerITCase.java | 300 ++++++++++++
.../BackPressureStatsTrackerTest.java | 196 ++++++++
.../StackTraceSampleCoordinatorTest.java | 377 +++++++++++++++
.../JobVertexBackPressureHandlerTest.java | 197 ++++++++
.../runtime/executiongraph/ExecutionGraph.java | 7 +-
.../runtime/executiongraph/ExecutionVertex.java | 24 +-
.../flink/runtime/jobmanager/JobManager.scala | 2 +-
.../messages/StackTraceSampleMessages.scala | 106 +++++
.../flink/runtime/taskmanager/TaskManager.scala | 156 +++++-
.../runtime/taskmanager/TaskManagerTest.java | 315 ++++++++++--
19 files changed, 2868 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 91297a4..0eb3acb 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -165,6 +165,10 @@ The following parameters configure Flink's JobManager and TaskManagers.
- `jobmanager.web.history`: The number of latest jobs that the JobManager's web front-end retains in its history (DEFAULT: 5).
- `jobmanager.web.checkpoints.disable`: Disables checkpoint statistics (DEFAULT: `false`).
- `jobmanager.web.checkpoints.history`: Number of checkpoint statistics to remember (DEFAULT: `10`).
+- `jobmanager.web.backpressure.cleanup-interval`: Time after which cached stats are cleaned up if not accessed (DEFAULT: `600000`, 10 mins).
+- `jobmanager.web.backpressure.refresh-interval`: Time after which available stats are deprecated and need to be refreshed (DEFAULT: `60000`, 1 min).
+- `jobmanager.web.backpressure.num-samples`: Number of stack trace samples to take to determine back pressure (DEFAULT: `100`).
+- `jobmanager.web.backpressure.delay-between-samples`: Delay between stack trace samples to determine back pressure (DEFAULT: `50`, 50 ms).
### Webclient
http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index aba3540..2b75644 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -357,7 +357,18 @@ public final class ConfigConstants {
/** Config parameter defining the number of checkpoints to remember for recent history. */
public static final String JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = "jobmanager.web.checkpoints.history";
-
+
+ /** Time after which cached stats are cleaned up if not accessed. */
+ public static final String JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = "jobmanager.web.backpressure.cleanup-interval";
+
+ /** Time after which available stats are deprecated and need to be refreshed (by resampling). */
+ public static final String JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = "jobmanager.web.backpressure.refresh-interval";
+
+ /** Number of stack trace samples to take to determine back pressure. */
+ public static final String JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = "jobmanager.web.backpressure.num-samples";
+
+ /** Delay between stack trace samples to determine back pressure. */
+ public static final String JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = "jobmanager.web.backpressure.delay-between-samples";
// ------------------------------ AKKA ------------------------------------
@@ -693,6 +704,18 @@ public final class ConfigConstants {
/** Default number of checkpoints to remember for recent history. */
public static final int DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = 10;
+ /** Time after which cached stats are cleaned up. */
+ public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = 10 * 60 * 1000;
+
+ /** Time after which available stats are deprecated and need to be refreshed (by resampling). */
+ public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = 60 * 1000;
+
+ /** Number of samples to take to determine back pressure. */
+ public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = 100;
+
+ /** Delay between samples to determine back pressure. */
+ public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = 50;
+
// ------------------------------ Akka Values ------------------------------
public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";
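These constants are read by the WebRuntimeMonitor (see below). As a minimal sketch, the defaults could also be overridden programmatically; the constant names come from this commit, the chosen values are illustrative only:

    import org.apache.flink.configuration.ConfigConstants;
    import org.apache.flink.configuration.Configuration;

    // Illustrative values only, not recommendations.
    Configuration config = new Configuration();
    config.setInteger(ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES, 50);
    config.setInteger(ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_DELAY, 100);
    config.setInteger(ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL, 30 * 1000);
    config.setInteger(ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL, 5 * 60 * 1000);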
http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index 7a6147b..1a19fb1 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -134,6 +134,12 @@ under the License.
<version>${curator.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-testkit_${scala.binary.version}</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
new file mode 100644
index 0000000..ff0573a
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import akka.dispatch.OnComplete;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Maps;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Back pressure statistics tracker.
+ *
+ * <p>Back pressure is determined by sampling the stack traces of running
+ * tasks. If a task is slowed down by back pressure, it will be stuck in
+ * memory requests to a
+ * {@link org.apache.flink.runtime.io.network.buffer.LocalBufferPool}.
+ *
+ * <p>The back pressured stack traces look like this:
+ *
+ * <pre>
+ * java.lang.Object.wait(Native Method)
+ * o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
+ * o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING
+ * request
+ * [...]
+ * </pre>
+ */
+public class BackPressureStatsTracker {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BackPressureStatsTracker.class);
+
+ /** Maximum stack trace depth for samples. */
+ static final int MAX_STACK_TRACE_DEPTH = 3;
+
+ /** Expected class name for back pressure indicating stack trace element. */
+ static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
+
+ /** Expected method name for back pressure indicating stack trace element. */
+ static final String EXPECTED_METHOD_NAME = "requestBufferBlocking";
+
+ /** Lock guarding trigger operations. */
+ private final Object lock = new Object();
+
+ /** Stack trace sample coordinator. */
+ private final StackTraceSampleCoordinator coordinator;
+
+ /**
+ * Completed stats. Important: Job vertex IDs need to be scoped by job ID,
+ * because they are potentially constant across runs and would otherwise
+ * mess up the cached data.
+ */
+ private final Cache<ExecutionJobVertex, OperatorBackPressureStats> operatorStatsCache;
+
+ /** Pending in progress stats. Important: Job vertex IDs need to be scoped
+ * by job ID, because they are potentially constant across runs and would
+ * otherwise mess up the cached data. */
+ private final Set<ExecutionJobVertex> pendingStats = new HashSet<>();
+
+ /** Cleanup interval for completed stats cache. */
+ private final int cleanUpInterval;
+
+ private final int numSamples;
+
+ private final FiniteDuration delayBetweenSamples;
+
+ /** Flag indicating whether the stats tracker has been shut down. */
+ private boolean shutDown;
+
+ /**
+ * Creates a back pressure statistics tracker.
+ *
+ * @param cleanUpInterval Clean up interval for completed stats.
+ * @param numSamples Number of stack trace samples when determining back pressure.
+ * @param delayBetweenSamples Delay between samples when determining back pressure.
+ */
+ public BackPressureStatsTracker(
+ StackTraceSampleCoordinator coordinator,
+ int cleanUpInterval,
+ int numSamples,
+ FiniteDuration delayBetweenSamples) {
+
+ this.coordinator = checkNotNull(coordinator, "Stack trace sample coordinator");
+
+ checkArgument(cleanUpInterval >= 0, "Clean up interval");
+ this.cleanUpInterval = cleanUpInterval;
+
+ checkArgument(numSamples >= 1, "Number of samples");
+ this.numSamples = numSamples;
+
+ this.delayBetweenSamples = checkNotNull(delayBetweenSamples, "Delay between samples");
+
+ this.operatorStatsCache = CacheBuilder.newBuilder()
+ .concurrencyLevel(1)
+ .expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS)
+ .build();
+ }
+
+ /** Cleanup interval for completed stats cache. */
+ public long getCleanUpInterval() {
+ return cleanUpInterval;
+ }
+
+ /**
+ * Returns back pressure statistics for an operator.
+ *
+ * @param vertex Operator to get the stats for.
+ *
+ * @return Back pressure statistics for an operator
+ */
+ public Option<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex vertex) {
+ return Option.apply(operatorStatsCache.getIfPresent(vertex));
+ }
+
+ /**
+ * Triggers a stack trace sample for an operator to gather the back pressure
+ * statistics. If there is a sample in progress for the operator, the call
+ * is ignored.
+ *
+ * @param vertex Operator to get the stats for.
+ */
+ @SuppressWarnings("unchecked")
+ public void triggerStackTraceSample(ExecutionJobVertex vertex) {
+ synchronized (lock) {
+ if (shutDown) {
+ return;
+ }
+
+ if (!pendingStats.contains(vertex)) {
+ pendingStats.add(vertex);
+
+ Future<StackTraceSample> sample = coordinator.triggerStackTraceSample(
+ vertex.getTaskVertices(),
+ numSamples,
+ delayBetweenSamples,
+ MAX_STACK_TRACE_DEPTH);
+
+ sample.onComplete(new StackTraceSampleCompletionCallback(
+ vertex), vertex.getGraph().getExecutionContext());
+ }
+ }
+ }
+
+ /**
+ * Cleans up the operator stats cache if it contains timed out entries.
+ *
+ * <p>The Guava cache only evicts entries as maintenance during normal
+ * operations. If this tracker is inactive, the cache will never be
+ * cleaned up.
+ */
+ public void cleanUpOperatorStatsCache() {
+ operatorStatsCache.cleanUp();
+ }
+
+ /**
+ * Shuts down the stats tracker.
+ *
+ * <p>Invalidates the cache and clears all pending stats.
+ */
+ public void shutDown() {
+ synchronized (lock) {
+ if (!shutDown) {
+ operatorStatsCache.invalidateAll();
+ pendingStats.clear();
+
+ shutDown = true;
+ }
+ }
+ }
+
+ /**
+ * Invalidates the cache (irrespective of clean up interval).
+ */
+ void invalidateOperatorStatsCache() {
+ operatorStatsCache.invalidateAll();
+ }
+
+ /**
+ * Callback on completed stack trace sample.
+ */
+ class StackTraceSampleCompletionCallback extends OnComplete<StackTraceSample> {
+
+ private final ExecutionJobVertex vertex;
+
+ public StackTraceSampleCompletionCallback(ExecutionJobVertex vertex) {
+ this.vertex = vertex;
+ }
+
+ @Override
+ public void onComplete(Throwable failure, StackTraceSample success) throws Throwable {
+ synchronized (lock) {
+ try {
+ if (shutDown) {
+ return;
+ }
+
+ if (success != null) {
+ OperatorBackPressureStats stats = createStatsFromSample(success);
+ operatorStatsCache.put(vertex, stats);
+ } else {
+ LOG.error("Failed to gather stack trace sample.", failure);
+ }
+ } catch (Throwable t) {
+ LOG.error("Error during stats completion.", t);
+ } finally {
+ pendingStats.remove(vertex);
+ }
+ }
+ }
+
+ /**
+ * Creates the back pressure stats from a stack trace sample.
+ *
+ * @param sample Stack trace sample to base stats on.
+ *
+ * @return Back pressure stats
+ */
+ private OperatorBackPressureStats createStatsFromSample(StackTraceSample sample) {
+ Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = sample.getStackTraces();
+
+ // Map task ID to subtask index, because the web interface expects
+ // it like that.
+ Map<ExecutionAttemptID, Integer> subtaskIndexMap = Maps
+ .newHashMapWithExpectedSize(traces.size());
+
+ Set<ExecutionAttemptID> sampledTasks = sample.getStackTraces().keySet();
+
+ for (ExecutionVertex task : vertex.getTaskVertices()) {
+ ExecutionAttemptID taskId = task.getCurrentExecutionAttempt().getAttemptId();
+ if (sampledTasks.contains(taskId)) {
+ subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex());
+ } else {
+ throw new RuntimeException("Outdated sample. A task, which is part of the " +
+ "sample has been reset.");
+ }
+ }
+
+ // Ratio of blocked samples to total samples per sub task. Array
+ // position corresponds to sub task index.
+ double[] backPressureRatio = new double[traces.size()];
+
+ for (Entry<ExecutionAttemptID, List<StackTraceElement[]>> entry : traces.entrySet()) {
+ int backPressureSamples = 0;
+
+ List<StackTraceElement[]> taskTraces = entry.getValue();
+
+ for (StackTraceElement[] trace : taskTraces) {
+ for (int i = trace.length - 1; i >= 0; i--) {
+ StackTraceElement elem = trace[i];
+
+ if (elem.getClassName().equals(EXPECTED_CLASS_NAME) &&
+ elem.getMethodName().equals(EXPECTED_METHOD_NAME)) {
+
+ backPressureSamples++;
+ break; // Continue with next stack trace
+ }
+ }
+ }
+
+ int subtaskIndex = subtaskIndexMap.get(entry.getKey());
+
+ int size = taskTraces.size();
+ double ratio = (size > 0)
+ ? ((double) backPressureSamples) / size
+ : 0;
+
+ backPressureRatio[subtaskIndex] = ratio;
+ }
+
+ return new OperatorBackPressureStats(
+ sample.getSampleId(),
+ sample.getEndTime(),
+ backPressureRatio);
+ }
+ }
+}
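To make the sampling logic above easier to follow, here is a condensed sketch of the per-subtask computation in createStatsFromSample: a sampled trace counts as back pressured if any of its frames is the blocking buffer request in LocalBufferPool. It assumes same-package access to the package-private constants of BackPressureStatsTracker.

    import java.util.List;

    static double backPressureRatio(List<StackTraceElement[]> taskTraces) {
        int backPressureSamples = 0;
        for (StackTraceElement[] trace : taskTraces) {
            // Walk the trace bottom-up and look for the blocking buffer request.
            for (int i = trace.length - 1; i >= 0; i--) {
                StackTraceElement elem = trace[i];
                if (elem.getClassName().equals(BackPressureStatsTracker.EXPECTED_CLASS_NAME)
                        && elem.getMethodName().equals(BackPressureStatsTracker.EXPECTED_METHOD_NAME)) {
                    backPressureSamples++;
                    break; // Continue with the next sampled trace
                }
            }
        }
        return taskTraces.isEmpty() ? 0.0 : ((double) backPressureSamples) / taskTraces.size();
    }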
http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java
new file mode 100644
index 0000000..cb262e3
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import java.util.Arrays;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Back pressure statistics of multiple tasks.
+ *
+ * <p>Statistics are gathered by sampling stack traces of running tasks. The
+ * back pressure ratio denotes the ratio of traces indicating back pressure
+ * to the total number of sampled traces.
+ */
+public class OperatorBackPressureStats {
+
+ /** ID of the corresponding sample. */
+ private final int sampleId;
+
+ /** End time stamp of the corresponding sample. */
+ private final long endTimestamp;
+
+ /** Back pressure ratio per subtask. */
+ private final double[] subTaskBackPressureRatio;
+
+ /** Maximum back pressure ratio. */
+ private final double maxSubTaskBackPressureRatio;
+
+ public OperatorBackPressureStats(
+ int sampleId,
+ long endTimestamp,
+ double[] subTaskBackPressureRatio) {
+
+ this.sampleId = sampleId;
+ this.endTimestamp = endTimestamp;
+ this.subTaskBackPressureRatio = checkNotNull(subTaskBackPressureRatio, "Sub task back pressure ratio");
+ checkArgument(subTaskBackPressureRatio.length >= 1, "No sub task back pressure ratio specified");
+
+ double max = 0;
+ for (double ratio : subTaskBackPressureRatio) {
+ if (ratio > max) {
+ max = ratio;
+ }
+ }
+
+ maxSubTaskBackPressureRatio = max;
+ }
+
+ /**
+ * Returns the ID of the sample.
+ *
+ * @return ID of the sample
+ */
+ public int getSampleId() {
+ return sampleId;
+ }
+
+ /**
+ * Returns the time stamp, when all stack traces were collected at the
+ * JobManager.
+ *
+ * @return Time stamp, when all stack traces were collected at the
+ * JobManager
+ */
+ public long getEndTimestamp() {
+ return endTimestamp;
+ }
+
+ /**
+ * Returns the number of sub tasks.
+ *
+ * @return Number of sub tasks.
+ */
+ public int getNumberOfSubTasks() {
+ return subTaskBackPressureRatio.length;
+ }
+
+ /**
+ * Returns the ratio of stack traces indicating back pressure to total
+ * number of sampled stack traces.
+ *
+ * @param index Subtask index.
+ *
+ * @return Ratio of stack traces indicating back pressure to total number
+ * of sampled stack traces.
+ */
+ public double getBackPressureRatio(int index) {
+ return subTaskBackPressureRatio[index];
+ }
+
+ /**
+ * Returns the maximum back pressure ratio of all sub tasks.
+ *
+ * @return Maximum back pressure ratio of all sub tasks.
+ */
+ public double getMaxBackPressureRatio() {
+ return maxSubTaskBackPressureRatio;
+ }
+
+ @Override
+ public String toString() {
+ return "OperatorBackPressureStats{" +
+ "sampleId=" + sampleId +
+ ", endTimestamp=" + endTimestamp +
+ ", subTaskBackPressureRatio=" + Arrays.toString(subTaskBackPressureRatio) +
+ '}';
+ }
+}
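A small usage sketch of this value class; the ratios are made up:

    // Three subtasks; the second one is fully back pressured (made-up numbers).
    OperatorBackPressureStats stats = new OperatorBackPressureStats(
        0,                          // sample ID
        System.currentTimeMillis(), // end timestamp of the sample
        new double[] { 0.05, 1.0, 0.4 });

    stats.getNumberOfSubTasks();     // 3
    stats.getBackPressureRatio(1);   // 1.0
    stats.getMaxBackPressureRatio(); // 1.0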
http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java
new file mode 100644
index 0000000..e861265
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * A sample of stack traces for one or more tasks.
+ *
+ * <p>The sampling is triggered in {@link StackTraceSampleCoordinator}.
+ */
+public class StackTraceSample {
+
+ /** ID of this sample (unique per job) */
+ private final int sampleId;
+
+ /** Time stamp, when the sample was triggered. */
+ private final long startTime;
+
+ /** Time stamp, when all stack traces were collected at the JobManager. */
+ private final long endTime;
+
+ /** Map of stack traces by execution ID. */
+ private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask;
+
+ /**
+ * Creates a stack trace sample.
+ *
+ * @param sampleId ID of the sample.
+ * @param startTime Time stamp, when the sample was triggered.
+ * @param endTime Time stamp, when all stack traces were
+ * collected at the JobManager.
+ * @param stackTracesByTask Map of stack traces by execution ID.
+ */
+ public StackTraceSample(
+ int sampleId,
+ long startTime,
+ long endTime,
+ Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask) {
+
+ checkArgument(sampleId >= 0, "Negative sample ID");
+ checkArgument(startTime >= 0, "Negative start time");
+ checkArgument(endTime >= startTime, "End time before start time");
+
+ this.sampleId = sampleId;
+ this.startTime = startTime;
+ this.endTime = endTime;
+ this.stackTracesByTask = Collections.unmodifiableMap(stackTracesByTask);
+ }
+
+ /**
+ * Returns the ID of the sample.
+ *
+ * @return ID of the sample
+ */
+ public int getSampleId() {
+ return sampleId;
+ }
+
+ /**
+ * Returns the time stamp, when the sample was triggered.
+ *
+ * @return Time stamp, when the sample was triggered
+ */
+ public long getStartTime() {
+ return startTime;
+ }
+
+ /**
+ * Returns the time stamp, when all stack traces were collected at the
+ * JobManager.
+ *
+ * @return Time stamp, when all stack traces were collected at the
+ * JobManager
+ */
+ public long getEndTime() {
+ return endTime;
+ }
+
+ /**
+ * Returns the map of stack traces by execution ID.
+ *
+ * @return Map of stack traces by execution ID
+ */
+ public Map<ExecutionAttemptID, List<StackTraceElement[]>> getStackTraces() {
+ return stackTracesByTask;
+ }
+
+}
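As a short sketch, a completed sample could be inspected like this; the sample instance is assumed to come from the StackTraceSampleCoordinator below:

    import java.util.List;
    import java.util.Map;
    import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;

    static void printSample(StackTraceSample sample) {
        // One entry per sampled task, each holding the collected traces.
        for (Map.Entry<ExecutionAttemptID, List<StackTraceElement[]>> entry
                : sample.getStackTraces().entrySet()) {
            System.out.println("Task " + entry.getKey() + " delivered "
                + entry.getValue().size() + " stack traces.");
        }
    }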
http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
new file mode 100644
index 0000000..e7b292f
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.google.common.collect.Maps;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleFailure;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleSuccess;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A coordinator for triggering and collecting stack traces of running tasks.
+ */
+public class StackTraceSampleCoordinator {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StackTraceSampleCoordinator.class);
+
+ private static final int NUM_GHOST_SAMPLE_IDS = 10;
+
+ private final Object lock = new Object();
+
+ /** Actor for responses. */
+ private final ActorGateway responseActor;
+
+ /** Time out after the expected sampling duration. */
+ private final int sampleTimeout;
+
+ /** In progress samples (guarded by lock). */
+ private final Map<Integer, PendingStackTraceSample> pendingSamples = new HashMap<>();
+
+ /** A list of recent sample IDs to identify late messages vs. invalid ones. */
+ private final ArrayDeque<Integer> recentPendingSamples = new ArrayDeque<>(NUM_GHOST_SAMPLE_IDS);
+
+ /** Sample ID counter (guarded by lock). */
+ private int sampleIdCounter;
+
+ /**
+ * Timer to discard expired in progress samples. Lazily initialized, as the
+ * sample coordinator will not be used very often (guarded by lock).
+ */
+ private Timer timer;
+
+ /**
+ * Flag indicating whether the coordinator is still running (guarded by
+ * lock).
+ */
+ private boolean isShutDown;
+
+ /**
+ * Creates a new coordinator for the job.
+ *
+ * @param sampleTimeout Time out after the expected sampling duration.
+ * This is added to the expected duration of a
+ * sample, which is determined by the number of
+ * samples and the delay between each sample.
+ */
+ public StackTraceSampleCoordinator(ActorSystem actorSystem, int sampleTimeout) {
+ Props props = Props.create(StackTraceSampleCoordinatorActor.class, this);
+ this.responseActor = new AkkaActorGateway(actorSystem.actorOf(props), null);
+
+ checkArgument(sampleTimeout >= 0);
+ this.sampleTimeout = sampleTimeout;
+ }
+
+ /**
+ * Triggers a stack trace sample to all tasks.
+ *
+ * @param tasksToSample Tasks to sample.
+ * @param numSamples Number of stack trace samples to collect.
+ * @param delayBetweenSamples Delay between consecutive samples.
+ * @param maxStackTraceDepth Maximum depth of the stack trace. 0 indicates
+ * no maximum and keeps the complete stack trace.
+ * @return A future of the completed stack trace sample
+ */
+ @SuppressWarnings("unchecked")
+ public Future<StackTraceSample> triggerStackTraceSample(
+ ExecutionVertex[] tasksToSample,
+ int numSamples,
+ FiniteDuration delayBetweenSamples,
+ int maxStackTraceDepth) {
+
+ checkNotNull(tasksToSample, "Tasks to sample");
+ checkArgument(tasksToSample.length >= 1, "No tasks to sample");
+ checkArgument(numSamples >= 1, "Illegal number of samples");
+ checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack trace depth");
+
+ // Execution IDs of running tasks
+ ExecutionAttemptID[] triggerIds = new ExecutionAttemptID[tasksToSample.length];
+
+ // Check that all tasks are RUNNING before triggering anything. The
+ // triggering can still fail.
+ for (int i = 0; i < triggerIds.length; i++) {
+ Execution execution = tasksToSample[i].getCurrentExecutionAttempt();
+ if (execution != null && execution.getState() == ExecutionState.RUNNING) {
+ triggerIds[i] = execution.getAttemptId();
+ } else {
+ Promise failedPromise = new scala.concurrent.impl.Promise.DefaultPromise<>()
+ .failure(new IllegalStateException("Task " + tasksToSample[i]
+ .getTaskNameWithSubtaskIndex() + " is not running."));
+ return failedPromise.future();
+ }
+ }
+
+ synchronized (lock) {
+ if (isShutDown) {
+ Promise failedPromise = new scala.concurrent.impl.Promise.DefaultPromise<>()
+ .failure(new IllegalStateException("Shut down"));
+ return failedPromise.future();
+ }
+
+ if (timer == null) {
+ timer = new Timer("Stack trace sample coordinator timer");
+ }
+
+ int sampleId = sampleIdCounter++;
+
+ LOG.debug("Triggering stack trace sample {}", sampleId);
+
+ final PendingStackTraceSample pending = new PendingStackTraceSample(
+ sampleId, triggerIds);
+
+ // Discard the sample if it takes too long. We don't send cancel
+ // messages to the task managers, but only wait for the responses
+ // and then ignore them.
+ long expectedDuration = numSamples * delayBetweenSamples.toMillis();
+ long discardDelay = expectedDuration + sampleTimeout;
+
+ TimerTask discardTask = new TimerTask() {
+ @Override
+ public void run() {
+ try {
+ synchronized (lock) {
+ if (!pending.isDiscarded()) {
+ LOG.info("Sample {} expired before completing",
+ pending.getSampleId());
+
+ pending.discard(new RuntimeException("Time out"));
+ pendingSamples.remove(pending.getSampleId());
+ }
+ }
+ } catch (Throwable t) {
+ LOG.error("Exception while handling sample timeout", t);
+ }
+ }
+ };
+
+ // Add the pending sample before scheduling the discard task to
+ // prevent races with removing it again.
+ pendingSamples.put(sampleId, pending);
+
+ timer.schedule(discardTask, discardDelay);
+
+ boolean success = true;
+ try {
+ // Trigger all samples
+ for (int i = 0; i < tasksToSample.length; i++) {
+ TriggerStackTraceSample msg = new TriggerStackTraceSample(
+ sampleId,
+ triggerIds[i],
+ numSamples,
+ delayBetweenSamples,
+ maxStackTraceDepth);
+
+ if (!tasksToSample[i].sendMessageToCurrentExecution(
+ msg,
+ triggerIds[i],
+ responseActor)) {
+ success = false;
+ break;
+ }
+ }
+
+ return pending.getStackTraceSampleFuture();
+ } finally {
+ if (!success) {
+ pending.discard(new RuntimeException("Failed to trigger sample, " +
+ "because task has been reset."));
+ pendingSamples.remove(sampleId);
+ rememberRecentSampleId(sampleId);
+ }
+ }
+ }
+ }
+
+ /**
+ * Cancels a pending sample.
+ *
+ * @param sampleId ID of the sample to cancel.
+ * @param cause Cause of the cancellation (can be <code>null</code>).
+ */
+ public void cancelStackTraceSample(int sampleId, Exception cause) {
+ synchronized (lock) {
+ if (isShutDown) {
+ return;
+ }
+
+ PendingStackTraceSample sample = pendingSamples.remove(sampleId);
+ if (sample != null) {
+ if (cause != null) {
+ LOG.info("Cancelling sample " + sampleId, cause);
+ } else {
+ LOG.info("Cancelling sample {}", sampleId);
+ }
+
+ sample.discard(cause);
+ rememberRecentSampleId(sampleId);
+ }
+ }
+ }
+
+ /**
+ * Shuts down the coordinator.
+ *
+ * <p>After shut down, no further operations are executed.
+ */
+ public void shutDown() {
+ synchronized (lock) {
+ if (!isShutDown) {
+ LOG.info("Shutting down stack trace sample coordinator.");
+
+ for (PendingStackTraceSample pending : pendingSamples.values()) {
+ pending.discard(new RuntimeException("Shut down"));
+ }
+
+ pendingSamples.clear();
+
+ if (timer != null) {
+ timer.cancel();
+ }
+
+ isShutDown = true;
+ }
+ }
+ }
+
+ /**
+ * Collects stack traces of a task.
+ *
+ * @param sampleId ID of the sample.
+ * @param executionId ID of the sampled task.
+ * @param stackTraces Stack traces of the sampled task.
+ *
+ * @throws IllegalStateException If the sample ID is unknown and does not
+ * belong to a recently finished or cancelled sample.
+ */
+ public void collectStackTraces(
+ int sampleId,
+ ExecutionAttemptID executionId,
+ List<StackTraceElement[]> stackTraces) {
+
+ synchronized (lock) {
+ if (isShutDown) {
+ return;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Collecting stack trace sample {} of task {}", sampleId, executionId);
+ }
+
+ PendingStackTraceSample pending = pendingSamples.get(sampleId);
+
+ if (pending != null) {
+ pending.collectStackTraces(executionId, stackTraces);
+
+ // Publish the sample
+ if (pending.isComplete()) {
+ pendingSamples.remove(sampleId);
+ rememberRecentSampleId(sampleId);
+
+ pending.completePromiseAndDiscard();
+ }
+ } else if (recentPendingSamples.contains(sampleId)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received late stack trace sample {} of task {}",
+ sampleId, executionId);
+ }
+ } else {
+ throw new IllegalStateException("Unknown sample ID " + sampleId);
+ }
+ }
+ }
+
+ private void rememberRecentSampleId(int sampleId) {
+ if (recentPendingSamples.size() >= NUM_GHOST_SAMPLE_IDS) {
+ recentPendingSamples.removeFirst();
+ }
+ recentPendingSamples.addLast(sampleId);
+ }
+
+ int getNumberOfPendingSamples() {
+ synchronized (lock) {
+ return pendingSamples.size();
+ }
+ }
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * A pending stack trace sample, which collects stack traces and owns a
+ * {@link StackTraceSample} promise.
+ *
+ * <p>Access pending samples only in lock scope.
+ */
+ private static class PendingStackTraceSample {
+
+ private final int sampleId;
+ private final long startTime;
+ private final Set<ExecutionAttemptID> pendingTasks;
+ private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask;
+ private final Promise<StackTraceSample> stackTracePromise;
+
+ private boolean isDiscarded;
+
+ PendingStackTraceSample(
+ int sampleId,
+ ExecutionAttemptID[] tasksToCollect) {
+
+ this.sampleId = sampleId;
+ this.startTime = System.currentTimeMillis();
+ this.pendingTasks = new HashSet<>(Arrays.asList(tasksToCollect));
+ this.stackTracesByTask = Maps.newHashMapWithExpectedSize(tasksToCollect.length);
+ this.stackTracePromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+ }
+
+ int getSampleId() {
+ return sampleId;
+ }
+
+ long getStartTime() {
+ return startTime;
+ }
+
+ boolean isDiscarded() {
+ return isDiscarded;
+ }
+
+ boolean isComplete() {
+ if (isDiscarded) {
+ throw new IllegalStateException("Discarded");
+ }
+
+ return pendingTasks.isEmpty();
+ }
+
+ void discard(Throwable cause) {
+ if (!isDiscarded) {
+ pendingTasks.clear();
+ stackTracesByTask.clear();
+
+ stackTracePromise.failure(new RuntimeException("Discarded", cause));
+
+ isDiscarded = true;
+ }
+ }
+
+ void collectStackTraces(ExecutionAttemptID executionId, List<StackTraceElement[]> stackTraces) {
+ if (isDiscarded) {
+ throw new IllegalStateException("Discarded");
+ }
+
+ if (pendingTasks.remove(executionId)) {
+ stackTracesByTask.put(executionId, Collections.unmodifiableList(stackTraces));
+ } else if (isComplete()) {
+ throw new IllegalStateException("Completed");
+ } else {
+ throw new IllegalArgumentException("Unknown task " + executionId);
+ }
+ }
+
+ void completePromiseAndDiscard() {
+ if (isComplete()) {
+ isDiscarded = true;
+
+ long endTime = System.currentTimeMillis();
+
+ StackTraceSample stackTraceSample = new StackTraceSample(
+ sampleId,
+ startTime,
+ endTime,
+ stackTracesByTask);
+
+ stackTracePromise.success(stackTraceSample);
+ } else {
+ throw new IllegalStateException("Not completed yet");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ Future<StackTraceSample> getStackTraceSampleFuture() {
+ return stackTracePromise.future();
+ }
+ }
+
+ /**
+ * Actor for stack trace sample responses.
+ */
+ private static class StackTraceSampleCoordinatorActor extends FlinkUntypedActor {
+
+ private final StackTraceSampleCoordinator coordinator;
+
+ public StackTraceSampleCoordinatorActor(StackTraceSampleCoordinator coordinator) {
+ this.coordinator = checkNotNull(coordinator, "Stack trace sample coordinator");
+ }
+
+ @Override
+ protected void handleMessage(Object msg) throws Exception {
+ try {
+ if (msg instanceof ResponseStackTraceSampleSuccess) {
+ ResponseStackTraceSampleSuccess success = (ResponseStackTraceSampleSuccess) msg;
+
+ coordinator.collectStackTraces(
+ success.sampleId(),
+ success.executionId(),
+ success.samples());
+ } else if (msg instanceof ResponseStackTraceSampleFailure) {
+ ResponseStackTraceSampleFailure failure = (ResponseStackTraceSampleFailure) msg;
+
+ coordinator.cancelStackTraceSample(failure.sampleId(), failure.cause());
+ } else {
+ throw new IllegalArgumentException("Unexpected task sample message");
+ }
+ } catch (Throwable t) {
+ LOG.error("Error responding to message '" + msg + "': " + t.getMessage() + ".", t);
+ }
+ }
+
+ @Override
+ protected UUID getLeaderSessionID() {
+ return null;
+ }
+ }
+
+}
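For reference, a sketch of how the coordinator is driven (this mirrors what BackPressureStatsTracker does above); the actor system, the task vertices of a running job, and the execution context are assumed to be available:

    import akka.actor.ActorSystem;
    import akka.dispatch.OnComplete;
    import org.apache.flink.runtime.executiongraph.ExecutionVertex;
    import scala.concurrent.ExecutionContext;
    import scala.concurrent.Future;
    import scala.concurrent.duration.FiniteDuration;
    import java.util.concurrent.TimeUnit;

    static Future<StackTraceSample> sampleOnce(
            ActorSystem actorSystem,
            ExecutionVertex[] tasksToSample,     // tasks of a running job
            ExecutionContext executionContext) { // e.g. the graph's execution context

        StackTraceSampleCoordinator coordinator =
            new StackTraceSampleCoordinator(actorSystem, 60000); // 60 s extra timeout

        Future<StackTraceSample> sample = coordinator.triggerStackTraceSample(
            tasksToSample,
            100,                                           // number of samples
            new FiniteDuration(50, TimeUnit.MILLISECONDS), // delay between samples
            0);                                            // 0 = keep complete stack traces

        sample.onComplete(new OnComplete<StackTraceSample>() {
            @Override
            public void onComplete(Throwable failure, StackTraceSample success) {
                if (success != null) {
                    // e.g. derive back pressure stats from success.getStackTraces()
                }
            }
        }, executionContext);

        return sample;
    }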
http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 3523091..08ed2f9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.webmonitor;
import akka.actor.ActorSystem;
-
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
@@ -29,14 +28,17 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.router.Handler;
import io.netty.handler.codec.http.router.Router;
-
import org.apache.commons.io.FileUtils;
-
+import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
+import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
import org.apache.flink.runtime.webmonitor.handlers.ConstantTextHandler;
+import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarAccessDeniedHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarListHandler;
@@ -46,29 +48,24 @@ import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobCheckpointsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
-import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobVertexBackPressureHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexCheckpointsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
-import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler;
import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
@@ -121,8 +118,11 @@ public class WebRuntimeMonitor implements WebMonitor {
private final File uploadDir;
- private AtomicBoolean cleanedUp = new AtomicBoolean();
+ private final StackTraceSampleCoordinator stackTraceSamples;
+ private final BackPressureStatsTracker backPressureStatsTracker;
+
+ private AtomicBoolean cleanedUp = new AtomicBoolean();
public WebRuntimeMonitor(
Configuration config,
@@ -163,6 +163,34 @@ public class WebRuntimeMonitor implements WebMonitor {
ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder();
+ // - Back pressure stats ----------------------------------------------
+
+ stackTraceSamples = new StackTraceSampleCoordinator(actorSystem, 60000);
+
+ // Back pressure stats tracker config
+ int cleanUpInterval = config.getInteger(
+ ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL);
+
+ int refreshInterval = config.getInteger(
+ ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL);
+
+ int numSamples = config.getInteger(
+ ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES);
+
+ int delay = config.getInteger(
+ ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_DELAY,
+ ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY);
+
+ FiniteDuration delayBetweenSamples = new FiniteDuration(delay, TimeUnit.MILLISECONDS);
+
+ backPressureStatsTracker = new BackPressureStatsTracker(
+ stackTraceSamples, cleanUpInterval, numSamples, delayBetweenSamples);
+
+ // --------------------------------------------------------------------
+
router = new Router()
// config how to interact with this web server
.GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval())))
@@ -187,7 +215,10 @@ public class WebRuntimeMonitor implements WebMonitor {
.GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/checkpoints", handler(new JobVertexCheckpointsHandler(currentGraphs)))
-
+ .GET("/jobs/:jobid/vertices/:vertexid/backpressure", handler(new JobVertexBackPressureHandler(
+ currentGraphs,
+ backPressureStatsTracker,
+ refreshInterval)))
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new SubtasksAllAccumulatorsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs)))
.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs)))
@@ -298,6 +329,23 @@ public class WebRuntimeMonitor implements WebMonitor {
synchronized (startupShutdownLock) {
jobManagerAddressPromise.success(jobManagerAkkaUrl);
leaderRetrievalService.start(retriever);
+
+ long delay = backPressureStatsTracker.getCleanUpInterval();
+
+ // Scheduled back pressure stats tracker cache cleanup. We schedule
+ // this here repeatedly, because cache clean up only happens on
+ // interactions with the cache. We need it to make sure that we
+ // don't leak memory after completed jobs or long ago accessed stats.
+ bootstrap.childGroup().scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ backPressureStatsTracker.cleanUpOperatorStatsCache();
+ } catch (Throwable t) {
+ LOG.error("Error during back pressure stats cache cleanup.", t);
+ }
+ }
+ }, delay, delay, TimeUnit.MILLISECONDS);
}
}
@@ -316,6 +364,10 @@ public class WebRuntimeMonitor implements WebMonitor {
}
}
+ stackTraceSamples.shutDown();
+
+ backPressureStatsTracker.shutDown();
+
cleanup();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
new file mode 100644
index 0000000..3ce6f02
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats;
+import scala.Option;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Request handler that returns back pressure stats for a single job vertex and
+ * all its sub tasks.
+ */
+public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandler {
+
+ /** Back pressure stats tracker. */
+ private final BackPressureStatsTracker backPressureStatsTracker;
+
+ /** Time after which stats are considered outdated. */
+ private final int refreshInterval;
+
+ public JobVertexBackPressureHandler(
+ ExecutionGraphHolder executionGraphHolder,
+ BackPressureStatsTracker backPressureStatsTracker,
+ int refreshInterval) {
+
+ super(executionGraphHolder);
+ this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker, "Stats tracker");
+ checkArgument(refreshInterval >= 0, "Negative refresh interval");
+ this.refreshInterval = refreshInterval;
+ }
+
+ @Override
+ public String handleRequest(
+ ExecutionJobVertex jobVertex,
+ Map<String, String> params) throws Exception {
+
+ try (StringWriter writer = new StringWriter();
+ JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer)) {
+
+ gen.writeStartObject();
+
+ Option<OperatorBackPressureStats> statsOption = backPressureStatsTracker
+ .getOperatorBackPressureStats(jobVertex);
+
+ if (statsOption.isDefined()) {
+ OperatorBackPressureStats stats = statsOption.get();
+
+ // Check whether we need to refresh
+ if (refreshInterval <= System.currentTimeMillis() - stats.getEndTimestamp()) {
+ backPressureStatsTracker.triggerStackTraceSample(jobVertex);
+ gen.writeStringField("status", "deprecated");
+ } else {
+ gen.writeStringField("status", "ok");
+ }
+
+ gen.writeStringField("backpressure-level", getBackPressureLevel(stats.getMaxBackPressureRatio()));
+ gen.writeNumberField("end-timestamp", stats.getEndTimestamp());
+
+ // Sub tasks
+ gen.writeArrayFieldStart("subtasks");
+ int numSubTasks = stats.getNumberOfSubTasks();
+ for (int i = 0; i < numSubTasks; i++) {
+ double ratio = stats.getBackPressureRatio(i);
+
+ gen.writeStartObject();
+ gen.writeNumberField("subtask", i);
+ gen.writeStringField("backpressure-level", getBackPressureLevel(ratio));
+ gen.writeNumberField("ratio", ratio);
+ gen.writeEndObject();
+ }
+ gen.writeEndArray();
+ } else {
+ backPressureStatsTracker.triggerStackTraceSample(jobVertex);
+ gen.writeStringField("status", "deprecated");
+ }
+
+ gen.writeEndObject();
+ gen.close();
+
+ return writer.toString();
+ }
+ }
+
+ /**
+ * Returns the back pressure level as a String.
+ *
+ * @param backPressureRatio Ratio of back pressure samples to the total number of samples.
+ *
+ * @return Back pressure level ('ok', 'low', or 'high')
+ */
+ static String getBackPressureLevel(double backPressureRatio) {
+ if (backPressureRatio <= 0.10) {
+ return "ok";
+ } else if (backPressureRatio <= 0.5) {
+ return "low";
+ } else {
+ return "high";
+ }
+ }
+
+}
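The handler above produces a JSON document of the following shape for GET /jobs/:jobid/vertices/:vertexid/backpressure (the field names follow the generator calls in the code; all values are illustrative):

    {
      "status" : "ok",
      "backpressure-level" : "low",
      "end-timestamp" : 1454500000000,
      "subtasks" : [
        { "subtask" : 0, "backpressure-level" : "ok", "ratio" : 0.05 },
        { "subtask" : 1, "backpressure-level" : "low", "ratio" : 0.3 }
      ]
    }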
http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
new file mode 100644
index 0000000..52b0794
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.Option;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Simple back pressured task test.
+ */
+public class BackPressureStatsTrackerITCase extends TestLogger {
+
+ private static NetworkBufferPool networkBufferPool;
+ private static ActorSystem testActorSystem;
+
+ /** Shared as static variable with the test task. */
+ private static BufferPool testBufferPool;
+
+ @BeforeClass
+ public static void setup() {
+ testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+ networkBufferPool = new NetworkBufferPool(100, 8192, MemoryType.HEAP);
+ }
+
+ @AfterClass
+ public static void teardown() {
+ JavaTestKit.shutdownActorSystem(testActorSystem);
+ networkBufferPool.destroy();
+ }
+
+ /**
+ * Tests a simple, artificially back pressured task. Back pressure is
+ * assumed when sampled stack traces are stuck in blocking buffer requests.
+ */
+ @Test
+ public void testBackPressuredProducer() throws Exception {
+ new JavaTestKit(testActorSystem) {{
+ final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS);
+
+ // The JobGraph
+ final JobGraph jobGraph = new JobGraph();
+ final int parallelism = 4;
+
+ final JobVertex task = new JobVertex("Task");
+ task.setInvokableClass(BackPressuredTask.class);
+ task.setParallelism(parallelism);
+
+ jobGraph.addVertex(task);
+
+ ActorGateway jobManager = null;
+ ActorGateway taskManager = null;
+
+ //
+ // 1) Consume all buffers at first (no buffers for the test task)
+ //
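+ // The pool is created with one required buffer and no fixed size, so it
+ // can presumably expand to all 100 network buffers (the test later waits
+ // for all 100 to become available again).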
+ testBufferPool = networkBufferPool.createBufferPool(1, false);
+ final List<Buffer> buffers = new ArrayList<>();
+ while (true) {
+ Buffer buffer = testBufferPool.requestBuffer();
+ if (buffer != null) {
+ buffers.add(buffer);
+ } else {
+ break;
+ }
+ }
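+ // The test driver now holds every buffer, so the tasks block inside
+ // requestBufferBlocking(), which is exactly what the stack trace
+ // samples detect as back pressure.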
+
+ try {
+ jobManager = TestingUtils.createJobManager(testActorSystem, new Configuration());
+
+ Configuration config = new Configuration();
+ config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+
+ taskManager = TestingUtils.createTaskManager(
+ testActorSystem, jobManager, config, true, true);
+
+ final ActorGateway jm = jobManager;
+
+ new Within(deadline) {
+ @Override
+ protected void run() {
+ try {
+ ActorGateway testActor = new AkkaActorGateway(getTestActor(), null);
+
+ // Submit the job and wait until it is running
+ JobClient.submitJobDetached(
+ jm,
+ jobGraph,
+ deadline,
+ ClassLoader.getSystemClassLoader());
+
+ jm.tell(new WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor);
+
+ expectMsgEquals(new AllVerticesRunning(jobGraph.getJobID()));
+
+ // Get the ExecutionGraph
+ jm.tell(new RequestExecutionGraph(jobGraph.getJobID()), testActor);
+
+ ExecutionGraphFound executionGraphResponse =
+ expectMsgClass(ExecutionGraphFound.class);
+
+ ExecutionGraph executionGraph = executionGraphResponse.executionGraph();
+ ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID());
+
+ StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(
+ testActorSystem, 60000);
+
+ // Verify back pressure (clean up interval can be ignored)
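+ // The tracker takes the cleanup interval (100 s, irrelevant here), the
+ // number of stack trace samples per measurement (20), and the delay
+ // between samples (10 ms), i.e. one measurement takes roughly 200 ms.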
+ BackPressureStatsTracker statsTracker = new BackPressureStatsTracker(
+ coordinator,
+ 100 * 1000,
+ 20,
+ new FiniteDuration(10, TimeUnit.MILLISECONDS));
+
+ int numAttempts = 10;
+
+ int nextSampleId = 0;
+
+ // Verify that all tasks are back pressured. A single attempt can
+ // fail if a task has not yet reached the blocking buffer request,
+ // hence the retries below.
+ for (int attempt = 0; attempt < numAttempts; attempt++) {
+ try {
+ OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);
+
+ assertEquals(nextSampleId + attempt, stats.getSampleId());
+ assertEquals(parallelism, stats.getNumberOfSubTasks());
+ assertEquals(1.0, stats.getMaxBackPressureRatio(), 0.0);
+
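+ // A back pressure ratio of 1.0 means that every sampled stack
+ // trace of the subtask was inside the blocking buffer request.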
+ for (int i = 0; i < parallelism; i++) {
+ assertEquals(1.0, stats.getBackPressureRatio(i), 0.0);
+ }
+
+ nextSampleId = stats.getSampleId() + 1;
+
+ break;
+ } catch (Throwable t) {
+ if (attempt == numAttempts - 1) {
+ throw t;
+ } else {
+ Thread.sleep(500);
+ }
+ }
+ }
+
+ //
+ // 2) Release all buffers and let the tasks grab one
+ //
+ for (Buffer buf : buffers) {
+ buf.recycle();
+ }
+
+ // Wait for all buffers to be available. The tasks
+ // grab them and then immediately release them.
+ while (testBufferPool.getNumberOfAvailableMemorySegments() < 100) {
+ Thread.sleep(100);
+ }
+
+ // Verify that no task is back pressured any more.
+ for (int attempt = 0; attempt < numAttempts; attempt++) {
+ try {
+ OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);
+
+ assertEquals(nextSampleId + attempt, stats.getSampleId());
+ assertEquals(parallelism, stats.getNumberOfSubTasks());
+
+ // Verify that no task is back pressured
+ for (int i = 0; i < parallelism; i++) {
+ assertEquals(0.0, stats.getBackPressureRatio(i), 0.0);
+ }
+
+ break;
+ } catch (Throwable t) {
+ if (attempt == numAttempts - 1) {
+ throw t;
+ } else {
+ Thread.sleep(500);
+ }
+ }
+ }
+
+ // Register for the job removal notification
+ jm.tell(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), testActor);
+
+ // Cancel job
+ jm.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
+
+ // Response to removal notification
+ expectMsgEquals(true);
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+ };
+ } finally {
+ TestingUtils.stopActor(jobManager);
+ TestingUtils.stopActor(taskManager);
+
+ for (Buffer buf : buffers) {
+ buf.recycle();
+ }
+
+ testBufferPool.lazyDestroy();
+ }
+ }};
+ }
+
+ /**
+ * Triggers a new stats sample.
+ */
+ private OperatorBackPressureStats triggerStatsSample(
+ BackPressureStatsTracker statsTracker,
+ ExecutionJobVertex vertex) throws InterruptedException {
+
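+ // Invalidate the cached stats first; otherwise the trigger would
+ // presumably be ignored while earlier stats are still fresh.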
+ statsTracker.invalidateOperatorStatsCache();
+ statsTracker.triggerStackTraceSample(vertex);
+
+ // Sleep for the minimum sampling duration (20 samples * 10 ms delay)
+ Thread.sleep(20 * 10);
+
+ Option<OperatorBackPressureStats> stats;
+
+ // Get the stats
+ while ((stats = statsTracker.getOperatorBackPressureStats(vertex)).isEmpty()) {
+ Thread.sleep(10);
+ }
+
+ return stats.get();
+ }
+
+ /**
+ * A back pressured producer sharing a {@link BufferPool} with the
+ * test driver.
+ */
+ public static class BackPressuredTask extends AbstractInvokable {
+
+ @Override
+ public void invoke() throws Exception {
+ while (true) {
+ Buffer buffer = testBufferPool.requestBufferBlocking();
+ // Got a buffer, yay!
+ buffer.recycle();
+
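+ // Block forever on a latch that is never counted down, so the task
+ // keeps running without being back pressured again.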
+ new CountDownLatch(1).await();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
new file mode 100644
index 0000000..b0955e1
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.impl.Promise;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class BackPressureStatsTrackerTest {
+
+ /** Tests simple statistics with fake stack traces. */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testTriggerStackTraceSample() throws Exception {
+ Promise<StackTraceSample> samplePromise = new Promise.DefaultPromise<>();
+
+ StackTraceSampleCoordinator sampleCoordinator = mock(StackTraceSampleCoordinator.class);
+ when(sampleCoordinator.triggerStackTraceSample(
+ any(ExecutionVertex[].class),
+ anyInt(),
+ any(FiniteDuration.class),
+ anyInt())).thenReturn(samplePromise.future());
+
+ ExecutionGraph graph = mock(ExecutionGraph.class);
+
+ // Same Thread execution context
+ when(graph.getExecutionContext()).thenReturn(new ExecutionContext() {
+
+ @Override
+ public void execute(Runnable runnable) {
+ runnable.run();
+ }
+
+ @Override
+ public void reportFailure(Throwable t) {
+ fail();
+ }
+
+ @Override
+ public ExecutionContext prepare() {
+ return this;
+ }
+ });
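+
+ // Executing callbacks in the same thread means the tracker updates its
+ // stats cache synchronously when the sample promise below is completed,
+ // so the assertions can check the cache right away.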
+
+ ExecutionVertex[] taskVertices = new ExecutionVertex[4];
+
+ ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+ when(jobVertex.getJobId()).thenReturn(new JobID());
+ when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID());
+ when(jobVertex.getGraph()).thenReturn(graph);
+ when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
+
+ taskVertices[0] = mockExecutionVertex(jobVertex, 0);
+ taskVertices[1] = mockExecutionVertex(jobVertex, 1);
+ taskVertices[2] = mockExecutionVertex(jobVertex, 2);
+ taskVertices[3] = mockExecutionVertex(jobVertex, 3);
+
+ int numSamples = 100;
+ FiniteDuration delayBetweenSamples = new FiniteDuration(100, TimeUnit.MILLISECONDS);
+
+ BackPressureStatsTracker tracker = new BackPressureStatsTracker(
+ sampleCoordinator, 9999, numSamples, delayBetweenSamples);
+
+ // Trigger
+ tracker.triggerStackTraceSample(jobVertex);
+
+ verify(sampleCoordinator).triggerStackTraceSample(
+ eq(taskVertices),
+ eq(numSamples),
+ eq(delayBetweenSamples),
+ eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
+
+ // Trigger again for pending request, should not fire
+ tracker.triggerStackTraceSample(jobVertex);
+
+ assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isEmpty());
+
+ verify(sampleCoordinator).triggerStackTraceSample(
+ eq(taskVertices),
+ eq(numSamples),
+ eq(delayBetweenSamples),
+ eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
+
+ assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isEmpty());
+
+ // Complete the future
+ Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = new HashMap<>();
+ for (ExecutionVertex vertex : taskVertices) {
+ List<StackTraceElement[]> taskTraces = new ArrayList<>();
+
+ for (int i = 0; i < taskVertices.length; i++) {
+ // Traces until sub task index are back pressured
+ taskTraces.add(createStackTrace(i <= vertex.getParallelSubtaskIndex()));
+ }
+
+ traces.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskTraces);
+ }
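+
+ // Subtask i thus reports (i + 1) back pressured traces out of 4,
+ // the ratio asserted at the end of the test.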
+
+ int sampleId = 1231;
+ int endTime = 841;
+
+ StackTraceSample sample = new StackTraceSample(
+ sampleId,
+ 0,
+ endTime,
+ traces);
+
+ // Succeed the promise
+ samplePromise.success(sample);
+
+ assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isDefined());
+
+ OperatorBackPressureStats stats = tracker.getOperatorBackPressureStats(jobVertex).get();
+
+ // Verify the stats
+ assertEquals(sampleId, stats.getSampleId());
+ assertEquals(endTime, stats.getEndTimestamp());
+ assertEquals(taskVertices.length, stats.getNumberOfSubTasks());
+
+ for (int i = 0; i < taskVertices.length; i++) {
+ double ratio = stats.getBackPressureRatio(i);
+ // Traces until sub task index are back pressured
+ assertEquals((i + 1) / (double) taskVertices.length, ratio, 0.0);
+ }
+ }
+
+ private StackTraceElement[] createStackTrace(boolean isBackPressure) {
+ if (isBackPressure) {
+ return new StackTraceElement[] { new StackTraceElement(
+ BackPressureStatsTracker.EXPECTED_CLASS_NAME,
+ BackPressureStatsTracker.EXPECTED_METHOD_NAME,
+ "LocalBufferPool.java",
+ 133) };
+ } else {
+ return Thread.currentThread().getStackTrace();
+ }
+ }
+
+ private ExecutionVertex mockExecutionVertex(
+ ExecutionJobVertex jobVertex,
+ int subTaskIndex) {
+
+ Execution exec = mock(Execution.class);
+ when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID());
+
+ JobVertexID id = jobVertex.getJobVertexId();
+
+ ExecutionVertex vertex = mock(ExecutionVertex.class);
+ when(vertex.getJobvertexId()).thenReturn(id);
+ when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+ when(vertex.getParallelSubtaskIndex()).thenReturn(subTaskIndex);
+
+ return vertex;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
new file mode 100644
index 0000000..406197c
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for the {@link StackTraceSampleCoordinator}.
+ */
+public class StackTraceSampleCoordinatorTest {
+
+ private static ActorSystem system;
+
+ private StackTraceSampleCoordinator coord;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ system = AkkaUtils.createLocalActorSystem(new Configuration());
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (system != null) {
+ system.shutdown();
+ }
+ }
+
+ @Before
+ public void init() throws Exception {
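+ // 60 s sample timeout: generous, so that only the dedicated timeout
+ // test below exercises the timeout path.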
+ this.coord = new StackTraceSampleCoordinator(system, 60000);
+ }
+
+ /** Tests simple trigger and collect of stack trace samples. */
+ @Test
+ public void testTriggerStackTraceSample() throws Exception {
+ ExecutionVertex[] vertices = new ExecutionVertex[] {
+ mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+ mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+ mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+ mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true)
+ };
+
+ int numSamples = 1;
+ FiniteDuration delayBetweenSamples = new FiniteDuration(100, TimeUnit.MILLISECONDS);
+ int maxStackTraceDepth = 0;
+
+ Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+ vertices, numSamples, delayBetweenSamples, maxStackTraceDepth);
+
+ // Verify messages have been sent
+ for (ExecutionVertex vertex : vertices) {
+ ExecutionAttemptID expectedExecutionId = vertex
+ .getCurrentExecutionAttempt().getAttemptId();
+
+ TriggerStackTraceSample expectedMsg = new TriggerStackTraceSample(
+ 0,
+ expectedExecutionId,
+ numSamples,
+ delayBetweenSamples,
+ maxStackTraceDepth);
+
+ verify(vertex).sendMessageToCurrentExecution(
+ eq(expectedMsg), eq(expectedExecutionId), any(AkkaActorGateway.class));
+ }
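+
+ // Sample IDs appear to be assigned sequentially starting at 0, hence
+ // the expected sample ID 0 above and in the collect calls below.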
+
+ assertFalse(sampleFuture.isCompleted());
+
+ StackTraceElement[] stackTraceSample = Thread.currentThread().getStackTrace();
+ List<StackTraceElement[]> traces = new ArrayList<>();
+ traces.add(stackTraceSample);
+ traces.add(stackTraceSample);
+ traces.add(stackTraceSample);
+
+ // Collect stack traces
+ for (int i = 0; i < vertices.length; i++) {
+ ExecutionAttemptID executionId = vertices[i].getCurrentExecutionAttempt().getAttemptId();
+ coord.collectStackTraces(0, executionId, traces);
+
+ if (i == vertices.length - 1) {
+ assertTrue(sampleFuture.isCompleted());
+ } else {
+ assertFalse(sampleFuture.isCompleted());
+ }
+ }
+
+ // Verify completed stack trace sample
+ StackTraceSample sample = sampleFuture.value().get().get();
+
+ assertEquals(0, sample.getSampleId());
+ assertTrue(sample.getEndTime() >= sample.getStartTime());
+
+ Map<ExecutionAttemptID, List<StackTraceElement[]>> tracesByTask = sample.getStackTraces();
+
+ for (ExecutionVertex vertex : vertices) {
+ ExecutionAttemptID executionId = vertex.getCurrentExecutionAttempt().getAttemptId();
+ List<StackTraceElement[]> sampleTraces = tracesByTask.get(executionId);
+
+ assertNotNull("Task not found", sampleTraces);
+ assertEquals(traces, sampleTraces);
+ }
+
+ // Verify no more pending sample
+ assertEquals(0, coord.getNumberOfPendingSamples());
+
+ // Verify no error on late collect
+ coord.collectStackTraces(0, vertices[0].getCurrentExecutionAttempt().getAttemptId(), traces);
+ }
+
+ /** Tests that triggering for non-running tasks fails the future. */
+ @Test
+ public void testTriggerStackTraceSampleNotRunningTasks() throws Exception {
+ ExecutionVertex[] vertices = new ExecutionVertex[] {
+ mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+ mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.DEPLOYING, true)
+ };
+
+ Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+ vertices,
+ 1,
+ new FiniteDuration(100, TimeUnit.MILLISECONDS),
+ 0);
+
+ assertTrue(sampleFuture.isCompleted());
+ assertTrue(sampleFuture.failed().isCompleted());
+
+ assertTrue(sampleFuture.failed().value().get().get() instanceof IllegalStateException);
+ }
+
+ /** Tests that triggering for reset tasks fails the future. */
+ @Test
+ public void testTriggerStackTraceSampleResetRunningTasks() throws Exception {
+ ExecutionVertex[] vertices = new ExecutionVertex[] {
+ mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+ // Fails to send the message to the execution (happens when execution is reset)
+ mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, false)
+ };
+
+ Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+ vertices,
+ 1,
+ new FiniteDuration(100, TimeUnit.MILLISECONDS),
+ 0);
+
+ assertTrue(sampleFuture.isCompleted());
+ assertTrue(sampleFuture.failed().isCompleted());
+ assertTrue(sampleFuture.failed().value().get().get().getCause() instanceof RuntimeException);
+ }
+
+ /** Tests that samples time out if they don't finish in time. */
+ @Test
+ public void testTriggerStackTraceSampleTimeout() throws Exception {
+ int timeout = 100;
+
+ coord = new StackTraceSampleCoordinator(system, timeout);
+
+ ExecutionVertex[] vertices = new ExecutionVertex[] {
+ mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+ };
+
+ Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+ vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+ // Wait for the timeout
+ Thread.sleep(timeout * 2);
+
+ boolean success = false;
+ for (int i = 0; i < 10; i++) {
+ if (sampleFuture.isCompleted()) {
+ success = true;
+ break;
+ }
+
+ Thread.sleep(timeout);
+ }
+
+ assertTrue("Sample did not time out", success);
+
+ Throwable cause = sampleFuture.failed().value().get().get();
+ assertTrue(cause.getCause().getMessage().contains("Time out"));
+
+ // Collect after the timeout
+ try {
+ ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
+ coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
+ fail("Did not throw expected Exception");
+ } catch (IllegalStateException ignored) {
+ }
+ }
+
+ /** Tests that collecting an unknown sample fails. */
+ @Test(expected = IllegalStateException.class)
+ public void testCollectStackTraceForUnknownSample() throws Exception {
+ coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>());
+ }
+
+ /** Tests cancelling of a pending sample. */
+ @Test
+ public void testCancelStackTraceSample() throws Exception {
+ ExecutionVertex[] vertices = new ExecutionVertex[] {
+ mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+ };
+
+ Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+ vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+ assertFalse(sampleFuture.isCompleted());
+
+ // Cancel
+ coord.cancelStackTraceSample(0, null);
+
+ // Verify completed
+ assertTrue(sampleFuture.isCompleted());
+
+ // Verify no more pending samples
+ assertEquals(0, coord.getNumberOfPendingSamples());
+ }
+
+ /** Tests that collecting for a cancelled sample throws no Exception. */
+ @Test
+ public void testCollectStackTraceForCanceledSample() throws Exception {
+ ExecutionVertex[] vertices = new ExecutionVertex[] {
+ mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+ };
+
+ Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+ vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+ assertFalse(sampleFuture.isCompleted());
+
+ coord.cancelStackTraceSample(0, null);
+
+ assertTrue(sampleFuture.isCompleted());
+
+ // Verify no error on late collect
+ ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
+ coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
+ }
+
+ /** Tests that collecting for a discarded pending sample throws no Exception. */
+ @Test
+ public void testCollectForDiscardedPendingSample() throws Exception {
+ ExecutionVertex[] vertices = new ExecutionVertex[] {
+ mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+ };
+
+ Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+ vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+ assertFalse(sampleFuture.isCompleted());
+
+ coord.cancelStackTraceSample(0, null);
+
+ assertTrue(sampleFuture.isCompleted());
+
+ // Verify no error on late collect
+ ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
+ coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
+ }
+
+ /** Tests that collecting for an unknown task fails. */
+ @Test(expected = IllegalArgumentException.class)
+ public void testCollectStackTraceForUnknownTask() throws Exception {
+ ExecutionVertex[] vertices = new ExecutionVertex[] {
+ mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+ };
+
+ coord.triggerStackTraceSample(vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+ coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>());
+ }
+
+ /** Tests that shut down fails all pending samples and future sample triggers. */
+ @Test
+ public void testShutDown() throws Exception {
+ ExecutionVertex[] vertices = new ExecutionVertex[] {
+ mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+ };
+
+ List<Future<StackTraceSample>> sampleFutures = new ArrayList<>();
+
+ // Trigger
+ sampleFutures.add(coord.triggerStackTraceSample(
+ vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0));
+
+ sampleFutures.add(coord.triggerStackTraceSample(
+ vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0));
+
+ for (Future<StackTraceSample> future : sampleFutures) {
+ assertFalse(future.isCompleted());
+ }
+
+ // Shut down
+ coord.shutDown();
+
+ // Verify all completed
+ for (Future<StackTraceSample> future : sampleFutures) {
+ assertTrue(future.isCompleted());
+ }
+
+ // Verify new trigger returns failed future
+ Future<StackTraceSample> future = coord.triggerStackTraceSample(
+ vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+ assertTrue(future.isCompleted());
+ assertTrue(future.failed().isCompleted());
+ }
+
+ // ------------------------------------------------------------------------
+
+ private ExecutionVertex mockExecutionVertex(
+ ExecutionAttemptID executionId,
+ ExecutionState state,
+ boolean sendSuccess) {
+
+ Execution exec = mock(Execution.class);
+ when(exec.getAttemptId()).thenReturn(executionId);
+ when(exec.getState()).thenReturn(state);
+
+ ExecutionVertex vertex = mock(ExecutionVertex.class);
+ when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
+ when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+ when(vertex.sendMessageToCurrentExecution(
+ any(Serializable.class), any(ExecutionAttemptID.class), any(AkkaActorGateway.class)))
+ .thenReturn(sendSuccess);
+
+ return vertex;
+ }
+}