flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/2] flink git commit: [FLINK-8575][runtime] Add missing synchronization in BackPressureStatsTracker
Date Thu, 08 Feb 2018 13:54:49 GMT
[FLINK-8575][runtime] Add missing synchronization in BackPressureStatsTracker

Operations in method getOperatorBackPressureStats must appear atomic otherwise
the stack trace can be sampled multiple times.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c85d5e3b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c85d5e3b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c85d5e3b

Branch: refs/heads/master
Commit: c85d5e3bc2dcca21cc82b92e71a700dddeedfbb1
Parents: c212701
Author: gyao <gary@data-artisans.com>
Authored: Wed Feb 7 12:25:35 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Feb 8 12:40:29 2018 +0100

----------------------------------------------------------------------
 .../legacy/backpressure/BackPressureStatsTracker.java     | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c85d5e3b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
index b2b57e7..8c130e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
@@ -155,11 +155,13 @@ public class BackPressureStatsTracker {
 	 * @return Back pressure statistics for an operator
 	 */
 	public Optional<OperatorBackPressureStats> getOperatorBackPressureStats(ExecutionJobVertex
vertex) {
-		final OperatorBackPressureStats stats = operatorStatsCache.getIfPresent(vertex);
-		if (stats == null || backPressureStatsRefreshInterval <= System.currentTimeMillis()
- stats.getEndTimestamp()) {
-			triggerStackTraceSampleInternal(vertex);
+		synchronized (lock) {
+			final OperatorBackPressureStats stats = operatorStatsCache.getIfPresent(vertex);
+			if (stats == null || backPressureStatsRefreshInterval <= System.currentTimeMillis()
- stats.getEndTimestamp()) {
+				triggerStackTraceSampleInternal(vertex);
+			}
+			return Optional.ofNullable(stats);
 		}
-		return Optional.ofNullable(stats);
 	}
 
 	/**


Mime
View raw message