flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-5248] [tests] Catch restore failures in SavepointITCase
Date Mon, 05 Dec 2016 13:28:26 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.1 e4ca3a587 -> a5065e316


[FLINK-5248] [tests] Catch restore failures in SavepointITCase

- Minor test cleanup and a reduced checkpointing interval to speed up
  the test
- The test did not catch task restore failures, since only the
  TaskDeploymentDescriptors (TDDs) were checked. Now, we test that
  restore is actually called and that some checkpoints complete after
  restoring from a savepoint.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a5065e31
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a5065e31
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a5065e31

Branch: refs/heads/release-1.1
Commit: a5065e316ab05db4801e56283dc8def800905d50
Parents: e4ca3a5
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon Dec 5 11:50:38 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Dec 5 11:50:39 2016 +0100

----------------------------------------------------------------------
 .../test/checkpointing/SavepointITCase.java     | 112 +++++++++++++++----
 1 file changed, 90 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a5065e31/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 2878e74..3186b6e 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -89,6 +89,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import static org.apache.flink.runtime.messages.JobManagerMessages.getDisposeSavepointSuccess;
@@ -134,7 +135,8 @@ public class SavepointITCase extends TestLogger {
 		final Deadline deadline = new FiniteDuration(5, TimeUnit.MINUTES).fromNow();
 
 		// The number of checkpoints to complete before triggering the savepoint
-		final int numberOfCompletedCheckpoints = 10;
+		final int numberOfCompletedCheckpoints = 2;
+		final int checkpointingInterval = 100;
 
 		// Temporary directory for file state backend
 		final File tmpDir = CommonTestUtils.createTempDirectory();
@@ -184,13 +186,12 @@ public class SavepointITCase extends TestLogger {
 			LOG.info("JobManager: " + jobManager + ".");
 
 			// Submit the job
-			final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, 1000);
+			final JobGraph jobGraph = createJobGraph(parallelism, 0, 1000, checkpointingInterval);
 			final JobID jobId = jobGraph.getJobID();
 
 			// Wait for the source to be notified about the expected number
 			// of completed checkpoints
-			InfiniteTestSource.CheckpointCompleteLatch = new CountDownLatch(
-					numberOfCompletedCheckpoints);
+			StatefulCounter.resetForTest();
 
 			LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
 
@@ -200,7 +201,7 @@ public class SavepointITCase extends TestLogger {
 					" checkpoint complete notifications.");
 
 			// Wait...
-			InfiniteTestSource.CheckpointCompleteLatch.await();
+			StatefulCounter.awaitCompletedCheckpoints(parallelism, numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
 
 			LOG.info("Received all " + numberOfCompletedCheckpoints +
 					" checkpoint complete notifications.");
@@ -232,24 +233,16 @@ public class SavepointITCase extends TestLogger {
 			// - Verification START -------------------------------------------
 
 			// Only one checkpoint of the savepoint should exist
-			String errMsg = "Checkpoints directory not cleaned up properly.";
+			// We currently have the following directory layout: checkpointDir/jobId/chk-ID
 			File[] files = checkpointDir.listFiles();
-			if (files != null) {
-				assertEquals(errMsg, 1, files.length);
-			}
-			else {
-				fail(errMsg);
-			}
+			assertNotNull("Checkpoint directory empty", files);
+			assertEquals("Checkpoints directory cleaned up, but needed for savepoint.", 1, files.length);
+			assertEquals("No job-specific base directory", jobGraph.getJobID().toString(), files[0].getName());
 
 			// Only one savepoint should exist
-			errMsg = "Savepoints directory cleaned up.";
 			files = savepointDir.listFiles();
-			if (files != null) {
-				assertEquals(errMsg, 1, files.length);
-			}
-			else {
-				fail(errMsg);
-			}
+			assertNotNull("Savepoint directory empty", files);
+			assertEquals("No savepoint found in savepoint directory", 1, files.length);
 
 			// - Verification END ---------------------------------------------
 
@@ -264,6 +257,10 @@ public class SavepointITCase extends TestLogger {
 					deadline.timeLeft());
 			LOG.info("JobManager: " + jobManager + ".");
 
+			// Reset for restore
+			StatefulCounter.resetForTest();
+
+			// Gather all task deployment descriptors
 			final Throwable[] error = new Throwable[1];
 			final ForkableFlinkMiniCluster finalFlink = flink;
 			final Multimap<JobVertexID, TaskDeploymentDescriptor> tdds = HashMultimap.create();
@@ -321,7 +318,7 @@ public class SavepointITCase extends TestLogger {
 
 			// - Verification START -------------------------------------------
 
-			errMsg = "Error during gathering of TaskDeploymentDescriptors";
+			String errMsg = "Error during gathering of TaskDeploymentDescriptors";
 			assertNull(errMsg, error[0]);
 
 			// Verify that all tasks, which are part of the savepoint
@@ -344,6 +341,12 @@ public class SavepointITCase extends TestLogger {
 				}
 			}
 
+			// Await state is restored
+			StatefulCounter.awaitStateRestoredFromCheckpoint(deadline.timeLeft().toMillis());
+
+			// Await some progress after restore
+			StatefulCounter.awaitCompletedCheckpoints(parallelism, numberOfCompletedCheckpoints, deadline.timeLeft().toMillis());
+
 			// - Verification END ---------------------------------------------
 
 			LOG.info("Cancelling job " + jobId + ".");
@@ -932,8 +935,13 @@ public class SavepointITCase extends TestLogger {
 	}
 
 	private static class StatefulCounter
-			extends RichMapFunction<Integer, Integer>
-			implements Checkpointed<byte[]> {
+		extends RichMapFunction<Integer, Integer>
+		implements Checkpointed<byte[]>, CheckpointListener {
+
+		private static final Object checkpointLock = new Object();
+		private static int numCompleteCalls;
+		private static int numRestoreCalls;
+		private static boolean restoredFromCheckpoint;
 
 		private static final long serialVersionUID = 7317800376639115920L;
 		private byte[] data;
@@ -964,6 +972,66 @@ public class SavepointITCase extends TestLogger {
 		@Override
 		public void restoreState(byte[] data) throws Exception {
 			this.data = data;
+
+			// Count restore calls in the static counter shared by all subtask instances.
+			// As soon as the count equals the number of parallel subtasks, flag the
+			// restore as done and wake up any thread waiting on checkpointLock for
+			// restoredFromCheckpoint (see awaitStateRestoredFromCheckpoint). The "=="
+			// fires once per resetForTest(); extra restores leave the flag set.
+			// NOTE(review): static state assumes all subtasks run in this JVM, which
+			// presumably holds for the in-process mini cluster used by this test.
+			synchronized (checkpointLock) {
+				if (++numRestoreCalls == getRuntimeContext().getNumberOfParallelSubtasks()) {
+					restoredFromCheckpoint = true;
+					checkpointLock.notifyAll();
+				}
+			}
+		}
+
+		/**
+		 * Records one checkpoint-complete notification for this subtask and wakes up
+		 * every thread waiting on {@code checkpointLock} for more completions.
+		 */
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			synchronized (checkpointLock) {
+				numCompleteCalls += 1;
+				checkpointLock.notifyAll();
+			}
+		}
+
+		// --------------------------------------------------------------------
+
+		/** Clears the shared restore/completion bookkeeping so the next test phase starts from zero. */
+		static void resetForTest() {
+			synchronized (checkpointLock) {
+				restoredFromCheckpoint = false;
+				numRestoreCalls = 0;
+				numCompleteCalls = 0;
+			}
+		}
+
+		/**
+		 * Blocks until {@code parallelism * expectedNumberOfCompletedCheckpoints}
+		 * checkpoint-complete notifications have been counted since the last
+		 * {@link #resetForTest()}.
+		 *
+		 * @param parallelism number of parallel subtasks; each one is notified once per checkpoint
+		 * @param expectedNumberOfCompletedCheckpoints number of completed checkpoints to wait for
+		 * @param timeoutMillis maximum time to wait, in milliseconds
+		 * @throws InterruptedException if interrupted while waiting
+		 * @throws TimeoutException if too few notifications arrived before the timeout
+		 */
+		static void awaitCompletedCheckpoints(
+			int parallelism,
+			int expectedNumberOfCompletedCheckpoints,
+			long timeoutMillis) throws InterruptedException, TimeoutException {
+
+			// Monotonic deadline. toNanos() saturates instead of overflowing, and the
+			// remaining time is computed by subtraction, which stays correct even if
+			// the addition wraps around.
+			long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
+
+			synchronized (checkpointLock) {
+				// One completion notification per parallel subtask
+				int expectedNumber = parallelism * expectedNumberOfCompletedCheckpoints;
+				while (numCompleteCalls < expectedNumber) {
+					long remainingNanos = deadline - System.nanoTime();
+					if (remainingNanos <= 0) {
+						break;
+					}
+					// Bounded wait: an untimed wait() blocks forever if no further
+					// notification arrives, so the deadline would never be enforced.
+					TimeUnit.NANOSECONDS.timedWait(checkpointLock, remainingNanos);
+				}
+
+				if (numCompleteCalls < expectedNumber) {
+					throw new TimeoutException("Did not complete " + expectedNumberOfCompletedCheckpoints +
+						" within timeout of " + timeoutMillis + " millis.");
+				}
+			}
+		}
+
+		/**
+		 * Blocks until every parallel subtask has called {@link #restoreState(byte[])}
+		 * since the last {@link #resetForTest()}.
+		 *
+		 * @param timeoutMillis maximum time to wait, in milliseconds
+		 * @throws InterruptedException if interrupted while waiting
+		 * @throws TimeoutException if the state was not restored before the timeout
+		 */
+		static void awaitStateRestoredFromCheckpoint(long timeoutMillis) throws InterruptedException, TimeoutException {
+			// Deadline and loop check must both use System.nanoTime(): nanoTime() values
+			// are unrelated to wall-clock System.currentTimeMillis() and must not be mixed.
+			long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
+
+			synchronized (checkpointLock) {
+				while (!restoredFromCheckpoint) {
+					long remainingNanos = deadline - System.nanoTime();
+					if (remainingNanos <= 0) {
+						break;
+					}
+					// Bounded wait so the deadline is enforced even without a notification.
+					TimeUnit.NANOSECONDS.timedWait(checkpointLock, remainingNanos);
+				}
+
+				if (!restoredFromCheckpoint) {
+					throw new TimeoutException("Did not restore from checkpoint within timeout of " + timeoutMillis
+						+ " millis.");
+				}
+			}
 		}
 	}
 


Mime
View raw message