flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [4/4] flink git commit: [FLINK-3251] [runtime] Return empty stats for unknown operator
Date Mon, 18 Jan 2016 16:00:18 GMT
[FLINK-3251] [runtime] Return empty stats for unknown operator


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/117ba95f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/117ba95f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/117ba95f

Branch: refs/heads/master
Commit: 117ba95fb24f1ca9d62b77e53fffc7c0bf5b7b3c
Parents: 1c3bbe4
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon Jan 18 14:55:20 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Jan 18 15:25:18 2016 +0100

----------------------------------------------------------------------
 .../stats/SimpleCheckpointStatsTracker.java     | 37 ++++++++++----------
 .../stats/SimpleCheckpointStatsTrackerTest.java |  7 +---
 2 files changed, 20 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/117ba95f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
index 5881f4f..fba3f22 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
@@ -299,31 +299,32 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker
{
 				long[][] subTaskStats = this.subTaskStats.get(operatorId);
 
 				if (subTaskStats == null) {
-					throw new IllegalArgumentException("Unknown operator ID.");
+					return Option.empty();
 				}
+				else {
+					long maxDuration = Long.MIN_VALUE;
+					long stateSize = 0;
 
-				long maxDuration = Long.MIN_VALUE;
-				long stateSize = 0;
+					for (long[] subTaskStat : subTaskStats) {
+						if (subTaskStat[0] > maxDuration) {
+							maxDuration = subTaskStat[0];
+						}
 
-				for (long[] subTaskStat : subTaskStats) {
-					if (subTaskStat[0] > maxDuration) {
-						maxDuration = subTaskStat[0];
+						stateSize += subTaskStat[1];
 					}
 
-					stateSize += subTaskStat[1];
-				}
-
-				stats = new OperatorCheckpointStats(
-						latestCompletedCheckpoint.getCheckpointID(),
-						latestCompletedCheckpoint.getTimestamp(),
-						maxDuration,
-						stateSize,
-						subTaskStats);
+					stats = new OperatorCheckpointStats(
+							latestCompletedCheckpoint.getCheckpointID(),
+							latestCompletedCheckpoint.getTimestamp(),
+							maxDuration,
+							stateSize,
+							subTaskStats);
 
-				// Remember this and don't recompute if requested again
-				operatorStatsCache.put(operatorId, stats);
+					// Remember this and don't recompute if requested again
+					operatorStatsCache.put(operatorId, stats);
 
-				return Option.apply(stats);
+					return Option.apply(stats);
+				}
 			}
 			else {
 				return Option.empty();

http://git-wip-us.apache.org/repos/asf/flink/blob/117ba95f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
index 00ba0a6..2abd56d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
@@ -85,12 +85,7 @@ public class SimpleCheckpointStatsTrackerTest {
 
 		assertTrue(tracker.getJobStats().isDefined());
 
-		try {
-			tracker.getOperatorStats(new JobVertexID());
-			fail("Did not throw expected Exception");
-		}
-		catch (IllegalArgumentException ignored) {
-		}
+		assertTrue(tracker.getOperatorStats(new JobVertexID()).isEmpty());
 	}
 
 	@Test


Mime
View raw message