flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/2] flink git commit: [hotfix] [tests] Increase stability of SavepointITCase
Date Tue, 24 Oct 2017 09:48:02 GMT
[hotfix] [tests] Increase stability of SavepointITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0d68d95
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0d68d95
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0d68d95

Branch: refs/heads/master
Commit: d0d68d9587a88c03ab5fa3f2db4804fbb548813a
Parents: 358bb6d
Author: Stephan Ewen <sewen@apache.org>
Authored: Sun Oct 15 20:54:01 2017 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Oct 24 11:47:10 2017 +0200

----------------------------------------------------------------------
 .../test/checkpointing/SavepointITCase.java     | 31 ++++++++++----------
 1 file changed, 16 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d0d68d95/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 0cd5184..c96758c 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -497,20 +497,20 @@ public class SavepointITCase extends TestLogger {
 		final File tmpDir = folder.getRoot();
 		final File savepointDir = new File(tmpDir, "savepoints");
 
-		TestingCluster flink = null;
+		// Flink configuration
+		final Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+		config.setString(CoreOptions.SAVEPOINT_DIRECTORY,
+				savepointDir.toURI().toString());
+
 		String savepointPath;
-		try {
-			// Flink configuration
-			final Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
-			config.setString(CoreOptions.SAVEPOINT_DIRECTORY,
-					savepointDir.toURI().toString());
 
-			LOG.info("Flink configuration: " + config + ".");
+		LOG.info("Flink configuration: " + config + ".");
 
-			// Start Flink
-			flink = new TestingCluster(config);
+		// Start Flink
+		TestingCluster flink = new TestingCluster(config);
+		try {
 			LOG.info("Starting Flink cluster.");
 			flink.start(true);
 
@@ -564,10 +564,12 @@ public class SavepointITCase extends TestLogger {
 		} finally {
 			// Shut down the Flink cluster (thereby canceling the job)
 			LOG.info("Shutting down Flink cluster.");
-			flink.shutdown();
-			flink.awaitTermination();
+			flink.stop();
 		}
 
+		// create a new TestingCluster to make sure we start with completely
+		// new resources
+		flink = new TestingCluster(config);
 		try {
 			LOG.info("Restarting Flink cluster.");
 			flink.start(true);
@@ -614,8 +616,7 @@ public class SavepointITCase extends TestLogger {
 			// Await some progress after restore
 			StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 		} finally {
-			flink.shutdown();
-			flink.awaitTermination();
+			flink.stop();
 		}
 	}
 


Mime
View raw message