flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [2/2] flink git commit: [FLINK-4322] [checkpointing] Extend CheckpointCoordinatorTest
Date Fri, 19 Aug 2016 10:04:32 GMT
[FLINK-4322] [checkpointing] Extend CheckpointCoordinatorTest

The added tests check that savepoints ignore the maximum number
of concurrent checkpoints and minimum delay between checkpoints.

This closes #2385.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d7f8803
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d7f8803
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d7f8803

Branch: refs/heads/master
Commit: 5d7f8803155d2eb8865cce9a60dd677c2400261c
Parents: 8854d75
Author: Ufuk Celebi <uce@apache.org>
Authored: Fri Aug 19 12:01:24 2016 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Fri Aug 19 12:04:12 2016 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinatorTest.java   | 90 +++++++++++++++++++-
 1 file changed, 89 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5d7f8803/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index f243803..09c53d6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -36,6 +36,7 @@ import org.mockito.stubbing.Answer;
 import scala.concurrent.Future;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -1646,7 +1647,94 @@ public class CheckpointCoordinatorTest {
 			fail(e.getMessage());
 		}
 	}
-	
+
+	/**
+	 * Tests that savepoints can be triggered concurrently even when only one concurrent checkpoint is allowed.
+	 */
+	@Test
+	public void testConcurrentSavepoints() throws Exception {
+		JobID jobId = new JobID();
+
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+		StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter();
+
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+				jobId,
+				100000,
+				200000,
+				0L,
+				1, // max one checkpoint at a time => should not affect savepoints
+				42,
+				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex1 },
+				cl,
+				checkpointIDCounter,
+				new StandaloneCompletedCheckpointStore(2, cl),
+				new HeapSavepointStore(),
+				new DisabledCheckpointStatsTracker());
+
+		List<Future<String>> savepointFutures = new ArrayList<>();
+
+		int numSavepoints = 5;
+
+		// Trigger savepoints
+		for (int i = 0; i < numSavepoints; i++) {
+			savepointFutures.add(coord.triggerSavepoint(i));
+		}
+
+		// After triggering multiple savepoints, all should still be in progress (no future completed yet)
+		for (Future<String> savepointFuture : savepointFutures) {
+			assertFalse(savepointFuture.isCompleted());
+		}
+
+		// ACK all savepoints, walking the checkpoint IDs downwards from the counter's last value
+		long checkpointId = checkpointIDCounter.getLast();
+		for (int i = 0; i < numSavepoints; i++, checkpointId--) {
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptID1, checkpointId));
+		}
+
+		// After ACKs, all should be completed
+		for (Future<String> savepointFuture : savepointFutures) {
+			assertTrue(savepointFuture.isCompleted());
+		}
+	}
+
+	/**
+	 * Tests that the minimum delay between checkpoints is not enforced for savepoints.
+	 */
+	@Test
+	public void testMinDelayBetweenSavepoints() throws Exception {
+		JobID jobId = new JobID();
+
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+				jobId,
+				100000,
+				200000,
+				100000000L, // very long min delay => should not affect savepoints
+				1,
+				42,
+				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex1 },
+				cl,
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(2, cl),
+				new HeapSavepointStore(),
+				new DisabledCheckpointStatsTracker());
+
+		Future<String> savepoint0 = coord.triggerSavepoint(0);
+		assertFalse("Did not trigger savepoint", savepoint0.isCompleted());
+
+		Future<String> savepoint1 = coord.triggerSavepoint(1);
+		assertFalse("Did not trigger savepoint", savepoint1.isCompleted());
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------


Mime
View raw message