flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/3] flink git commit: [FLINK-7216] [distr. coordination] Guard against concurrent global failover
Date Thu, 20 Jul 2017 13:52:20 GMT
[FLINK-7216] [distr. coordination] Guard against concurrent global failover


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e6348fbd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e6348fbd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e6348fbd

Branch: refs/heads/release-1.3
Commit: e6348fbde1fc0ee8ea682063a4d6503ba3b68864
Parents: 6c0803d
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Jul 18 19:49:56 2017 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Jul 20 15:26:43 2017 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  35 ++--
 .../restart/ExecutionGraphRestartCallback.java  |  23 ++-
 .../executiongraph/restart/RestartStrategy.java |   2 +-
 .../ExecutionGraphRestartTest.java              | 161 ++++++++++++++++++-
 .../executiongraph/ExecutionGraphTestUtils.java |  35 +++-
 5 files changed, 231 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e6348fbd/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 9f29faf..a7d768b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -966,7 +966,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				if (transitionState(current, JobStatus.CANCELLING)) {
 
 					// make sure no concurrent local actions interfere with the cancellation
-					incrementGlobalModVersion();
+					final long globalVersionForRestart = incrementGlobalModVersion();
 
 					final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
 
@@ -980,7 +980,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					allTerminal.thenAccept(new AcceptFunction<Void>() {
 						@Override
 						public void accept(Void value) {
-							allVerticesInTerminalState();
+							// cancellations may currently be overridden by failures which trigger
+							// restarts, so we need to pass a proper restart global version here
+							allVerticesInTerminalState(globalVersionForRestart);
 						}
 					});
 
@@ -1085,17 +1087,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				return;
 			}
 			else if (current == JobStatus.RESTARTING) {
+				// we handle 'failGlobal()' while in 'RESTARTING' as a safety net in case something
+				// has gone wrong in 'RESTARTING' and we need to re-attempt the restarts
 				this.failureCause = t;
 
-				if (tryRestartOrFail()) {
+				final long globalVersionForRestart = incrementGlobalModVersion();
+				if (tryRestartOrFail(globalVersionForRestart)) {
 					return;
 				}
 			}
 			else if (transitionState(current, JobStatus.FAILING, t)) {
 				this.failureCause = t;
 
-				// make sure no concurrent local actions interfere with the cancellation
-				incrementGlobalModVersion();
+				// make sure no concurrent local or global actions interfere with the failover
+				final long globalVersionForRestart = incrementGlobalModVersion();
 
 				// we build a future that is complete once all vertices have reached a terminal state
 				final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
@@ -1109,7 +1114,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				allTerminal.thenAccept(new AcceptFunction<Void>() {
 					@Override
 					public void accept(Void value) {
-						allVerticesInTerminalState();
+						allVerticesInTerminalState(globalVersionForRestart);
 					}
 				});
 
@@ -1120,10 +1125,16 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	public void restart() {
+	public void restart(long expectedGlobalVersion) {
 		try {
 			synchronized (progressLock) {
-				JobStatus current = state;
+				// check the global version to see whether this recovery attempt is still valid
+				if (globalModVersion != expectedGlobalVersion) {
+					LOG.info("Concurrent full restart subsumed this restart.");
+					return;
+				}
+
+				final JobStatus current = state;
 
 				if (current == JobStatus.CANCELED) {
 					LOG.info("Canceled job during restart. Aborting restart.");
@@ -1329,7 +1340,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * This method is a callback during cancellation/failover and called when all tasks
 	 * have reached a terminal state (cancelled/failed/finished).
 	 */
-	private void allVerticesInTerminalState() {
+	private void allVerticesInTerminalState(long expectedGlobalVersionForRestart) {
 		// we are done, transition to the final state
 		JobStatus current;
 		while (true) {
@@ -1345,7 +1356,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				}
 			}
 			else if (current == JobStatus.FAILING) {
-				if (tryRestartOrFail()) {
+				if (tryRestartOrFail(expectedGlobalVersionForRestart)) {
 					break;
 				}
 				// concurrent job status change, let's check again
@@ -1374,7 +1385,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 *
 	 * @return true if the operation could be executed; false if a concurrent job status change occurred
 	 */
-	private boolean tryRestartOrFail() {
+	private boolean tryRestartOrFail(long globalModVersionForRestart) {
 		JobStatus currentState = state;
 
 		if (currentState == JobStatus.FAILING || currentState == JobStatus.RESTARTING) {
@@ -1392,7 +1403,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {
 					LOG.info("Restarting the job {} ({}).", getJobName(), getJobID());
 
-					RestartCallback restarter = new ExecutionGraphRestartCallback(this);
+					RestartCallback restarter = new ExecutionGraphRestartCallback(this, globalModVersionForRestart);
 					restartStrategy.restart(restarter, new ScheduledExecutorServiceAdapter(futureExecutor));
 
 					return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/e6348fbd/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
index 5874f91..7f98110 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
@@ -25,27 +25,38 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}. 
- * 
+ * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}.
+ *
  * <p>This callback implementation is one-shot; it can only be used once.
  */
 public class ExecutionGraphRestartCallback implements RestartCallback {
 
-	/** The ExecutionGraph to restart */
+	/** The ExecutionGraph to restart. */
 	private final ExecutionGraph execGraph;
 
-	/** Atomic flag to make sure this is used only once */
+	/** Atomic flag to make sure this is used only once. */
 	private final AtomicBoolean used;
 
-	public ExecutionGraphRestartCallback(ExecutionGraph execGraph) {
+	/** The globalModVersion that the ExecutionGraph needs to have for the restart to go through. */
+	private final long expectedGlobalModVersion;
+
+	/**
+	 * Creates a new ExecutionGraphRestartCallback.
+	 *
+	 * @param execGraph The ExecutionGraph to restart
+	 * @param expectedGlobalModVersion  The globalModVersion that the ExecutionGraph needs to have
+	 *                                  for the restart to go through
+	 */
+	public ExecutionGraphRestartCallback(ExecutionGraph execGraph, long expectedGlobalModVersion) {
 		this.execGraph = checkNotNull(execGraph);
 		this.used = new AtomicBoolean(false);
+		this.expectedGlobalModVersion = expectedGlobalModVersion;
 	}
 
 	@Override
 	public void triggerFullRecovery() {
 		if (used.compareAndSet(false, true)) {
-			execGraph.restart();
+			execGraph.restart(expectedGlobalModVersion);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e6348fbd/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
index 60e2e8b..ffa2777 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
@@ -37,7 +37,7 @@ public interface RestartStrategy {
 	 * Called by the ExecutionGraph to eventually trigger a full recovery.
 	 * The recovery must be triggered on the given callback object, and may be delayed
 	 * with the help of the given scheduled executor.
-	 * 
+	 *
 	 * <p>The thread that calls this method is not supposed to block/sleep.
 	 *
 	 * @param restarter The hook to restart the ExecutionGraph

http://git-wip-us.apache.org/repos/asf/flink/blob/e6348fbd/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 81702a2..3ce6baa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -36,10 +36,13 @@ import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -60,9 +63,16 @@ import scala.concurrent.impl.Promise;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.completeCancellingForAllVertices;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createSimpleTestGraph;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.finishAllVertices;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilDeployedAndSwitchToRunning;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -90,7 +100,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		assertEquals(JobStatus.FAILED, eg.getState());
 
 		// This should not restart the graph.
-		eg.restart();
+		eg.restart(eg.getGlobalModVersion());
 
 		assertEquals(JobStatus.FAILED, eg.getState());
 	}
@@ -187,7 +197,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		assertEquals(JobStatus.CANCELED, executionGraph.getState());
 
 		// The restart has been aborted
-		executionGraph.restart();
+		executionGraph.restart(executionGraph.getGlobalModVersion());
 
 		assertEquals(JobStatus.CANCELED, executionGraph.getState());
 	}
@@ -254,7 +264,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		assertEquals(JobStatus.FAILED, executionGraph.getState());
 
 		// The restart has been aborted
-		executionGraph.restart();
+		executionGraph.restart(executionGraph.getGlobalModVersion());
 
 		assertEquals(JobStatus.FAILED, executionGraph.getState());
 	}
@@ -555,6 +565,106 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		assertEquals(JobStatus.SUSPENDED, eg.getState());
 	}
 
+	@Test
+	public void testConcurrentLocalFailAndRestart() throws Exception {
+		final ExecutionGraph eg = createSimpleTestGraph(new FixedDelayRestartStrategy(10, 0L));
+		eg.setScheduleMode(ScheduleMode.EAGER);
+		eg.scheduleForExecution();
+
+		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+
+		final ExecutionJobVertex vertex = eg.getVerticesTopologically().iterator().next();
+		final Execution first = vertex.getTaskVertices()[0].getCurrentExecutionAttempt();
+		final Execution last = vertex.getTaskVertices()[vertex.getParallelism() - 1].getCurrentExecutionAttempt();
+
+		final OneShotLatch failTrigger = new OneShotLatch();
+		final CountDownLatch readyLatch = new CountDownLatch(2);
+
+		Thread failure1 = new Thread() {
+			@Override
+			public void run() {
+				readyLatch.countDown();
+				try {
+					failTrigger.await();
+				} catch (InterruptedException ignored) {}
+
+				first.fail(new Exception("intended test failure 1"));
+			}
+		};
+
+		Thread failure2 = new Thread() {
+			@Override
+			public void run() {
+				readyLatch.countDown();
+				try {
+					failTrigger.await();
+				} catch (InterruptedException ignored) {}
+
+				last.fail(new Exception("intended test failure 2"));
+			}
+		};
+
+		// make sure both threads start simultaneously
+		failure1.start();
+		failure2.start();
+		readyLatch.await();
+		failTrigger.trigger();
+
+		waitUntilJobStatus(eg, JobStatus.FAILING, 1000);
+		completeCancellingForAllVertices(eg);
+
+		waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
+		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+		finishAllVertices(eg);
+
+		eg.waitUntilTerminal();
+		assertEquals(JobStatus.FINISHED, eg.getState());
+	}
+
+	@Test
+	public void testConcurrentGlobalFailAndRestarts() throws Exception {
+		final OneShotLatch restartTrigger = new OneShotLatch();
+
+		final int parallelism = 10;
+		final JobID jid = new JobID();
+		final JobVertex vertex = createNoOpVertex(parallelism);
+		final SlotProvider slots = new SimpleSlotProvider(jid, parallelism, new NotCancelAckingTaskGateway());
+		final TriggeredRestartStrategy restartStrategy = new TriggeredRestartStrategy(restartTrigger);
+
+		final ExecutionGraph eg = createSimpleTestGraph(jid, slots, restartStrategy, vertex);
+		eg.setScheduleMode(ScheduleMode.EAGER);
+		eg.scheduleForExecution();
+
+		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+
+		// fail into 'RESTARTING'
+		eg.failGlobal(new Exception("intended test failure 1"));
+		assertEquals(JobStatus.FAILING, eg.getState());
+		completeCancellingForAllVertices(eg);
+		waitUntilJobStatus(eg, JobStatus.RESTARTING, 1000);
+
+		eg.failGlobal(new Exception("intended test failure 2"));
+		assertEquals(JobStatus.RESTARTING, eg.getState());
+
+		// trigger both restart strategies to kick in concurrently
+		restartTrigger.trigger();
+
+		waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);
+		waitUntilDeployedAndSwitchToRunning(eg, 1000);
+		finishAllVertices(eg);
+
+		eg.waitUntilTerminal();
+		assertEquals(JobStatus.FINISHED, eg.getState());
+
+		if (eg.getNumberOfFullRestarts() > 2) {
+			fail("Too many restarts: " + eg.getNumberOfFullRestarts());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
 	private static class ControllableRestartStrategy implements RestartStrategy {
 
 		private Promise<Boolean> reachedCanRestart = new Promise.DefaultPromise<>();
@@ -727,4 +837,49 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		assertEquals(JobStatus.FINISHED, eg.getState());
 	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A TaskManager gateway that does not ack cancellations.
+	 */
+	private static final class NotCancelAckingTaskGateway extends SimpleAckingTaskManagerGateway {
+
+		@Override
+		public org.apache.flink.runtime.concurrent.Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
+			return new FlinkCompletableFuture<>();
+		}
+	}
+
+	/**
+	 * A RestartStrategy that blocks restarting on a given {@link OneShotLatch}.
+	 */
+	private static final class TriggeredRestartStrategy implements RestartStrategy {
+
+		private final OneShotLatch latch;
+
+		TriggeredRestartStrategy(OneShotLatch latch) {
+			this.latch = latch;
+		}
+
+		@Override
+		public boolean canRestart() {
+			return true;
+		}
+
+		@Override
+		public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
+			executor.execute(new Runnable() {
+				@Override
+				public void run() {
+					try {
+						latch.await();
+					} catch (InterruptedException e) {
+						Thread.currentThread().interrupt();
+					}
+					restarter.triggerFullRecovery();
+				}
+			});
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e6348fbd/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 51b5c7f..83c8fb0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -170,8 +170,8 @@ public class ExecutionGraphTestUtils {
 	}
 
 	/**
-	 * Takes all vertices in the given ExecutionGraph and switches their current
-	 * execution to RUNNING.
+	 * Takes all vertices in the given ExecutionGraph and attempts to move them
+	 * from CANCELING to CANCELED.
 	 */
 	public static void completeCancellingForAllVertices(ExecutionGraph eg) {
 		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
@@ -181,7 +181,7 @@ public class ExecutionGraphTestUtils {
 
 	/**
 	 * Takes all vertices in the given ExecutionGraph and switches their current
-	 * execution to RUNNING.
+	 * execution to FINISHED.
 	 */
 	public static void finishAllVertices(ExecutionGraph eg) {
 		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
@@ -189,6 +189,35 @@ public class ExecutionGraphTestUtils {
 		}
 	}
 
+	/**
+	 * Turns a newly scheduled execution graph into a state where all vertices run.
+	 * This waits until all executions have reached state 'DEPLOYING' and then switches them to running.
+	 */
+	public static void waitUntilDeployedAndSwitchToRunning(ExecutionGraph eg, long timeout) throws TimeoutException {
+		// wait until everything is running
+		for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+			final Execution exec = ev.getCurrentExecutionAttempt();
+			waitUntilExecutionState(exec, ExecutionState.DEPLOYING, timeout);
+		}
+
+		// Note: As ugly as it is, we need this minor sleep, because between switching
+		// to 'DEPLOYING' and the point where 'switchToRunning()' may be called lies a race check
+		// against concurrent modifications (cancel / fail). We can only switch this to running
+		// once that check is passed. For the actual runtime, this switch is triggered by a callback
+		// from the TaskManager, which comes strictly after that. For tests, we use mock TaskManagers
+		// which cannot easily tell us when that condition has happened, unfortunately.
+		try {
+			Thread.sleep(2);
+		} catch (InterruptedException e) {
+			Thread.currentThread().interrupt();
+		}
+
+		for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
+			final Execution exec = ev.getCurrentExecutionAttempt();
+			exec.switchToRunning();
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  state modifications
 	// ------------------------------------------------------------------------


Mime
View raw message