flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-8575][runtime] Add missing synchronization in BackPressureStatsTracker
Date Thu, 08 Feb 2018 13:54:48 GMT
Repository: flink
Updated Branches:
  refs/heads/master c212701d5 -> 9435370e7


[FLINK-8575][runtime] Add missing synchronization in BackPressureStatsTracker

Make triggerStackTraceSampleInternal private again and add locking to
triggerStackTraceSample.

This closes #5422.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9435370e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9435370e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9435370e

Branch: refs/heads/master
Commit: 9435370e76098f8ea3b689411c085c82a253a6d3
Parents: c85d5e3
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Feb 8 12:38:53 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Feb 8 12:40:29 2018 +0100

----------------------------------------------------------------------
 .../backpressure/BackPressureStatsTracker.java  | 54 ++++++++++----------
 .../BackPressureStatsTrackerTest.java           |  8 ++-
 2 files changed, 30 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9435370e/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
index 8c130e6..ec8a451 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rest.handler.legacy.backpressure;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
@@ -172,40 +171,39 @@ public class BackPressureStatsTracker {
 	 * @param vertex Operator to get the stats for.
 	 * @return Flag indicating whether a sample with triggered.
 	 */
-	@VisibleForTesting
-	boolean triggerStackTraceSampleInternal(final ExecutionJobVertex vertex) {
-		synchronized (lock) {
-			if (shutDown) {
-				return false;
-			}
+	private boolean triggerStackTraceSampleInternal(final ExecutionJobVertex vertex) {
+		assert(Thread.holdsLock(lock));
 
-			if (!pendingStats.contains(vertex) &&
-				!vertex.getGraph().getState().isGloballyTerminalState()) {
+		if (shutDown) {
+			return false;
+		}
 
-				Executor executor = vertex.getGraph().getFutureExecutor();
+		if (!pendingStats.contains(vertex) &&
+			!vertex.getGraph().getState().isGloballyTerminalState()) {
 
-				// Only trigger if still active job
-				if (executor != null) {
-					pendingStats.add(vertex);
+			Executor executor = vertex.getGraph().getFutureExecutor();
 
-					if (LOG.isDebugEnabled()) {
-						LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
-					}
+			// Only trigger if still active job
+			if (executor != null) {
+				pendingStats.add(vertex);
 
-					CompletableFuture<StackTraceSample> sample = coordinator.triggerStackTraceSample(
-						vertex.getTaskVertices(),
-						numSamples,
-						delayBetweenSamples,
-						MAX_STACK_TRACE_DEPTH);
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
+				}
 
-					sample.handleAsync(new StackTraceSampleCompletionCallback(vertex), executor);
+				CompletableFuture<StackTraceSample> sample = coordinator.triggerStackTraceSample(
+					vertex.getTaskVertices(),
+					numSamples,
+					delayBetweenSamples,
+					MAX_STACK_TRACE_DEPTH);
 
-					return true;
-				}
-			}
+				sample.handleAsync(new StackTraceSampleCompletionCallback(vertex), executor);
 
-			return false;
+				return true;
+			}
 		}
+
+		return false;
 	}
 
 	/**
@@ -220,7 +218,9 @@ public class BackPressureStatsTracker {
 	 */
 	@Deprecated
 	public boolean triggerStackTraceSample(ExecutionJobVertex vertex) {
-		return triggerStackTraceSampleInternal(vertex);
+		synchronized (lock) {
+			return triggerStackTraceSampleInternal(vertex);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9435370e/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
index debf71d..0bbf5f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
@@ -93,18 +93,16 @@ public class BackPressureStatsTrackerTest extends TestLogger {
 		// getOperatorBackPressureStats triggers stack trace sampling
 		Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
 
-		Mockito.verify(sampleCoordinator).triggerStackTraceSample(
+		Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample(
 				Matchers.eq(taskVertices),
 				Matchers.eq(numSamples),
 				Matchers.eq(delayBetweenSamples),
 				Matchers.eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
 
-		// Trigger again for pending request, should not fire
-		Assert.assertFalse("Unexpected trigger", tracker.triggerStackTraceSampleInternal(jobVertex));
-
+		// Request back pressure stats again. This should not trigger another sample request
 		Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
 
-		Mockito.verify(sampleCoordinator).triggerStackTraceSample(
+		Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample(
 				Matchers.eq(taskVertices),
 				Matchers.eq(numSamples),
 				Matchers.eq(delayBetweenSamples),


Mime
View raw message