flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [hotfix] Improved test stability of RescalingITCase
Date Mon, 31 Oct 2016 13:03:06 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9f4be44c2 -> 094b747a3


[hotfix] Improved test stability of RescalingITCase

This closes #2728.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/094b747a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/094b747a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/094b747a

Branch: refs/heads/master
Commit: 094b747a39906f01f6a8b92233a5a8011618e641
Parents: 9f4be44
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Mon Oct 31 00:36:58 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Oct 31 14:02:59 2016 +0100

----------------------------------------------------------------------
 .../test/checkpointing/RescalingITCase.java      | 19 +++++++------------
 1 file changed, 7 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/094b747a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 4819c26..da25ae6 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -53,7 +53,6 @@ import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import scala.Option;
@@ -262,17 +261,11 @@ public class RescalingITCase extends TestLogger {
 			// wait until the operator is started
 			StateSourceBase.workStartedLatch.await();
 
-			while (deadline.hasTimeLeft()) {
-				Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
-				FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS);
-				savepointResponse = Await.result(savepointPathFuture, waitingTime);
-
-				if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) {
-					break;
-				}
-			}
+			Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
+			FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS);
+			savepointResponse = Await.result(savepointPathFuture, waitingTime);
 
-			assertTrue(savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess);
+			assertTrue(String.valueOf(savepointResponse), savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess);
 
 			final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess)savepointResponse).savepointPath();
 
@@ -585,7 +578,7 @@ public class RescalingITCase extends TestLogger {
 		env.enableCheckpointing(Long.MAX_VALUE);
 		env.setRestartStrategy(RestartStrategies.noRestart());
 
-		StateSourceBase.workStartedLatch = new CountDownLatch(1);
+		StateSourceBase.workStartedLatch = new CountDownLatch(parallelism);
 
 		SourceFunction<Integer> src;
 
@@ -922,6 +915,8 @@ public class RescalingITCase extends TestLogger {
 		@Override
 		public void snapshotState(FunctionSnapshotContext context) throws Exception {
 
+			counterPartitions.clear();
+
 			CHECK_CORRECT_SNAPSHOT[getRuntimeContext().getIndexOfThisSubtask()] = counter;
 
 			int div = counter / NUM_PARTITIONS;


Mime
View raw message