flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject flink git commit: [FLINK-2671] [tests] Fix test instability in StreamCheckpointNotifierITCase
Date Fri, 11 Mar 2016 14:37:26 GMT
Repository: flink
Updated Branches:
  refs/heads/master c9e0761de -> 3bbe59c0e


[FLINK-2671] [tests] Fix test instability in StreamCheckpointNotifierITCase

Also add more logging, to help future test debugging


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3bbe59c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3bbe59c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3bbe59c0

Branch: refs/heads/master
Commit: 3bbe59c0ecf71c2c3c4b2c7135c84a666e876f24
Parents: c9e0761
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Mar 11 15:30:32 2016 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Mar 11 15:36:52 2016 +0100

----------------------------------------------------------------------
 .../StreamCheckpointNotifierITCase.java         | 27 ++++++++++----------
 1 file changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3bbe59c0/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
index 9b14a96..cf15052 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointNotifierITCase.java
@@ -41,13 +41,14 @@ import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertFalse;
@@ -72,6 +73,8 @@ import static org.junit.Assert.fail;
  */
 @SuppressWarnings("serial")
 public class StreamCheckpointNotifierITCase extends TestLogger {
+
+	private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointNotifierITCase.class);
 	
 	private static final int NUM_TASK_MANAGERS = 2;
 	private static final int NUM_TASK_SLOTS = 3;
@@ -394,28 +397,23 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
 
 		static final List<Long>[] completedCheckpoints = createCheckpointLists(PARALLELISM);
 		
-		private final long numElements;
-
-		private long failurePos;
-		private long count;
+		private final long failurePos;
+		
+		private volatile long count;
 
 		private volatile boolean notificationAlready;
 		
 		OnceFailingReducer(long numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void open(Configuration parameters) {
-			long failurePosMin = (long) (0.4 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-			long failurePosMax = (long) (0.7 * numElements / getRuntimeContext().getNumberOfParallelSubtasks());
-
-			failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin;
+			this.failurePos = (long) (0.5 * numElements / PARALLELISM);
 		}
 
 		@Override
 		public Tuple1<Long> reduce(Tuple1<Long> value1, Tuple1<Long> value2) {
 			count++;
+			if (count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 0) {
+				LOG.info(">>>>>>>>>>>>>>>>> Reached failing position <<<<<<<<<<<<<<<<<<<<<");
+			}
+			
 			value1.f0 += value2.f0;
 			return value1;
 		}
@@ -423,6 +421,7 @@ public class StreamCheckpointNotifierITCase extends TestLogger {
 		@Override
 		public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
 			if (!hasFailed && count >= failurePos && getRuntimeContext().getIndexOfThisSubtask() == 0) {
+				LOG.info(">>>>>>>>>>>>>>>>> Throwing Exception <<<<<<<<<<<<<<<<<<<<<");
 				hasFailed = true;
 				failureCheckpointID = checkpointId;
 				throw new Exception("Test Failure");


Mime
View raw message