flink-issues mailing list archives

From GitBox <...@apache.org>
Subject [GitHub] [flink] echauchot commented on a change in pull request #13040: [FLINK-17073] [checkpointing] checkpointing backpressure if there are too many checkpoints to clean
Date Mon, 28 Sep 2020 11:36:40 GMT

echauchot commented on a change in pull request #13040:
URL: https://github.com/apache/flink/pull/13040#discussion_r495873680



##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
##########
@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws Exception {
 		recoveredTestCheckpoint.awaitDiscard();
 	}
 
+	/**
+	 * FLINK-17073 tests that there is no request triggered when there are too many checkpoints
+	 * waiting to clean and that it resumes when the number of waiting checkpoints as gone below
+	 * the threshold.
+	 *
+	 */
+	@Test
+	public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() throws Exception{
+		ManualClock clock = new ManualClock();
+		clock.advanceTime(1, TimeUnit.DAYS);
+		int maxCleaningCheckpoints = 1;
+		CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
+		CheckpointRequestDecider checkpointRequestDecider =  new CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean);
+
+		final int maxCheckpointsToRetain = 1;
+		Executors.PausableThreadPoolExecutor executor = Executors.pausableExecutor();
+		ZooKeeperCompletedCheckpointStore checkpointStore = createCompletedCheckpoints(maxCheckpointsToRetain, executor);
+
+		//pause the executor to pause checkpoints cleaning, to allow assertions
+		executor.pause();
+
+		int nbCheckpointsToInject = 3;
+		for (int i = 1; i <= nbCheckpointsToInject; i++) {
+			// add checkpoints to clean
+			TestCompletedCheckpoint completedCheckpoint = new TestCompletedCheckpoint(new JobID(), i,
+				i, Collections.emptyMap(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+				checkpointsCleaner::cleanCheckpoint);
+			checkpointStore.addCheckpoint(completedCheckpoint);
+		}
+
+		Thread.sleep(100L); // give time to submit checkpoints for cleaning
+
+		int nbCheckpointsSubmittedForCleaningByCheckpointStore = nbCheckpointsToInject - maxCheckpointsToRetain;
+		assertEquals(nbCheckpointsSubmittedForCleaningByCheckpointStore, checkpointsCleaner.getNumberOfCheckpointsToClean());

Review comment:
       My point is that a test reflects the state of the production code at a given time, and
at this time a checkpoint cleaning task is submitted as a single runnable. In any case, if the
concern is that the test relies on internals of the executor, I can add await methods to the
public API of the executor, as you suggested in a previous comment. That way the test author
will know that the test expects a certain number of submitted and triggered runnables. I have
pushed a commit to that effect. Is this OK for you?
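
To illustrate the idea of await methods on the executor, here is a minimal, self-contained sketch. It is not Flink's `Executors.PausableThreadPoolExecutor` and is not taken from the pull request: the class name `PausableCountingExecutor` and the methods `awaitSubmitted` and `awaitExecuted` are hypothetical names chosen for this example. The sketch runs tasks on the calling thread and only counts submitted and executed runnables, so that a test can wait on those counts instead of sleeping.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.IntSupplier;

// Hypothetical sketch: NOT Flink's Executors.PausableThreadPoolExecutor.
final class PausableCountingExecutor implements Executor {
    private final Object lock = new Object();
    private final Queue<Runnable> pending = new ArrayDeque<>();
    private boolean paused;
    private int submitted; // runnables handed to execute()
    private int executed;  // runnables that have finished running

    public void pause() {
        synchronized (lock) {
            paused = true;
        }
    }

    // Leaves the paused state and runs everything queued while paused.
    public void resume() {
        List<Runnable> toRun;
        synchronized (lock) {
            paused = false;
            toRun = new ArrayList<>(pending);
            pending.clear();
        }
        toRun.forEach(this::runAndCount);
    }

    @Override
    public void execute(Runnable command) {
        synchronized (lock) {
            submitted++;
            lock.notifyAll();
            if (paused) {
                pending.add(command); // held back until resume()
                return;
            }
        }
        runAndCount(command); // not paused: run on the calling thread
    }

    // Blocks until at least `expected` runnables were submitted, or times out.
    public void awaitSubmitted(int expected, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        await(() -> submitted, expected, timeoutMillis);
    }

    // Blocks until at least `expected` runnables finished, or times out.
    public void awaitExecuted(int expected, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        await(() -> executed, expected, timeoutMillis);
    }

    private void runAndCount(Runnable command) {
        try {
            command.run();
        } finally {
            synchronized (lock) {
                executed++;
                lock.notifyAll();
            }
        }
    }

    private void await(IntSupplier counter, int expected, long timeoutMillis)
            throws InterruptedException, TimeoutException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        synchronized (lock) {
            while (counter.getAsInt() < expected) {
                long remainingNanos = deadline - System.nanoTime();
                if (remainingNanos <= 0) {
                    throw new TimeoutException(
                        "expected " + expected + " but saw " + counter.getAsInt());
                }
                TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
            }
        }
    }
}
```

With methods of this kind, the `Thread.sleep(100L)` in the test above could presumably be replaced by a call such as `executor.awaitSubmitted(expectedCount, timeoutMillis)`, assuming each checkpoint cleaning task is still submitted as a single runnable.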




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


