flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-3342] [runtime] Fix checkpoint statistics state size overflow
Date Fri, 05 Feb 2016 10:52:46 GMT
Repository: flink
Updated Branches:
  refs/heads/master b953a9b6a -> 8e3e2f8f4


[FLINK-3342] [runtime] Fix checkpoint statistics state size overflow


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e3e2f8f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e3e2f8f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e3e2f8f

Branch: refs/heads/master
Commit: 8e3e2f8f4704d1f45e165cc41bd6d590e0ab0f63
Parents: b953a9b
Author: Ufuk Celebi <uce@apache.org>
Authored: Fri Feb 5 11:51:45 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Fri Feb 5 11:51:45 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/stats/SimpleCheckpointStatsTracker.java    |  2 +-
 .../stats/SimpleCheckpointStatsTrackerTest.java           | 10 +++++++---
 2 files changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8e3e2f8f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
index fba3f22..5ee4fc3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
@@ -136,7 +136,7 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker {
 		}
 
 		synchronized (statsLock) {
-			int overallStateSize = 0;
+			long overallStateSize = 0;
 
 			// Operator stats
 			Map<JobVertexID, long[][]> statsForSubTasks = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/8e3e2f8f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
index 2abd56d..56228ef 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
@@ -40,7 +40,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -282,7 +281,12 @@ public class SimpleCheckpointStatsTrackerTest {
 		int maxNumOperators = 32;
 		int minParallelism = 4;
 		int maxParallelism = 16;
-		int maxStateSize = 32 * 1024;
+
+		// Use yuge numbers here in order to test that summing up state sizes
+		// does not overflow. This was a bug in the initial version, because
+		// the individual state sizes (longs) were summed up in an int.
+		long minStateSize = Integer.MAX_VALUE;
+		long maxStateSize = Long.MAX_VALUE;
 		CompletedCheckpoint[] checkpoints = new CompletedCheckpoint[numCheckpoints];
 
 		int numOperators = RAND.nextInt(maxNumOperators - minNumOperators + 1) + minNumOperators;
@@ -320,7 +324,7 @@ public class SimpleCheckpointStatsTrackerTest {
 
 					states.add(new StateForTask(
 							new SerializedValue<StateHandle<?>>(null),
-							RAND.nextInt(maxStateSize + 1),
+							minStateSize + ((long) (RAND.nextDouble() * (maxStateSize - minStateSize))),
 							operatorId,
 							subtaskIndex,
 							duration));


Mime
View raw message