flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [04/12] flink git commit: [FLINK-3310] [runtime-web] Add back pressure statistics to web monitor (backend)
Date Mon, 08 Feb 2016 14:05:37 GMT
[FLINK-3310] [runtime-web] Add back pressure statistics to web monitor (backend)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7e70da3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7e70da3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7e70da3

Branch: refs/heads/master
Commit: b7e70da351390df6b179cd0cedd11f311401e5a4
Parents: d69fe30
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon Feb 1 20:51:23 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Feb 8 15:04:44 2016 +0100

----------------------------------------------------------------------
 docs/setup/config.md                            |   4 +
 .../flink/configuration/ConfigConstants.java    |  25 +-
 flink-runtime-web/pom.xml                       |   6 +
 .../webmonitor/BackPressureStatsTracker.java    | 307 ++++++++++++
 .../webmonitor/OperatorBackPressureStats.java   | 126 +++++
 .../runtime/webmonitor/StackTraceSample.java    | 111 +++++
 .../webmonitor/StackTraceSampleCoordinator.java | 477 +++++++++++++++++++
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  80 +++-
 .../handlers/JobVertexBackPressureHandler.java  | 126 +++++
 .../BackPressureStatsTrackerITCase.java         | 300 ++++++++++++
 .../BackPressureStatsTrackerTest.java           | 196 ++++++++
 .../StackTraceSampleCoordinatorTest.java        | 377 +++++++++++++++
 .../JobVertexBackPressureHandlerTest.java       | 197 ++++++++
 .../runtime/executiongraph/ExecutionGraph.java  |   7 +-
 .../runtime/executiongraph/ExecutionVertex.java |  24 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../messages/StackTraceSampleMessages.scala     | 106 +++++
 .../flink/runtime/taskmanager/TaskManager.scala | 156 +++++-
 .../runtime/taskmanager/TaskManagerTest.java    | 315 ++++++++++--
 19 files changed, 2868 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 91297a4..0eb3acb 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -165,6 +165,10 @@ The following parameters configure Flink's JobManager and TaskManagers.
 - `jobmanager.web.history`: The number of latest jobs that the JobManager's web front-end in its history (DEFAULT: 5).
 - `jobmanager.web.checkpoints.disable`: Disables checkpoint statistics (DEFAULT: `false`).
 - `jobmanager.web.checkpoints.history`: Number of checkpoint statistics to remember (DEFAULT: `10`).
+- `jobmanager.web.backpressure.cleanup-interval`: Time after which cached stats are cleaned up if not accessed (DEFAULT: `600000`, 10 mins).
+- `jobmanager.web.backpressure.refresh-interval`: Time after which available stats are considered outdated and need to be refreshed (DEFAULT: `60000`, 1 min).
+- `jobmanager.web.backpressure.num-samples`: Number of stack trace samples to take to determine back pressure (DEFAULT: `100`).
+- `jobmanager.web.backpressure.delay-between-samples`: Delay between stack trace samples to determine back pressure (DEFAULT: `50`, 50 ms).
 
 ### Webclient
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index aba3540..2b75644 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -357,7 +357,18 @@ public final class ConfigConstants {
 
 	/** Config parameter defining the number of checkpoints to remember for recent history. */
 	public static final String JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = "jobmanager.web.checkpoints.history";
-	
+
+	/** Time after which cached stats are cleaned up if not accessed. */
+	public static final String JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = "jobmanager.web.backpressure.cleanup-interval";
+
+	/** Time after which available stats are considered outdated and need to be refreshed (by resampling). */
+	public static final String JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = "jobmanager.web.backpressure.refresh-interval";
+
+	/** Number of stack trace samples to take to determine back pressure. */
+	public static final String JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = "jobmanager.web.backpressure.num-samples";
+
+	/** Delay between stack trace samples to determine back pressure. */
+	public static final String JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = "jobmanager.web.backpressure.delay-between-samples";
 
 	// ------------------------------ AKKA ------------------------------------
 
@@ -693,6 +704,18 @@ public final class ConfigConstants {
 	/** Default number of checkpoints to remember for recent history. */
 	public static final int DEFAULT_JOB_MANAGER_WEB_CHECKPOINTS_HISTORY_SIZE = 10;
 
+	/** Time after which cached stats are cleaned up. */
+	public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL = 10 * 60 * 1000;
+
+	/** Time after which available stats are considered outdated and need to be refreshed (by resampling). */
+	public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL = 60 * 1000;
+
+	/** Number of samples to take to determine back pressure. */
+	public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES = 100;
+
+	/** Delay between samples to determine back pressure. */
+	public static final int DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY = 50;
+
 	// ------------------------------ Akka Values ------------------------------
 
 	public static String DEFAULT_AKKA_TRANSPORT_HEARTBEAT_INTERVAL = "1000 s";

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/pom.xml b/flink-runtime-web/pom.xml
index 7a6147b..1a19fb1 100644
--- a/flink-runtime-web/pom.xml
+++ b/flink-runtime-web/pom.xml
@@ -134,6 +134,12 @@ under the License.
 			<version>${curator.version}</version>
 			<scope>test</scope>
 		</dependency>
+
+		<dependency>
+			<groupId>com.typesafe.akka</groupId>
+			<artifactId>akka-testkit_${scala.binary.version}</artifactId>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
new file mode 100644
index 0000000..ff0573a
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import akka.dispatch.OnComplete;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Maps;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Back pressure statistics tracker.
+ *
+ * <p>Back pressure is determined by sampling running tasks. If a task is
+ * slowed down by back pressure it will be stuck in memory requests to a
+ * {@link org.apache.flink.runtime.io.network.buffer.LocalBufferPool}.
+ *
+ * <p>The back pressured stack traces look like this:
+ *
+ * <pre>
+ * java.lang.Object.wait(Native Method)
+ * o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
+ * o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING
+ * request
+ * [...]
+ * </pre>
+ */
+public class BackPressureStatsTracker {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BackPressureStatsTracker.class);
+
+	/** Maximum stack trace depth for samples. */
+	static final int MAX_STACK_TRACE_DEPTH = 3;
+
+	/** Expected class name for back pressure indicating stack trace element. */
+	static final String EXPECTED_CLASS_NAME = "org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
+
+	/** Expected method name for back pressure indicating stack trace element. */
+	static final String EXPECTED_METHOD_NAME = "requestBufferBlocking";
+
+	/** Lock guarding trigger operations. */
+	private final Object lock = new Object();
+
+	/** Stack trace sample coordinator. */
+	private final StackTraceSampleCoordinator coordinator;
+
+	/**
+	 * Completed stats. Important: Job vertex IDs need to be scoped by job ID,
+	 * because they are potentially constant across runs, which would mess up
+	 * the cached data.
+	 */
+	private final Cache<ExecutionJobVertex, OperatorBackPressureStats> operatorStatsCache;
+
+	/** Pending in progress stats. Important: Job vertex IDs need to be scoped
+	 * by job ID, because they are potentially constant across runs, which
+	 * would mess up the cached data. */
+	private final Set<ExecutionJobVertex> pendingStats = new HashSet<>();
+
+	/** Cleanup interval for completed stats cache. */
+	private final int cleanUpInterval;
+
+	private final int numSamples;
+
+	private final FiniteDuration delayBetweenSamples;
+
+	/** Flag indicating whether the stats tracker has been shut down. */
+	private boolean shutDown;
+
+	/**
+	 * Creates a back pressure statistics tracker.
+	 *
+	 * @param cleanUpInterval     Clean up interval for completed stats.
+	 * @param numSamples          Number of stack trace samples when determining back pressure.
+	 * @param delayBetweenSamples Delay between samples when determining back pressure.
+	 */
+	public BackPressureStatsTracker(
+			StackTraceSampleCoordinator coordinator,
+			int cleanUpInterval,
+			int numSamples,
+			FiniteDuration delayBetweenSamples) {
+
+		this.coordinator = checkNotNull(coordinator, "Stack trace sample coordinator");
+
+		checkArgument(cleanUpInterval >= 0, "Clean up interval");
+		this.cleanUpInterval = cleanUpInterval;
+
+		checkArgument(numSamples >= 1, "Number of samples");
+		this.numSamples = numSamples;
+
+		this.delayBetweenSamples = checkNotNull(delayBetweenSamples, "Delay between samples");
+
+		this.operatorStatsCache = CacheBuilder.newBuilder()
+				.concurrencyLevel(1)
+				.expireAfterAccess(cleanUpInterval, TimeUnit.MILLISECONDS)
+				.build();
+	}
+
+	/** Cleanup interval for completed stats cache. */
+	public long getCleanUpInterval() {
+		return cleanUpInterval;
+	}
+
+	/**
+	 * Returns back pressure statistics for an operator.
+	 *
+	 * @param vertex Operator to get the stats for.
+	 *
+	 * @return Back pressure statistics for an operator
+	 */
+	public Option<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex vertex) {
+		return Option.apply(operatorStatsCache.getIfPresent(vertex));
+	}
+
+	/**
+	 * Triggers a stack trace sample for an operator to gather the back pressure
+	 * statistics. If there is a sample in progress for the operator, the call
+	 * is ignored.
+	 *
+	 * @param vertex Operator to get the stats for.
+	 */
+	@SuppressWarnings("unchecked")
+	public void triggerStackTraceSample(ExecutionJobVertex vertex) {
+		synchronized (lock) {
+			if (shutDown) {
+				return;
+			}
+
+			if (!pendingStats.contains(vertex)) {
+				pendingStats.add(vertex);
+
+				Future<StackTraceSample> sample = coordinator.triggerStackTraceSample(
+						vertex.getTaskVertices(),
+						numSamples,
+						delayBetweenSamples,
+						MAX_STACK_TRACE_DEPTH);
+
+				sample.onComplete(new StackTraceSampleCompletionCallback(
+						vertex), vertex.getGraph().getExecutionContext());
+			}
+		}
+	}
+
+	/**
+	 * Cleans up the operator stats cache if it contains timed out entries.
+	 *
+	 * <p>The Guava cache only evicts as maintenance during normal operations.
+	 * If this handler is inactive, it will never be cleaned.
+	 */
+	public void cleanUpOperatorStatsCache() {
+		operatorStatsCache.cleanUp();
+	}
+
+	/**
+	 * Shuts down the stats tracker.
+	 *
+	 * <p>Invalidates the cache and clears all pending stats.
+	 */
+	public void shutDown() {
+		synchronized (lock) {
+			if (!shutDown) {
+				operatorStatsCache.invalidateAll();
+				pendingStats.clear();
+
+				shutDown = true;
+			}
+		}
+	}
+
+	/**
+	 * Invalidates the cache (irrespective of clean up interval).
+	 */
+	void invalidateOperatorStatsCache() {
+		operatorStatsCache.invalidateAll();
+	}
+
+	/**
+	 * Callback on completed stack trace sample.
+	 */
+	class StackTraceSampleCompletionCallback extends OnComplete<StackTraceSample> {
+
+		private final ExecutionJobVertex vertex;
+
+		public StackTraceSampleCompletionCallback(ExecutionJobVertex vertex) {
+			this.vertex = vertex;
+		}
+
+		@Override
+		public void onComplete(Throwable failure, StackTraceSample success) throws Throwable {
+			synchronized (lock) {
+				try {
+					if (shutDown) {
+						return;
+					}
+
+					if (success != null) {
+						OperatorBackPressureStats stats = createStatsFromSample(success);
+						operatorStatsCache.put(vertex, stats);
+					} else {
+						LOG.error("Failed to gather stack trace sample.", failure);
+					}
+				} catch (Throwable t) {
+					LOG.error("Error during stats completion.", t);
+				} finally {
+					pendingStats.remove(vertex);
+				}
+			}
+		}
+
+		/**
+		 * Creates the back pressure stats from a stack trace sample.
+		 *
+		 * @param sample Stack trace sample to base stats on.
+		 *
+		 * @return Back pressure stats
+		 */
+		private OperatorBackPressureStats createStatsFromSample(StackTraceSample sample) {
+			Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = sample.getStackTraces();
+
+			// Map task ID to subtask index, because the web interface expects
+			// it like that.
+			Map<ExecutionAttemptID, Integer> subtaskIndexMap = Maps
+					.newHashMapWithExpectedSize(traces.size());
+
+			Set<ExecutionAttemptID> sampledTasks = sample.getStackTraces().keySet();
+
+			for (ExecutionVertex task : vertex.getTaskVertices()) {
+				ExecutionAttemptID taskId = task.getCurrentExecutionAttempt().getAttemptId();
+				if (sampledTasks.contains(taskId)) {
+					subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex());
+				} else {
+					throw new RuntimeException("Outdated sample. A task, which is part of the " +
+							"sample has been reset.");
+				}
+			}
+
+			// Ratio of blocked samples to total samples per sub task. Array
+			// position corresponds to sub task index.
+			double[] backPressureRatio = new double[traces.size()];
+
+			for (Entry<ExecutionAttemptID, List<StackTraceElement[]>> entry : traces.entrySet()) {
+				int backPressureSamples = 0;
+
+				List<StackTraceElement[]> taskTraces = entry.getValue();
+
+				for (StackTraceElement[] trace : taskTraces) {
+					for (int i = trace.length - 1; i >= 0; i--) {
+						StackTraceElement elem = trace[i];
+
+						if (elem.getClassName().equals(EXPECTED_CLASS_NAME) &&
+								elem.getMethodName().equals(EXPECTED_METHOD_NAME)) {
+
+							backPressureSamples++;
+							break; // Continue with next stack trace
+						}
+					}
+				}
+
+				int subtaskIndex = subtaskIndexMap.get(entry.getKey());
+
+				int size = taskTraces.size();
+				double ratio = (size > 0)
+						? ((double) backPressureSamples) / size
+						: 0;
+
+				backPressureRatio[subtaskIndex] = ratio;
+			}
+
+			return new OperatorBackPressureStats(
+					sample.getSampleId(),
+					sample.getEndTime(),
+					backPressureRatio);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java
new file mode 100644
index 0000000..cb262e3
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/OperatorBackPressureStats.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import java.util.Arrays;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Back pressure statistics of multiple tasks.
+ *
+ * <p>Statistics are gathered by sampling stack traces of running tasks. The
+ * back pressure ratio denotes the ratio of traces indicating back pressure
+ * to the total number of sampled traces.
+ */
+public class OperatorBackPressureStats {
+
+	/** ID of the corresponding sample. */
+	private final int sampleId;
+
+	/** End time stamp of the corresponding sample. */
+	private final long endTimestamp;
+
+	/** Back pressure ratio per subtask. */
+	private final double[] subTaskBackPressureRatio;
+
+	/** Maximum back pressure ratio. */
+	private final double maxSubTaskBackPressureRatio;
+
+	public OperatorBackPressureStats(
+			int sampleId,
+			long endTimestamp,
+			double[] subTaskBackPressureRatio) {
+
+		this.sampleId = sampleId;
+		this.endTimestamp = endTimestamp;
+		this.subTaskBackPressureRatio = checkNotNull(subTaskBackPressureRatio, "Sub task back pressure ratio");
+		checkArgument(subTaskBackPressureRatio.length >= 1, "No Sub task back pressure ratio specified");
+
+		double max = 0;
+		for (double ratio : subTaskBackPressureRatio) {
+			if (ratio > max) {
+				max = ratio;
+			}
+		}
+
+		maxSubTaskBackPressureRatio = max;
+	}
+
+	/**
+	 * Returns the ID of the sample.
+	 *
+	 * @return ID of the sample
+	 */
+	public int getSampleId() {
+		return sampleId;
+	}
+
+	/**
+	 * Returns the time stamp, when all stack traces were collected at the
+	 * JobManager.
+	 *
+	 * @return Time stamp, when all stack traces were collected at the
+	 * JobManager
+	 */
+	public long getEndTimestamp() {
+		return endTimestamp;
+	}
+
+	/**
+	 * Returns the number of sub tasks.
+	 *
+	 * @return Number of sub tasks.
+	 */
+	public int getNumberOfSubTasks() {
+		return subTaskBackPressureRatio.length;
+	}
+
+	/**
+	 * Returns the ratio of stack traces indicating back pressure to total
+	 * number of sampled stack traces.
+	 *
+	 * @param index Subtask index.
+	 *
+	 * @return Ratio of stack traces indicating back pressure to total number
+	 * of sampled stack traces.
+	 */
+	public double getBackPressureRatio(int index) {
+		return subTaskBackPressureRatio[index];
+	}
+
+	/**
+	 * Returns the maximum back pressure ratio of all sub tasks.
+	 *
+	 * @return Maximum back pressure ratio of all sub tasks.
+	 */
+	public double getMaxBackPressureRatio() {
+		return maxSubTaskBackPressureRatio;
+	}
+
+	@Override
+	public String toString() {
+		return "OperatorBackPressureStats{" +
+				"sampleId=" + sampleId +
+				", endTimestamp=" + endTimestamp +
+				", subTaskBackPressureRatio=" + Arrays.toString(subTaskBackPressureRatio) +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java
new file mode 100644
index 0000000..e861265
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSample.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+/**
+ * A sample of stack traces for one or more tasks.
+ *
+ * <p>The sampling is triggered in {@link StackTraceSampleCoordinator}.
+ */
+public class StackTraceSample {
+
+	/** ID of this sample (unique per job) */
+	private final int sampleId;
+
+	/** Time stamp, when the sample was triggered. */
+	private final long startTime;
+
+	/** Time stamp, when all stack traces were collected at the JobManager. */
+	private final long endTime;
+
+	/** Map of stack traces by execution ID. */
+	private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask;
+
+	/**
+	 * Creates a stack trace sample
+	 *
+	 * @param sampleId          ID of the sample.
+	 * @param startTime         Time stamp, when the sample was triggered.
+	 * @param endTime           Time stamp, when all stack traces were
+	 *                          collected at the JobManager.
+	 * @param stackTracesByTask Map of stack traces by execution ID.
+	 */
+	public StackTraceSample(
+			int sampleId,
+			long startTime,
+			long endTime,
+			Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask) {
+
+		checkArgument(sampleId >= 0, "Negative sample ID");
+		checkArgument(startTime >= 0, "Negative start time");
+		checkArgument(endTime >= startTime, "End time before start time");
+
+		this.sampleId = sampleId;
+		this.startTime = startTime;
+		this.endTime = endTime;
+		this.stackTracesByTask = Collections.unmodifiableMap(stackTracesByTask);
+	}
+
+	/**
+	 * Returns the ID of the sample.
+	 *
+	 * @return ID of the sample
+	 */
+	public int getSampleId() {
+		return sampleId;
+	}
+
+	/**
+	 * Returns the time stamp, when the sample was triggered.
+	 *
+	 * @return Time stamp, when the sample was triggered
+	 */
+	public long getStartTime() {
+		return startTime;
+	}
+
+	/**
+	 * Returns the time stamp, when all stack traces were collected at the
+	 * JobManager.
+	 *
+	 * @return Time stamp, when all stack traces were collected at the
+	 * JobManager
+	 */
+	public long getEndTime() {
+		return endTime;
+	}
+
+	/**
+	 * Returns a map of stack traces by execution ID.
+	 *
+	 * @return Map of stack traces by execution ID
+	 */
+	public Map<ExecutionAttemptID, List<StackTraceElement[]>> getStackTraces() {
+		return stackTracesByTask;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
new file mode 100644
index 0000000..e7b292f
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
@@ -0,0 +1,477 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.google.common.collect.Maps;
+import org.apache.flink.runtime.akka.FlinkUntypedActor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleFailure;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleSuccess;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.Future;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.UUID;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A coordinator for triggering and collecting stack traces of running tasks.
+ */
+public class StackTraceSampleCoordinator {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StackTraceSampleCoordinator.class);
+
+	private static final int NUM_GHOST_SAMPLE_IDS = 10;
+
+	private final Object lock = new Object();
+
+	/** Actor for responses. */
+	private final ActorGateway responseActor;
+
+	/** Time out after the expected sampling duration. */
+	private final int sampleTimeout;
+
+	/** In progress samples (guarded by lock). */
+	private final Map<Integer, PendingStackTraceSample> pendingSamples = new HashMap<>();
+
+	/** Recent sample IDs to tell late messages from invalid ones (guarded by lock). */
+	private final ArrayDeque<Integer> recentPendingSamples = new ArrayDeque<>(NUM_GHOST_SAMPLE_IDS);
+
+	/** Sample ID counter (guarded by lock). */
+	private int sampleIdCounter;
+
+	/**
+	 * Timer to discard expired in-progress samples. Lazily initialized as the
+	 * sample coordinator will not be used very often (guarded by lock).
+	 */
+	private Timer timer;
+
+	/**
+	 * Flag indicating whether the coordinator has been shut down (guarded by
+	 * lock).
+	 */
+	private boolean isShutDown;
+
+	/**
+	 * Creates a new coordinator for the job.
+	 *
+	 * @param sampleTimeout Time out (in milliseconds) after the expected sampling duration.
+	 *                      This is added to the expected duration of a
+	 *                      sample, which is determined by the number of
+	 *                      samples and the delay between each sample.
+	 */
+	public StackTraceSampleCoordinator(ActorSystem actorSystem, int sampleTimeout) {
+		Props props = Props.create(StackTraceSampleCoordinatorActor.class, this);
+		this.responseActor = new AkkaActorGateway(actorSystem.actorOf(props), null);
+
+		checkArgument(sampleTimeout >= 0);
+		this.sampleTimeout = sampleTimeout;
+	}
+
+	/**
+	 * Triggers a stack trace sample to all tasks.
+	 *
+	 * @param tasksToSample       Tasks to sample.
+	 * @param numSamples          Number of stack trace samples to collect.
+	 * @param delayBetweenSamples Delay between consecutive samples.
+	 * @param maxStackTraceDepth  Maximum depth of the stack trace. 0 indicates
+	 *                            no maximum and keeps the complete stack trace.
+	 * @return A future of the completed stack trace sample
+	 */
+	@SuppressWarnings("unchecked")
+	public Future<StackTraceSample> triggerStackTraceSample(
+			ExecutionVertex[] tasksToSample,
+			int numSamples,
+			FiniteDuration delayBetweenSamples,
+			int maxStackTraceDepth) {
+
+		checkNotNull(tasksToSample, "Tasks to sample");
+		checkArgument(tasksToSample.length >= 1, "No tasks to sample");
+		checkArgument(numSamples >= 1, "No number of samples");
+		checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack trace depth");
+
+		// Execution IDs of running tasks
+		ExecutionAttemptID[] triggerIds = new ExecutionAttemptID[tasksToSample.length];
+
+		// Check that all tasks are RUNNING before triggering anything. The
+		// triggering can still fail.
+		for (int i = 0; i < triggerIds.length; i++) {
+			Execution execution = tasksToSample[i].getCurrentExecutionAttempt();
+			if (execution != null && execution.getState() == ExecutionState.RUNNING) {
+				triggerIds[i] = execution.getAttemptId();
+			} else {
+				Promise failedPromise = new scala.concurrent.impl.Promise.DefaultPromise<>()
+						.failure(new IllegalStateException("Task " + tasksToSample[i]
+								.getTaskNameWithSubtaskIndex() + " is not running."));
+				return failedPromise.future();
+			}
+		}
+
+		synchronized (lock) {
+			if (isShutDown) {
+				Promise failedPromise = new scala.concurrent.impl.Promise.DefaultPromise<>()
+						.failure(new IllegalStateException("Shut down"));
+				return failedPromise.future();
+			}
+
+			if (timer == null) {
+				timer = new Timer("Stack trace sample coordinator timer");
+			}
+
+			int sampleId = sampleIdCounter++;
+
+			LOG.debug("Triggering stack trace sample {}", sampleId);
+
+			final PendingStackTraceSample pending = new PendingStackTraceSample(
+					sampleId, triggerIds);
+
+			// Discard the sample if it takes too long. We don't send cancel
+			// messages to the task managers, but only wait for the responses
+			// and then ignore them.
+			long expectedDuration = numSamples * delayBetweenSamples.toMillis();
+			long discardDelay = expectedDuration + sampleTimeout;
+
+			TimerTask discardTask = new TimerTask() {
+				@Override
+				public void run() {
+					try {
+						synchronized (lock) {
+							if (!pending.isDiscarded()) {
+								LOG.info("Sample {} expired before completing",
+										pending.getSampleId());
+
+								pending.discard(new RuntimeException("Time out"));
+								pendingSamples.remove(pending.getSampleId());
+							}
+						}
+					} catch (Throwable t) {
+						LOG.error("Exception while handling sample timeout", t);
+					}
+				}
+			};
+
+			// Add the pending sample before scheduling the discard task to
+			// prevent races with removing it again.
+			pendingSamples.put(sampleId, pending);
+
+			timer.schedule(discardTask, discardDelay);
+
+			boolean success = true;
+			try {
+				// Trigger all samples
+				for (int i = 0; i < tasksToSample.length; i++) {
+					TriggerStackTraceSample msg = new TriggerStackTraceSample(
+							sampleId,
+							triggerIds[i],
+							numSamples,
+							delayBetweenSamples,
+							maxStackTraceDepth);
+
+					if (!tasksToSample[i].sendMessageToCurrentExecution(
+							msg,
+							triggerIds[i],
+							responseActor)) {
+						success = false;
+						break;
+					}
+				}
+
+				return pending.getStackTraceSampleFuture();
+			} finally {
+				if (!success) {
+					pending.discard(new RuntimeException("Failed to trigger sample, " +
+							"because task has been reset."));
+					pendingSamples.remove(sampleId);
+					rememberRecentSampleId(sampleId);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Cancels a pending sample.
+	 *
+	 * @param sampleId ID of the sample to cancel.
+	 * @param cause Cause of the cancelling (can be <code>null</code>).
+	 */
+	public void cancelStackTraceSample(int sampleId, Exception cause) {
+		synchronized (lock) {
+			if (isShutDown) {
+				return;
+			}
+
+			PendingStackTraceSample sample = pendingSamples.remove(sampleId);
+			if (sample != null) {
+				if (cause != null) {
+					LOG.info("Cancelling sample " + sampleId, cause);
+				} else {
+					LOG.info("Cancelling sample {}", sampleId);
+				}
+
+				sample.discard(cause);
+				rememberRecentSampleId(sampleId);
+			}
+		}
+	}
+
+	/**
+	 * Shuts down the coordinator.
+	 *
+	 * <p>After shut down, no further operations are executed.
+	 */
+	public void shutDown() {
+		synchronized (lock) {
+			if (!isShutDown) {
+				LOG.info("Shutting down stack trace sample coordinator.");
+
+				for (PendingStackTraceSample pending : pendingSamples.values()) {
+					pending.discard(new RuntimeException("Shut down"));
+				}
+
+				pendingSamples.clear();
+
+				if (timer != null) {
+					timer.cancel();
+				}
+
+				isShutDown = true;
+			}
+		}
+	}
+
+	/**
+	 * Collects stack traces of a task.
+	 *
+	 * @param sampleId    ID of the sample.
+	 * @param executionId ID of the sampled task.
+	 * @param stackTraces Stack traces of the sampled task.
+	 *
+	 * @throws IllegalStateException If unknown sample ID and not recently
+	 *                               finished or cancelled sample.
+	 */
+	public void collectStackTraces(
+			int sampleId,
+			ExecutionAttemptID executionId,
+			List<StackTraceElement[]> stackTraces) {
+
+		synchronized (lock) {
+			if (isShutDown) {
+				return;
+			}
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Collecting stack trace sample {} of task {}", sampleId, executionId);
+			}
+
+			PendingStackTraceSample pending = pendingSamples.get(sampleId);
+
+			if (pending != null) {
+				pending.collectStackTraces(executionId, stackTraces);
+
+				// Publish the sample
+				if (pending.isComplete()) {
+					pendingSamples.remove(sampleId);
+					rememberRecentSampleId(sampleId);
+
+					pending.completePromiseAndDiscard();
+				}
+			} else if (recentPendingSamples.contains(sampleId)) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Received late stack trace sample {} of task {}",
+							sampleId, executionId);
+				}
+			} else {
+				throw new IllegalStateException("Unknown sample ID " + sampleId);
+			}
+		}
+	}
+
+	private void rememberRecentSampleId(int sampleId) {
+		if (recentPendingSamples.size() >= NUM_GHOST_SAMPLE_IDS) {
+			recentPendingSamples.removeFirst();
+		}
+		recentPendingSamples.addLast(sampleId);
+	}
+
+	int getNumberOfPendingSamples() {
+		synchronized (lock) {
+			return pendingSamples.size();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A pending stack trace sample, which collects stack traces and owns a
+	 * {@link StackTraceSample} promise.
+	 *
+	 * <p>Access pending sample in lock scope.
+	 */
+	private static class PendingStackTraceSample {
+
+		private final int sampleId;
+		private final long startTime;
+		private final Set<ExecutionAttemptID> pendingTasks;
+		private final Map<ExecutionAttemptID, List<StackTraceElement[]>> stackTracesByTask;
+		private final Promise<StackTraceSample> stackTracePromise;
+
+		private boolean isDiscarded;
+
+		PendingStackTraceSample(
+				int sampleId,
+				ExecutionAttemptID[] tasksToCollect) {
+
+			this.sampleId = sampleId;
+			this.startTime = System.currentTimeMillis();
+			this.pendingTasks = new HashSet<>(Arrays.asList(tasksToCollect));
+			this.stackTracesByTask = Maps.newHashMapWithExpectedSize(tasksToCollect.length);
+			this.stackTracePromise = new scala.concurrent.impl.Promise.DefaultPromise<>();
+		}
+
+		int getSampleId() {
+			return sampleId;
+		}
+
+		long getStartTime() {
+			return startTime;
+		}
+
+		boolean isDiscarded() {
+			return isDiscarded;
+		}
+
+		boolean isComplete() {
+			if (isDiscarded) {
+				throw new IllegalStateException("Discarded");
+			}
+
+			return pendingTasks.isEmpty();
+		}
+
+		void discard(Throwable cause) {
+			if (!isDiscarded) {
+				pendingTasks.clear();
+				stackTracesByTask.clear();
+
+				stackTracePromise.failure(new RuntimeException("Discarded", cause));
+
+				isDiscarded = true;
+			}
+		}
+
+		void collectStackTraces(ExecutionAttemptID executionId, List<StackTraceElement[]> stackTraces) {
+			if (isDiscarded) {
+				throw new IllegalStateException("Discarded");
+			}
+
+			if (pendingTasks.remove(executionId)) {
+				stackTracesByTask.put(executionId, Collections.unmodifiableList(stackTraces));
+			} else if (isComplete()) {
+				throw new IllegalStateException("Completed");
+			} else {
+				throw new IllegalArgumentException("Unknown task " + executionId);
+			}
+		}
+
+		void completePromiseAndDiscard() {
+			if (isComplete()) {
+				isDiscarded = true;
+
+				long endTime = System.currentTimeMillis();
+
+				StackTraceSample stackTraceSample = new StackTraceSample(
+						sampleId,
+						startTime,
+						endTime,
+						stackTracesByTask);
+
+				stackTracePromise.success(stackTraceSample);
+			} else {
+				throw new IllegalStateException("Not completed yet");
+			}
+		}
+
+		@SuppressWarnings("unchecked")
+		Future<StackTraceSample> getStackTraceSampleFuture() {
+			return stackTracePromise.future();
+		}
+	}
+
+	/**
+	 * Actor for stack trace sample responses.
+	 */
+	private static class StackTraceSampleCoordinatorActor extends FlinkUntypedActor {
+
+		StackTraceSampleCoordinator coordinator;
+
+		public StackTraceSampleCoordinatorActor(StackTraceSampleCoordinator coordinator) {
+			this.coordinator = checkNotNull(coordinator, "Stack trace sample coordinator");
+		}
+
+		@Override
+		protected void handleMessage(Object msg) throws Exception {
+			try {
+				if (msg instanceof ResponseStackTraceSampleSuccess) {
+					ResponseStackTraceSampleSuccess success = (ResponseStackTraceSampleSuccess) msg;
+
+					coordinator.collectStackTraces(
+							success.sampleId(),
+							success.executionId(),
+							success.samples());
+				} else if (msg instanceof ResponseStackTraceSampleFailure) {
+					ResponseStackTraceSampleFailure failure = (ResponseStackTraceSampleFailure) msg;
+
+					coordinator.cancelStackTraceSample(failure.sampleId(), failure.cause());
+				} else {
+					throw new IllegalArgumentException("Unexpected task sample message");
+				}
+			} catch (Throwable t) {
+				LOG.error("Error responding to message '" + msg + "': " + t.getMessage() + ".", t);
+			}
+		}
+
+		@Override
+		protected UUID getLeaderSessionID() {
+			return null;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index 3523091..08ed2f9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor;
 
 import akka.actor.ActorSystem;
-
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
@@ -29,14 +28,17 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.http.HttpServerCodec;
 import io.netty.handler.codec.http.router.Handler;
 import io.netty.handler.codec.http.router.Router;
-
 import org.apache.commons.io.FileUtils;
-
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
+import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
 import org.apache.flink.runtime.webmonitor.handlers.ConstantTextHandler;
+import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
+import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarAccessDeniedHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarListHandler;
@@ -46,29 +48,24 @@ import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobCheckpointsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
-import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JobVertexBackPressureHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobVertexCheckpointsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
-import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
-import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
 import org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptAccumulatorsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptDetailsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandler;
 import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler;
 import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -121,8 +118,11 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 	private final File uploadDir;
 
-	private AtomicBoolean cleanedUp = new AtomicBoolean();
+	private final StackTraceSampleCoordinator stackTraceSamples;
 
+	private final BackPressureStatsTracker backPressureStatsTracker;
+
+	private AtomicBoolean cleanedUp = new AtomicBoolean();
 
 	public WebRuntimeMonitor(
 			Configuration config,
@@ -163,6 +163,34 @@ public class WebRuntimeMonitor implements WebMonitor {
 
 		ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder();
 
+		// - Back pressure stats ----------------------------------------------
+
+		stackTraceSamples = new StackTraceSampleCoordinator(actorSystem, 60000);
+
+		// Back pressure stats tracker config
+		int cleanUpInterval = config.getInteger(
+				ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL,
+				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_CLEAN_UP_INTERVAL);
+
+		int refreshInterval = config.getInteger(
+				ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL,
+				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_REFRESH_INTERVAL);
+
+		int numSamples = config.getInteger(
+				ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES,
+				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_NUM_SAMPLES);
+
+		int delay = config.getInteger(
+				ConfigConstants.JOB_MANAGER_WEB_BACK_PRESSURE_DELAY,
+				ConfigConstants.DEFAULT_JOB_MANAGER_WEB_BACK_PRESSURE_DELAY);
+
+		FiniteDuration delayBetweenSamples = new FiniteDuration(delay, TimeUnit.MILLISECONDS);
+
+		backPressureStatsTracker = new BackPressureStatsTracker(
+				stackTraceSamples, cleanUpInterval, numSamples, delayBetweenSamples);
+
+		// --------------------------------------------------------------------
+
 		router = new Router()
 			// config how to interact with this web server
 			.GET("/config", handler(new DashboardConfigHandler(cfg.getRefreshInterval())))
@@ -187,7 +215,10 @@ public class WebRuntimeMonitor implements WebMonitor {
 			.GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices/:vertexid/checkpoints", handler(new JobVertexCheckpointsHandler(currentGraphs)))
-
+			.GET("/jobs/:jobid/vertices/:vertexid/backpressure", handler(new JobVertexBackPressureHandler(
+							currentGraphs,
+							backPressureStatsTracker,
+							refreshInterval)))
 			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new SubtasksAllAccumulatorsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs)))
 			.GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs)))
@@ -298,6 +329,23 @@ public class WebRuntimeMonitor implements WebMonitor {
 		synchronized (startupShutdownLock) {
 			jobManagerAddressPromise.success(jobManagerAkkaUrl);
 			leaderRetrievalService.start(retriever);
+
+			long delay = backPressureStatsTracker.getCleanUpInterval();
+
+			// Scheduled back pressure stats tracker cache cleanup. We schedule
+			// this here repeatedly, because cache clean up only happens on
+			// interactions with the cache. We need it to make sure that we
+			// don't leak memory after completed jobs or long ago accessed stats.
+			bootstrap.childGroup().scheduleWithFixedDelay(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						backPressureStatsTracker.cleanUpOperatorStatsCache();
+					} catch (Throwable t) {
+						LOG.error("Error during back pressure stats cache cleanup.", t);
+					}
+				}
+			}, delay, delay, TimeUnit.MILLISECONDS);
 		}
 	}
 
@@ -316,6 +364,10 @@ public class WebRuntimeMonitor implements WebMonitor {
 				}
 			}
 
+			stackTraceSamples.shutDown();
+
+			backPressureStatsTracker.shutDown();
+
 			cleanup();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
new file mode 100644
index 0000000..3ce6f02
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
+import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats;
+import scala.Option;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Request handler that returns back pressure stats for a single job vertex and
+ * all its sub tasks.
+ */
+public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandler {
+
+	/** Back pressure stats tracker. */
+	private final BackPressureStatsTracker backPressureStatsTracker;
+
+	/** Time after which stats are considered outdated. */
+	private final int refreshInterval;
+
+	public JobVertexBackPressureHandler(
+			ExecutionGraphHolder executionGraphHolder,
+			BackPressureStatsTracker backPressureStatsTracker,
+			int refreshInterval) {
+
+		super(executionGraphHolder);
+		this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker, "Stats tracker");
+		checkArgument(refreshInterval >= 0, "Negative timeout");
+		this.refreshInterval = refreshInterval;
+	}
+
+	@Override
+	public String handleRequest(
+			ExecutionJobVertex jobVertex,
+			Map<String, String> params) throws Exception {
+
+		try (StringWriter writer = new StringWriter();
+				JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer)) {
+
+			gen.writeStartObject();
+
+			Option<OperatorBackPressureStats> statsOption = backPressureStatsTracker
+					.getOperatorBackPressureStats(jobVertex);
+
+			if (statsOption.isDefined()) {
+				OperatorBackPressureStats stats = statsOption.get();
+
+				// Check whether we need to refresh
+				if (refreshInterval <= System.currentTimeMillis() - stats.getEndTimestamp()) {
+					backPressureStatsTracker.triggerStackTraceSample(jobVertex);
+					gen.writeStringField("status", "deprecated");
+				} else {
+					gen.writeStringField("status", "ok");
+				}
+
+				gen.writeStringField("backpressure-level", getBackPressureLevel(stats.getMaxBackPressureRatio()));
+				gen.writeNumberField("end-timestamp", stats.getEndTimestamp());
+
+				// Sub tasks
+				gen.writeArrayFieldStart("subtasks");
+				int numSubTasks = stats.getNumberOfSubTasks();
+				for (int i = 0; i < numSubTasks; i++) {
+					double ratio = stats.getBackPressureRatio(i);
+
+					gen.writeStartObject();
+					gen.writeNumberField("subtask", i);
+					gen.writeStringField("backpressure-level", getBackPressureLevel(ratio));
+					gen.writeNumberField("ratio", ratio);
+					gen.writeEndObject();
+				}
+				gen.writeEndArray();
+			} else {
+				backPressureStatsTracker.triggerStackTraceSample(jobVertex);
+				gen.writeStringField("status", "deprecated");
+			}
+
+			gen.writeEndObject();
+			gen.close();
+
+			return writer.toString();
+		}
+	}
+
+	/**
+	 * Returns the back pressure level as a String.
+	 *
+	 * @param backPressureRatio Ratio of back pressure samples to total number of samples.
+	 *
+	 * @return Back pressure level ('ok', 'low', or 'high')
+	 */
+	static String getBackPressureLevel(double backPressureRatio) {
+		if (backPressureRatio <= 0.10) {
+			return "ok";
+		} else if (backPressureRatio <= 0.5) {
+			return "low";
+		} else {
+			return "high";
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
new file mode 100644
index 0000000..52b0794
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerITCase.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.Option;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
+import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Simple back pressured task test.
+ */
+public class BackPressureStatsTrackerITCase extends TestLogger {
+
+	private static NetworkBufferPool networkBufferPool;
+	private static ActorSystem testActorSystem;
+
+	/** Shared as static variable with the test task. */
+	private static BufferPool testBufferPool;
+
+	@BeforeClass
+	public static void setup() { // creates the actor system and network buffer pool used by this class
+		testActorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+		networkBufferPool = new NetworkBufferPool(100, 8192, MemoryType.HEAP); // presumably 100 heap segments of 8192 bytes each - TODO confirm
+	}
+
+	@AfterClass
+	public static void teardown() { // releases what setup() created
+		JavaTestKit.shutdownActorSystem(testActorSystem);
+		networkBufferPool.destroy();
+	}
+
+	/**
+	 * Tests a fake back pressured task in two phases: while the test holds all buffers every sub task
+	 * is expected at ratio 1.0, and after the buffers are released every sub task is expected at 0.0.
+	 */
+	@Test
+	public void testBackPressuredProducer() throws Exception {
+		new JavaTestKit(testActorSystem) {{
+			final FiniteDuration deadline = new FiniteDuration(60, TimeUnit.SECONDS); // used for job submission and for the Within block
+
+			// The JobGraph: a single vertex running BackPressuredTask with parallelism 4
+			final JobGraph jobGraph = new JobGraph();
+			final int parallelism = 4;
+
+			final JobVertex task = new JobVertex("Task");
+			task.setInvokableClass(BackPressuredTask.class);
+			task.setParallelism(parallelism);
+
+			jobGraph.addVertex(task);
+
+			ActorGateway jobManger = null;
+			ActorGateway taskManager = null;
+
+			//
+			// 1) Consume all buffers at first (no buffers for the test task)
+			//
+			testBufferPool = networkBufferPool.createBufferPool(1, false);
+			final List<Buffer> buffers = new ArrayList<>();
+			while (true) { // drain the pool: requestBuffer() returning null ends the loop
+				Buffer buffer = testBufferPool.requestBuffer();
+				if (buffer != null) {
+					buffers.add(buffer);
+				} else {
+					break;
+				}
+			}
+
+			try {
+				jobManger = TestingUtils.createJobManager(testActorSystem, new Configuration());
+
+				Configuration config = new Configuration();
+				config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism); // one slot per sub task
+
+				taskManager = TestingUtils.createTaskManager(
+						testActorSystem, jobManger, config, true, true);
+
+				final ActorGateway jm = jobManger; // final copy for use inside the anonymous Within class
+
+				new Within(deadline) {
+					@Override
+					protected void run() {
+						try {
+							ActorGateway testActor = new AkkaActorGateway(getTestActor(), null);
+
+							// Submit the job and wait until it is running
+							JobClient.submitJobDetached(
+									jm,
+									jobGraph,
+									deadline,
+									ClassLoader.getSystemClassLoader());
+
+							jm.tell(new WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor);
+
+							expectMsgEquals(new AllVerticesRunning(jobGraph.getJobID()));
+
+							// Get the ExecutionGraph
+							jm.tell(new RequestExecutionGraph(jobGraph.getJobID()), testActor);
+
+							ExecutionGraphFound executionGraphResponse =
+									expectMsgClass(ExecutionGraphFound.class);
+
+							ExecutionGraph executionGraph = executionGraphResponse.executionGraph();
+							ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID());
+
+							StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(
+									testActorSystem, 60000); // presumably the sample timeout in milliseconds - TODO confirm
+
+							// Tracker under test: 20 samples per request, 10 ms apart (clean up interval can be ignored)
+							BackPressureStatsTracker statsTracker = new BackPressureStatsTracker(
+									coordinator,
+									100 * 1000,
+									20,
+									new FiniteDuration(10, TimeUnit.MILLISECONDS));
+
+							int numAttempts = 10;
+
+							int nextSampleId = 0;
+
+							// Verify that all tasks are back pressured. A sample
+							// can miss this if a task takes longer to request
+							// the buffer, hence up to numAttempts retries.
+							for (int attempt = 0; attempt < numAttempts; attempt++) {
+								try {
+									OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);
+
+									assertEquals(nextSampleId + attempt, stats.getSampleId()); // every attempt triggers a new sample, IDs are expected to be consecutive
+									assertEquals(parallelism, stats.getNumberOfSubTasks());
+									assertEquals(1.0, stats.getMaxBackPressureRatio(), 0.0);
+
+									for (int i = 0; i < parallelism; i++) {
+										assertEquals(1.0, stats.getBackPressureRatio(i), 0.0);
+									}
+
+									nextSampleId = stats.getSampleId() + 1; // base ID for the second verification phase
+
+									break;
+								} catch (Throwable t) { // also catches AssertionError from the checks above
+									if (attempt == numAttempts - 1) {
+										throw t; // last attempt: give up and surface the failure
+									} else {
+										Thread.sleep(500);
+									}
+								}
+							}
+
+							//
+							// 2) Release all buffers and let the tasks grab one
+							//
+							for (Buffer buf : buffers) {
+								buf.recycle();
+							}
+
+							// Wait for all buffers to be available. The tasks
+							// grab them and then immediately release them.
+							while (testBufferPool.getNumberOfAvailableMemorySegments() < 100) { // 100 matches the first argument of the NetworkBufferPool created in setup()
+								Thread.sleep(100);
+							}
+
+							// Verify that no task is back pressured any more (same retry scheme as above).
+							for (int attempt = 0; attempt < numAttempts; attempt++) {
+								try {
+									OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);
+
+									assertEquals(nextSampleId + attempt, stats.getSampleId());
+									assertEquals(parallelism, stats.getNumberOfSubTasks());
+
+									// Verify that no task is back pressured
+									for (int i = 0; i < parallelism; i++) {
+										assertEquals(0.0, stats.getBackPressureRatio(i), 0.0);
+									}
+
+									break;
+								} catch (Throwable t) {
+									if (attempt == numAttempts - 1) {
+										throw t;
+									} else {
+										Thread.sleep(500);
+									}
+								}
+							}
+
+							// Shut down: register for the job removal notification before cancelling
+							jm.tell(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), testActor);
+
+							// Cancel job
+							jm.tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
+
+							// Response to removal notification
+							expectMsgEquals(true);
+						} catch (Exception e) { // run() declares no checked exceptions, so convert into a test failure
+							e.printStackTrace();
+							fail(e.getMessage());
+						}
+					}
+				};
+			} finally {
+				TestingUtils.stopActor(jobManger);
+				TestingUtils.stopActor(taskManager);
+
+				for (Buffer buf : buffers) { // recycled here as well; presumably covers runs that fail before phase 2 - TODO confirm a second recycle() is harmless
+					buf.recycle();
+				}
+
+				testBufferPool.lazyDestroy();
+			}
+		}};
+	}
+
+	/**
+	 * Calls invalidateOperatorStatsCache(), triggers a new sample and polls until stats are returned.
+	 */
+	private OperatorBackPressureStats triggerStatsSample(
+			BackPressureStatsTracker statsTracker,
+			ExecutionJobVertex vertex) throws InterruptedException {
+
+		statsTracker.invalidateOperatorStatsCache();
+		statsTracker.triggerStackTraceSample(vertex);
+
+		// Sleep minimum duration: 20 samples * 10 ms delay, the values the test passes to the tracker
+		Thread.sleep(20 * 10);
+
+		Option<OperatorBackPressureStats> stats;
+
+		// Poll for the stats. NOTE(review): this loop has no timeout of its own.
+		while ((stats = statsTracker.getOperatorBackPressureStats(vertex)).isEmpty()) {
+			Thread.sleep(10);
+		}
+
+		return stats.get();
+	}
+
+	/**
+	 * A back pressured producer sharing a {@link BufferPool} (the static
+	 * testBufferPool field) with the test driver.
+	 */
+	public static class BackPressuredTask extends AbstractInvokable {
+
+		@Override
+		public void invoke() throws Exception {
+			while (true) {
+				Buffer buffer = testBufferPool.requestBufferBlocking(); // the blocking request the test wants to see in sampled stack traces
+				// Got a buffer: hand it straight back to the pool
+				buffer.recycle();
+
+				new CountDownLatch(1).await(); // nothing counts this latch down, so the task parks here (no longer in a buffer request)
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
new file mode 100644
index 0000000..b0955e1
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTrackerTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.junit.Test;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.duration.FiniteDuration;
+import scala.concurrent.impl.Promise;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class BackPressureStatsTrackerTest {
+
+	/** Tests that one sample is triggered per pending request and that ratios are computed from fake stack traces. */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testTriggerStackTraceSample() throws Exception {
+		Promise<StackTraceSample> samplePromise = new Promise.DefaultPromise<>(); // completed manually further down
+
+		StackTraceSampleCoordinator sampleCoordinator = mock(StackTraceSampleCoordinator.class);
+		when(sampleCoordinator.triggerStackTraceSample(
+				any(ExecutionVertex[].class),
+				anyInt(),
+				any(FiniteDuration.class),
+				anyInt())).thenReturn(samplePromise.future());
+
+		ExecutionGraph graph = mock(ExecutionGraph.class);
+
+		// Same Thread execution context: runnables are run inline by execute()
+		when(graph.getExecutionContext()).thenReturn(new ExecutionContext() {
+
+			@Override
+			public void execute(Runnable runnable) {
+				runnable.run();
+			}
+
+			@Override
+			public void reportFailure(Throwable t) {
+				fail();
+			}
+
+			@Override
+			public ExecutionContext prepare() {
+				return this;
+			}
+		});
+
+		ExecutionVertex[] taskVertices = new ExecutionVertex[4];
+
+		ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
+		when(jobVertex.getJobId()).thenReturn(new JobID());
+		when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID());
+		when(jobVertex.getGraph()).thenReturn(graph);
+		when(jobVertex.getTaskVertices()).thenReturn(taskVertices); // array is filled below, after the job vertex mock exists
+
+		taskVertices[0] = mockExecutionVertex(jobVertex, 0);
+		taskVertices[1] = mockExecutionVertex(jobVertex, 1);
+		taskVertices[2] = mockExecutionVertex(jobVertex, 2);
+		taskVertices[3] = mockExecutionVertex(jobVertex, 3);
+
+		int numSamples = 100;
+		FiniteDuration delayBetweenSamples = new FiniteDuration(100, TimeUnit.MILLISECONDS);
+
+		BackPressureStatsTracker tracker = new BackPressureStatsTracker(
+				sampleCoordinator, 9999, numSamples, delayBetweenSamples);
+
+		// Trigger
+		tracker.triggerStackTraceSample(jobVertex);
+
+		verify(sampleCoordinator).triggerStackTraceSample(
+				eq(taskVertices),
+				eq(numSamples),
+				eq(delayBetweenSamples),
+				eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
+
+		// Trigger again for pending request, should not fire
+		tracker.triggerStackTraceSample(jobVertex);
+
+		assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isEmpty());
+
+		verify(sampleCoordinator).triggerStackTraceSample( // verify() defaults to times(1): still exactly one invocation
+				eq(taskVertices),
+				eq(numSamples),
+				eq(delayBetweenSamples),
+				eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
+
+		assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isEmpty());
+
+		// Build the traces to complete the future with: 4 traces per sub task
+		Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = new HashMap<>();
+		for (ExecutionVertex vertex : taskVertices) {
+			List<StackTraceElement[]> taskTraces = new ArrayList<>();
+
+			for (int i = 0; i < taskVertices.length; i++) {
+				// Traces 0..subtask index are back pressured, i.e. (index + 1) of the 4 traces
+				taskTraces.add(createStackTrace(i <= vertex.getParallelSubtaskIndex()));
+			}
+
+			traces.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskTraces);
+		}
+
+		int sampleId = 1231; // the assertions below only check that the tracker hands these two values through
+		int endTime = 841;
+
+		StackTraceSample sample = new StackTraceSample(
+				sampleId,
+				0,
+				endTime,
+				traces);
+
+		// Succeed the promise
+		samplePromise.success(sample);
+
+		assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isDefined());
+
+		OperatorBackPressureStats stats = tracker.getOperatorBackPressureStats(jobVertex).get();
+
+		// Verify the stats
+		assertEquals(sampleId, stats.getSampleId());
+		assertEquals(endTime, stats.getEndTimestamp());
+		assertEquals(taskVertices.length, stats.getNumberOfSubTasks());
+		
+		for (int i = 0; i < taskVertices.length; i++) {
+			double ratio = stats.getBackPressureRatio(i);
+			// Sub task i has (i + 1) back pressured traces out of 4: 0.25, 0.5, 0.75, 1.0
+			assertEquals((i + 1) / ((double) 4), ratio, 0.0);
+		}
+	}
+
+	private StackTraceElement[] createStackTrace(boolean isBackPressure) { // builds one fake stack trace for a sample
+		if (isBackPressure) {
+			return new StackTraceElement[] { new StackTraceElement( // single frame with the class and method name constants of the tracker
+					BackPressureStatsTracker.EXPECTED_CLASS_NAME,
+					BackPressureStatsTracker.EXPECTED_METHOD_NAME,
+					"LocalBufferPool.java",
+					133) };
+		} else {
+			return Thread.currentThread().getStackTrace(); // trace of the current test thread serves as the "not back pressured" case
+		}
+	}
+
+	private ExecutionVertex mockExecutionVertex( // mocked sub task vertex with its own attempt ID
+			ExecutionJobVertex jobVertex,
+			int subTaskIndex) {
+
+		Execution exec = mock(Execution.class);
+		when(exec.getAttemptId()).thenReturn(new ExecutionAttemptID()); // fresh attempt ID per vertex
+
+		JobVertexID id = jobVertex.getJobVertexId(); // read first, before the stubbing below starts
+
+		ExecutionVertex vertex = mock(ExecutionVertex.class);
+		when(vertex.getJobvertexId()).thenReturn(id);
+		when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+		when(vertex.getParallelSubtaskIndex()).thenReturn(subTaskIndex);
+
+		return vertex;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b7e70da3/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
new file mode 100644
index 0000000..406197c
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for the {@link StackTraceSampleCoordinator}.
+ */
+public class StackTraceSampleCoordinatorTest {
+
+	private static ActorSystem system;
+
+	private StackTraceSampleCoordinator coord;
+
+	@BeforeClass
+	public static void setUp() throws Exception { // one local actor system for all tests of this class
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	}
+
+	@AfterClass
+	public static void tearDown() throws Exception {
+		if (system != null) { // null if setUp() failed before assigning it
+			system.shutdown();
+		}
+	}
+
+	@Before
+	public void init() throws Exception { // fresh coordinator per test; the tests in this class all work with sample ID 0
+		this.coord = new StackTraceSampleCoordinator(system, 60000);
+	}
+
+	/** Tests simple trigger and collect of stack trace samples for four running tasks. */
+	@Test
+	public void testTriggerStackTraceSample() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true)
+		};
+
+		int numSamples = 1;
+		FiniteDuration delayBetweenSamples = new FiniteDuration(100, TimeUnit.MILLISECONDS);
+		int maxStackTraceDepth = 0;
+
+		Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices, numSamples, delayBetweenSamples, maxStackTraceDepth);
+
+		// Verify a trigger message with sample ID 0 has been sent to each vertex
+		for (ExecutionVertex vertex : vertices) {
+			ExecutionAttemptID expectedExecutionId = vertex
+					.getCurrentExecutionAttempt().getAttemptId();
+
+			TriggerStackTraceSample expectedMsg = new TriggerStackTraceSample(
+					0,
+					expectedExecutionId,
+					numSamples,
+					delayBetweenSamples,
+					maxStackTraceDepth);
+
+			verify(vertex).sendMessageToCurrentExecution(
+					eq(expectedMsg), eq(expectedExecutionId), any(AkkaActorGateway.class));
+		}
+
+		assertFalse(sampleFuture.isCompleted());
+
+		StackTraceElement[] stackTraceSample = Thread.currentThread().getStackTrace();
+		List<StackTraceElement[]> traces = new ArrayList<>(); // three copies of the same trace, reported for every task
+		traces.add(stackTraceSample);
+		traces.add(stackTraceSample);
+		traces.add(stackTraceSample);
+
+		// Collect stack traces; the future must complete only once the last task has reported
+		for (int i = 0; i < vertices.length; i++) {
+			ExecutionAttemptID executionId = vertices[i].getCurrentExecutionAttempt().getAttemptId();
+			coord.collectStackTraces(0, executionId, traces);
+
+			if (i == vertices.length - 1) {
+				assertTrue(sampleFuture.isCompleted());
+			} else {
+				assertFalse(sampleFuture.isCompleted());
+			}
+		}
+
+		// Verify completed stack trace sample
+		StackTraceSample sample = sampleFuture.value().get().get();
+
+		assertEquals(0, sample.getSampleId());
+		assertTrue(sample.getEndTime() >= sample.getStartTime());
+
+		Map<ExecutionAttemptID, List<StackTraceElement[]>> tracesByTask = sample.getStackTraces();
+
+		for (ExecutionVertex vertex : vertices) {
+			ExecutionAttemptID executionId = vertex.getCurrentExecutionAttempt().getAttemptId();
+			List<StackTraceElement[]> sampleTraces = tracesByTask.get(executionId);
+
+			assertNotNull("Task not found", sampleTraces);
+			assertTrue(traces.equals(sampleTraces)); // List.equals: same arrays in the same order
+		}
+
+		// Verify no more pending sample
+		assertEquals(0, coord.getNumberOfPendingSamples());
+
+		// Verify no error on late collect (sample 0 has already completed)
+		coord.collectStackTraces(0, vertices[0].getCurrentExecutionAttempt().getAttemptId(), traces);
+	}
+
+	/** Tests triggering for non-running tasks fails the future with an IllegalStateException. */
+	@Test
+	public void testTriggerStackTraceSampleNotRunningTasks() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.DEPLOYING, true) // the task that is not running
+		};
+
+		Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices,
+				1,
+				new FiniteDuration(100, TimeUnit.MILLISECONDS),
+				0);
+
+		assertTrue(sampleFuture.isCompleted()); // completed right away, without waiting for any collect
+		assertTrue(sampleFuture.failed().isCompleted());
+
+		assertTrue(sampleFuture.failed().value().get().get() instanceof IllegalStateException);
+	}
+
+	/** Tests triggering for reset tasks (message send reports failure) fails the future. */
+	@Test
+	public void testTriggerStackTraceSampleResetRunningTasks() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+				// Fails to send the message to the execution (happens when execution is reset)
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, false)
+		};
+
+		Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices,
+				1,
+				new FiniteDuration(100, TimeUnit.MILLISECONDS),
+				0);
+
+		assertTrue(sampleFuture.isCompleted());
+		assertTrue(sampleFuture.failed().isCompleted());
+		assertTrue(sampleFuture.failed().value().get().get().getCause() instanceof RuntimeException); // the checked failure is the cause of the future's Throwable
+	}
+
+	/** Tests that samples time out if they don't finish in time and that a later collect is rejected. */
+	@Test
+	public void testTriggerStackTraceSampleTimeout() throws Exception {
+		int timeout = 100; // also used as a sleep duration in milliseconds below
+
+		coord = new StackTraceSampleCoordinator(system, timeout); // replaces the coordinator created in init()
+
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+		// Wait for the timeout (nothing is collected, so the sample cannot finish on its own)
+		Thread.sleep(timeout * 2);
+
+		boolean success = false;
+		for (int i = 0; i < 10; i++) { // up to 10 further checks, one timeout apart
+			if (sampleFuture.isCompleted()) {
+				success = true;
+				break;
+			}
+
+			Thread.sleep(timeout);
+		}
+
+		assertTrue("Sample did not time out", success);
+
+		Throwable cause = sampleFuture.failed().value().get().get();
+		assertTrue(cause.getCause().getMessage().contains("Time out"));
+
+		// Collect after the timeout: expected to be rejected with an IllegalStateException
+		try {
+			ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
+			coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
+			fail("Did not throw expected Exception");
+		} catch (IllegalStateException ignored) {
+		}
+	}
+
+	/** Tests that collecting for a sample ID that was never triggered fails with an IllegalStateException. */
+	@Test(expected = IllegalStateException.class)
+	public void testCollectStackTraceForUnknownSample() throws Exception {
+		coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>());
+	}
+
+	/** Tests that cancelling a pending sample completes its future and removes it from the pending samples. */
+	@Test
+	public void testCancelStackTraceSample() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+		assertFalse(sampleFuture.isCompleted());
+
+		// Cancel sample 0 (second argument presumably is the cancellation cause - TODO confirm)
+		coord.cancelStackTraceSample(0, null);
+
+		// Verify completed
+		assertTrue(sampleFuture.isCompleted());
+
+		// Verify no more pending samples
+		assertEquals(0, coord.getNumberOfPendingSamples());
+	}
+
+	/** Tests that collecting for a cancelled sample throws no Exception. */
+	@Test
+	public void testCollectStackTraceForCanceledSample() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+		assertFalse(sampleFuture.isCompleted());
+
+		coord.cancelStackTraceSample(0, null); // cancel before anything has been collected
+
+		assertTrue(sampleFuture.isCompleted());
+
+		// Verify no error on late collect (the test passes if this call does not throw)
+		ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
+		coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
+	}
+
+	/** Tests that collecting for a discarded pending sample throws no Exception. */
+	@Test
+	public void testCollectForDiscardedPendingSample() throws Exception { // NOTE(review): body is identical to testCollectStackTraceForCanceledSample
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		Future<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+				vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+		assertFalse(sampleFuture.isCompleted());
+
+		coord.cancelStackTraceSample(0, null); // the pending sample is discarded by cancelling it
+
+		assertTrue(sampleFuture.isCompleted());
+
+		// Verify no error on late collect (the test passes if this call does not throw)
+		ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
+		coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
+	}
+
+
+	/** Tests that collecting for an unknown task of a pending sample fails with an IllegalArgumentException. */
+	@Test(expected = IllegalArgumentException.class)
+	public void testCollectStackTraceForUnknownTask() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		coord.triggerStackTraceSample(vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+		coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>()); // new attempt ID that is not part of the triggered vertices
+	}
+
+	/** Tests that shut down completes all pending samples and fails future sample triggers. */
+	@Test
+	public void testShutDown() throws Exception {
+		ExecutionVertex[] vertices = new ExecutionVertex[] {
+				mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+		};
+
+		List<Future<StackTraceSample>> sampleFutures = new ArrayList<>();
+
+		// Trigger two samples for the same vertices, both stay pending
+		sampleFutures.add(coord.triggerStackTraceSample(
+				vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0));
+
+		sampleFutures.add(coord.triggerStackTraceSample(
+				vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0));
+
+		for (Future<StackTraceSample> future : sampleFutures) {
+			assertFalse(future.isCompleted());
+		}
+
+		// Shut down
+		coord.shutDown();
+
+		// Verify all completed (only completion is checked here, not the failure itself)
+		for (Future<StackTraceSample> future : sampleFutures) {
+			assertTrue(future.isCompleted());
+		}
+
+		// Verify new trigger returns failed future
+		Future<StackTraceSample> future = coord.triggerStackTraceSample(
+				vertices, 1, new FiniteDuration(100, TimeUnit.MILLISECONDS), 0);
+
+		assertTrue(future.isCompleted());
+		assertTrue(future.failed().isCompleted());
+	}
+
+	// ------------------------------------------------------------------------
+
+	private ExecutionVertex mockExecutionVertex( // mocked vertex whose current attempt has the given ID and state
+			ExecutionAttemptID executionId,
+			ExecutionState state,
+			boolean sendSuccess) {
+
+		Execution exec = mock(Execution.class);
+		when(exec.getAttemptId()).thenReturn(executionId);
+		when(exec.getState()).thenReturn(state);
+
+		ExecutionVertex vertex = mock(ExecutionVertex.class);
+		when(vertex.getJobvertexId()).thenReturn(new JobVertexID()); // every mocked vertex gets its own job vertex ID
+		when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+		when(vertex.sendMessageToCurrentExecution( // sendSuccess is what the mocked message send reports back
+				any(Serializable.class), any(ExecutionAttemptID.class), any(AkkaActorGateway.class)))
+				.thenReturn(sendSuccess);
+
+		return vertex;
+	}
+}


Mime
View raw message