flink-commits mailing list archives

From trohrm...@apache.org
Subject flink git commit: [FLINK-3390] [runtime, tests] Restore savepoint path on ExecutionGraph restart
Date Fri, 26 Feb 2016 17:44:12 GMT
Repository: flink
Updated Branches:
  refs/heads/master 016644ac3 -> c2a43c95d


[FLINK-3390] [runtime, tests] Restore savepoint path on ExecutionGraph restart

Temporary workaround to restore the initial state on failure during recovery,
as required by a user. Will be superseded by FLINK-3397 with better handling
of checkpoint and savepoint restoring.

A failure during recovery previously restarted the job without its savepoint
state. This temporary workaround ensures that if the savepoint coordinator
ever restored a savepoint and no checkpoint completed after that savepoint,
the savepoint state is restored again on restart.

This closes #1720.
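
To summarize the change before the diff: a condensed sketch of the restart-time
fallback this commit introduces (simplified from the ExecutionGraph.java hunk
below, not the committed code verbatim):

    // Inside ExecutionGraph restart handling, simplified: try the latest
    // completed checkpoint first; restoreLatestCheckpointedState(...) now
    // reports whether anything was actually restored.
    boolean restored = checkpointCoordinator
            .restoreLatestCheckpointedState(getAllVertices(), false, false);

    // Temporary workaround (to be superseded by FLINK-3397): if no checkpoint
    // was restored but the job was originally started from a savepoint,
    // restore that savepoint again so the restarted job keeps its initial state.
    if (!restored && savepointCoordinator != null) {
        String savepointPath = savepointCoordinator.getSavepointRestorePath();
        if (savepointPath != null) {
            savepointCoordinator.restoreSavepoint(getAllVertices(), savepointPath);
        }
    }

getSavepointRestorePath() returns the path recorded when the SavepointCoordinator
first restored a savepoint, so a later completed checkpoint still takes precedence
over re-applying the savepoint.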


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c2a43c95
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c2a43c95
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c2a43c95

Branch: refs/heads/master
Commit: c2a43c95de9b7616b5249cbb190fc0203030bf40
Parents: 016644a
Author: Ufuk Celebi <uce@apache.org>
Authored: Fri Feb 26 12:46:07 2016 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Feb 26 18:43:58 2016 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       |   6 +-
 .../checkpoint/SavepointCoordinator.java        |  12 ++
 .../runtime/executiongraph/ExecutionGraph.java  |  12 +-
 .../test/checkpointing/SavepointITCase.java     | 169 ++++++++++++++++++-
 4 files changed, 187 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c2a43c95/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index b0e23d6..edeab6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -741,7 +741,7 @@ public class CheckpointCoordinator {
 	//  Checkpoint State Restoring
 	// --------------------------------------------------------------------------------------------
 
-	public void restoreLatestCheckpointedState(
+	public boolean restoreLatestCheckpointedState(
 			Map<JobVertexID, ExecutionJobVertex> tasks,
 			boolean errorIfNoCheckpoint,
 			boolean allOrNothingState) throws Exception {
@@ -761,7 +761,7 @@ public class CheckpointCoordinator {
 				if (errorIfNoCheckpoint) {
 					throw new IllegalStateException("No completed checkpoint available");
 				} else {
-					return;
+					return false;
 				}
 			}
 
@@ -799,6 +799,8 @@ public class CheckpointCoordinator {
 					exec.setInitialState(state.getState(), recoveryTimestamp);
 				}
 			}
+
+			return true;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c2a43c95/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
index 5638e78..5008932 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
@@ -72,6 +72,10 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 	/** Mapping from checkpoint ID to promises for savepoints. */
 	private final Map<Long, Promise<String>> savepointPromises;
 
+	// TODO(uce) Temporary work around to restore initial state on
+	// failure during recovery. Will be superseded by FLINK-3397.
+	private volatile String savepointRestorePath;
+
 	public SavepointCoordinator(
 			JobID jobId,
 			long baseInterval,
@@ -102,6 +106,10 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 		this.savepointPromises = new ConcurrentHashMap<>();
 	}
 
+	public String getSavepointRestorePath() {
+		return savepointRestorePath;
+	}
+
 	// ------------------------------------------------------------------------
 	// Savepoint trigger and reset
 	// ------------------------------------------------------------------------
@@ -221,6 +229,10 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 			checkpointIdCounter.start();
 			checkpointIdCounter.setCount(nextCheckpointId + 1);
 			LOG.info("Reset the checkpoint ID to {}", nextCheckpointId);
+
+			if (savepointRestorePath == null) {
+				savepointRestorePath = savepointPath;
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c2a43c95/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9a6eb85..0d6de98 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -872,7 +872,17 @@ public class ExecutionGraph implements Serializable {
 
 				// if we have checkpointed state, reload it into the executions
 				if (checkpointCoordinator != null) {
-					checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
+					boolean restored = checkpointCoordinator
+							.restoreLatestCheckpointedState(getAllVertices(), false, false);
+
+					// TODO(uce) Temporary work around to restore initial state on
+					// failure during recovery. Will be superseded by FLINK-3397.
+					if (!restored && savepointCoordinator != null) {
+						String savepointPath = savepointCoordinator.getSavepointRestorePath();
+						if (savepointPath != null) {
+							savepointCoordinator.restoreSavepoint(getAllVertices(), savepointPath);
+						}
+					}
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c2a43c95/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 49f4ee7..5386353 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -56,7 +56,7 @@ import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskStateList;
@@ -71,6 +71,7 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.File;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -736,6 +737,113 @@ public class SavepointITCase extends TestLogger {
 		}
 	}
 
+	/**
+	 * Tests that a restore failure is retried with the savepoint state.
+	 */
+	@Test
+	public void testRestoreFailure() throws Exception {
+		// Config
+		int numTaskManagers = 1;
+		int numSlotsPerTaskManager = 1;
+		int numExecutionRetries = 2;
+		int retryDelay = 500;
+		int checkpointingInterval = 100000000;
+
+		// Test deadline
+		final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow();
+
+		ForkableFlinkMiniCluster flink = null;
+
+		try {
+			// The job
+			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(1);
+			env.enableCheckpointing(checkpointingInterval);
+			env.setNumberOfExecutionRetries(numExecutionRetries);
+			env.getConfig().setExecutionRetryDelay(retryDelay);
+
+			DataStream<Integer> stream = env
+					.addSource(new RestoreStateCountingAndFailingSource());
+
+			// Source configuration
+			RestoreStateCountingAndFailingSource.failOnRestoreStateCall = false;
+			RestoreStateCountingAndFailingSource.numRestoreStateCalls = 0;
+			RestoreStateCountingAndFailingSource.checkpointCompleteLatch = new CountDownLatch(1);
+			RestoreStateCountingAndFailingSource.emitted = 0;
+
+			stream.addSink(new DiscardingSink<Integer>());
+
+			JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+			// Flink configuration
+			final Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
+			LOG.info("Flink configuration: " + config + ".");
+
+			// Start Flink
+			flink = new ForkableFlinkMiniCluster(config);
+			LOG.info("Starting Flink cluster.");
+			flink.start();
+
+			// Retrieve the job manager
+			LOG.info("Retrieving JobManager.");
+			ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
+			LOG.info("JobManager: " + jobManager + ".");
+
+			// Submit the job and wait for some checkpoints to complete
+			flink.submitJobDetached(jobGraph);
+
+			while (deadline.hasTimeLeft() && RestoreStateCountingAndFailingSource.emitted < 100) {
+				Thread.sleep(100);
+			}
+
+			assertTrue("No progress", RestoreStateCountingAndFailingSource.emitted >= 100);
+
+			// Trigger the savepoint
+			Future<Object> savepointPathFuture = jobManager.ask(
+					new TriggerSavepoint(jobGraph.getJobID()), deadline.timeLeft());
+
+			final String savepointPath = ((TriggerSavepointSuccess) Await
+					.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
+			LOG.info("Retrieved savepoint path: " + savepointPath + ".");
+
+			// Completed checkpoint
+			RestoreStateCountingAndFailingSource.checkpointCompleteLatch.await();
+
+			// Cancel the job
+			Future<?> cancelFuture = jobManager.ask(new CancelJob(
+					jobGraph.getJobID()), deadline.timeLeft());
+			Await.ready(cancelFuture, deadline.timeLeft());
+
+			// Wait for the job to be removed
+			Future<?> removedFuture = jobManager.ask(new NotifyWhenJobRemoved(
+					jobGraph.getJobID()), deadline.timeLeft());
+			Await.ready(removedFuture, deadline.timeLeft());
+
+			// Set source to fail on restore calls and try to recover from savepoint
+			RestoreStateCountingAndFailingSource.failOnRestoreStateCall = true;
+			jobGraph.setSavepointPath(savepointPath);
+
+			try {
+				flink.submitJobAndWait(jobGraph, false, deadline.timeLeft());
+				// If the savepoint state is not restored, we will wait here
+				// until the deadline times out.
+				fail("Did not throw expected Exception");
+			} catch (Exception ignored) {
+			} finally {
+				// Expecting one restore for the initial submission from
+				// savepoint and one for the execution retries
+				assertEquals(1 + numExecutionRetries, RestoreStateCountingAndFailingSource.numRestoreStateCalls);
+			}
+		}
+		finally {
+			if (flink != null) {
+				flink.shutdown();
+			}
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	// Test program
 	// ------------------------------------------------------------------------
@@ -761,13 +869,7 @@ public class SavepointITCase extends TestLogger {
 				.shuffle()
 				.map(new StatefulCounter());
 
-		// Discard
-		stream.addSink(new SinkFunction<Integer>() {
-			private static final long serialVersionUID = -8671189807690005893L;
-			@Override
-			public void invoke(Integer value) throws Exception {
-			}
-		});
+		stream.addSink(new DiscardingSink<Integer>());
 
 		return env.getStreamGraph().getJobGraph();
 	}
@@ -779,7 +881,7 @@ public class SavepointITCase extends TestLogger {
 		private volatile boolean running = true;
 
 		// Test control
-		private static CountDownLatch CheckpointCompleteLatch = new CountDownLatch(0);
+		private static CountDownLatch CheckpointCompleteLatch = new CountDownLatch(1);
 
 		@Override
 		public void run(SourceContext<Integer> ctx) throws Exception {
@@ -837,4 +939,53 @@ public class SavepointITCase extends TestLogger {
 		}
 	}
 
+	/**
+	 * Test source that counts calls to restoreState and that can be configured
+	 * to fail on restoreState calls.
+	 */
+	private static class RestoreStateCountingAndFailingSource
+			implements SourceFunction<Integer>, Checkpointed, CheckpointListener {
+
+		private static final long serialVersionUID = 1L;
+
+		private static volatile int numRestoreStateCalls = 0;
+		private static volatile boolean failOnRestoreStateCall = false;
+		private static volatile CountDownLatch checkpointCompleteLatch = new CountDownLatch(1);
+		private static volatile int emitted = 0;
+
+		private volatile boolean running = true;
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+			while (running) {
+				ctx.collect(1);
+				emitted++;
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+
+		@Override
+		public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
+			return 1;
+		}
+
+		@Override
+		public void restoreState(Serializable state) throws Exception {
+			numRestoreStateCalls++;
+
+			if (failOnRestoreStateCall) {
+				throw new RuntimeException("Restore test failure");
+			}
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			checkpointCompleteLatch.countDown();
+		}
+	}
+
 }

