flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/2] flink git commit: Revert "Add more through test verification"
Date Mon, 05 Mar 2018 19:07:30 GMT
Revert "Add more through test verification"

This reverts commit bddd910b526985f6c2b1a76ccc41539676468e6f which was
added to the release-1.4 branch by mistake.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7f8b0b81
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7f8b0b81
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7f8b0b81

Branch: refs/heads/release-1.4
Commit: 7f8b0b81699f6ddfdcc4d69710e47b58b4d0d79f
Parents: 8e1987d
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Mon Mar 5 20:05:23 2018 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Mar 5 20:06:22 2018 +0100

----------------------------------------------------------------------
 .../ZooKeeperHighAvailabilityITCase.java        | 22 +++++---------------
 1 file changed, 5 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7f8b0b81/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
index 947c42b..4fd8ea9 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ZooKeeperHighAvailabilityITCase.java
@@ -62,7 +62,6 @@ import java.nio.file.Files;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import scala.concurrent.Await;
@@ -70,9 +69,7 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -97,12 +94,7 @@ public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
 	// once with isRestored() == false. All other invocations must have isRestored() == true. This
 	// verifies that we don't restart a job from scratch in case the CompletedCheckpoints can't
 	// be read.
-	private static AtomicInteger allowedInitializeCallsWithoutRestore = new AtomicInteger(1);
-
-	// in CheckpointBlockingFunction we count when we see restores that are not allowed. We only
-	// allow restores once we messed with the HA directory and moved it back again
-	private static AtomicInteger illegalRestores = new AtomicInteger(0);
-	private static AtomicBoolean restoreAllowed = new AtomicBoolean(false);
+	private static AtomicInteger allowedRestores = new AtomicInteger(1);
 
 	private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class);
 	protected LocalFlinkMiniCluster cluster = null;
@@ -159,7 +151,7 @@ public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
 
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(1);
-		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0));
+		env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 100L));
 		env.enableCheckpointing(10); // Flink doesn't allow lower than 10 ms
 
 		File checkpointLocation = temporaryFolder.newFolder();
@@ -194,6 +186,8 @@ public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
 
 		failInCheckpointLatch.trigger();
 
+		Thread.sleep(2000);
+
 		// Ensure that we see at least one cycle where the job tries to restart and fails.
 		CompletableFuture<JobStatus> jobStatusFuture = FutureUtils.retrySuccesfulWithDelay(
 			() -> getJobStatus(jobManager, jobID, TEST_TIMEOUT),
@@ -216,7 +210,6 @@ public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
 			jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
 
 		// move back the HA directory so that the job can restore
-		restoreAllowed.set(true);
 		Files.move(movedCheckpointLocation.toPath(), haStorageDir.toPath());
 
 		// now the job should be able to go to RUNNING again
@@ -246,8 +239,6 @@ public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
 		assertEquals(
 			JobStatus.CANCELED,
 			jobStatusFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
-
-		assertThat("We saw illegal restores.", illegalRestores.get(), is(0));
 	}
 
 	/**
@@ -320,14 +311,11 @@ public class ZooKeeperHighAvailabilityITCase extends TestBaseUtils {
 		@Override
 		public void initializeState(FunctionInitializationContext context) {
 			if (!context.isRestored()) {
-				int updatedValue = allowedInitializeCallsWithoutRestore.decrementAndGet();
+				int updatedValue = allowedRestores.decrementAndGet();
 				if (updatedValue < 0) {
 					throw new RuntimeException("We are not allowed any more restores.");
 				}
 			} else {
-				if (!restoreAllowed.get()) {
-					illegalRestores.getAndIncrement();
-				}
 				successfulRestoreLatch.trigger();
 			}
 		}


Mime
View raw message