flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-3535] [runtime-web] Decrease log verbosity of StackTraceSampleCoordinator
Date Mon, 29 Feb 2016 19:10:08 GMT
Repository: flink
Updated Branches:
  refs/heads/master 734ba01dd -> 9580b8fe5


[FLINK-3535] [runtime-web] Decrease log verbosity of StackTraceSampleCoordinator

This closes #1732.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9580b8fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9580b8fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9580b8fe

Branch: refs/heads/master
Commit: 9580b8fe5a5ec8b3b23ffa7e09123b1e160e2016
Parents: 734ba01
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon Feb 29 14:14:35 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Feb 29 20:09:26 2016 +0100

----------------------------------------------------------------------
 .../runtime/webmonitor/BackPressureStatsTracker.java  |  9 +++++++--
 .../webmonitor/StackTraceSampleCoordinator.java       |  8 ++++++--
 .../webmonitor/StackTraceSampleCoordinatorTest.java   | 14 +++++---------
 .../runtime/messages/StackTraceSampleMessages.scala   |  6 +++++-
 4 files changed, 23 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9580b8fe/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
index b9b8a47..db88ffd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
@@ -33,6 +33,7 @@ import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -170,6 +171,10 @@ public class BackPressureStatsTracker {
 				if (executionContext != null) {
 					pendingStats.add(vertex);
 
+					if (LOG.isDebugEnabled()) {
+						LOG.debug("Triggering stack trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
+					}
+
 					Future<StackTraceSample> sample = coordinator.triggerStackTraceSample(
 							vertex.getTaskVertices(),
 							numSamples,
@@ -246,7 +251,7 @@ public class BackPressureStatsTracker {
 						OperatorBackPressureStats stats = createStatsFromSample(success);
 						operatorStatsCache.put(vertex, stats);
 					} else {
-						LOG.warn("Failed to gather stack trace sample.", failure);
+						LOG.debug("Failed to gather stack trace sample.", failure);
 					}
 				} catch (Throwable t) {
 					LOG.error("Error during stats completion.", t);
@@ -278,7 +283,7 @@ public class BackPressureStatsTracker {
 				if (sampledTasks.contains(taskId)) {
 					subtaskIndexMap.put(taskId, task.getParallelSubtaskIndex());
 				} else {
-					throw new RuntimeException("Outdated sample. A task, which is part of the " +
+					LOG.debug("Outdated sample. A task, which is part of the " +
 							"sample has been reset.");
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/9580b8fe/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
index e7b292f..bbfb530 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
@@ -179,7 +179,9 @@ public class StackTraceSampleCoordinator {
 										pending.getSampleId());
 
 								pending.discard(new RuntimeException("Time out"));
-								pendingSamples.remove(pending.getSampleId());
+								if (pendingSamples.remove(pending.getSampleId()) != null) {
+									rememberRecentSampleId(pending.getSampleId());
+								}
 							}
 						}
 					} catch (Throwable t) {
@@ -319,7 +321,9 @@ public class StackTraceSampleCoordinator {
 							sampleId, executionId);
 				}
 			} else {
-				throw new IllegalStateException("Unknown sample ID " + sampleId);
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Unknown sample ID " + sampleId);
+				}
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9580b8fe/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
index 406197c..29345a6 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.java
@@ -226,17 +226,13 @@ public class StackTraceSampleCoordinatorTest {
 		Throwable cause = sampleFuture.failed().value().get().get();
 		assertTrue(cause.getCause().getMessage().contains("Time out"));
 
-		// Collect after the timeout
-		try {
-			ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
-			coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
-			fail("Did not throw expected Exception");
-		} catch (IllegalStateException ignored) {
-		}
+		// Collect after the timeout (should be ignored)
+		ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
+		coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
 	}
 
-	/** Tests that collecting an unknown sample fails. */
-	@Test(expected = IllegalStateException.class)
+	/** Tests that collecting an unknown sample is ignored. */
+	@Test
 	public void testCollectStackTraceForUnknownSample() throws Exception {
 		coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9580b8fe/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala
index 9f2e6e9..c3c26dc 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/StackTraceSampleMessages.scala
@@ -63,7 +63,11 @@ object StackTraceSampleMessages {
       sampleId: Int,
       executionId: ExecutionAttemptID,
       samples: java.util.List[Array[StackTraceElement]])
-    extends StackTraceSampleMessages
+    extends StackTraceSampleMessages {
+
+    override def toString: String =
+      s"ResponseStackTraceSampleSuccess($sampleId, $executionId, ${samples.size()} samples)"
+  }
 
   /**
     * Response after a failed stack trace sample (sent by the task managers to


Mime
View raw message