flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-9788) ExecutionGraph Inconsistency prevents Job from recovering
Date Thu, 11 Oct 2018 15:00:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-9788?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16646551#comment-16646551 ]

ASF GitHub Bot commented on FLINK-9788:
---------------------------------------

tillrohrmann closed pull request #6810: [FLINK-9788] Fix ExecutionGraph inconsistency for
global failures when restarting
URL: https://github.com/apache/flink/pull/6810
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index acb1e16fe71..0be1ff27420 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1151,18 +1151,7 @@ public void failGlobal(Throwable t) {
 				current == JobStatus.SUSPENDED ||
 				current.isGloballyTerminalState()) {
 				return;
-			}
-			else if (current == JobStatus.RESTARTING) {
-				// we handle 'failGlobal()' while in 'RESTARTING' as a safety net in case something
-				// has gone wrong in 'RESTARTING' and we need to re-attempt the restarts
-				initFailureCause(t);
-
-				final long globalVersionForRestart = incrementGlobalModVersion();
-				if (tryRestartOrFail(globalVersionForRestart)) {
-					return;
-				}
-			}
-			else if (transitionState(current, JobStatus.FAILING, t)) {
+			} else if (transitionState(current, JobStatus.FAILING, t)) {
 				initFailureCause(t);
 
 				// make sure no concurrent local or global actions interfere with the failover
@@ -1240,7 +1229,7 @@ public void restart(long expectedGlobalVersion) {
 						colGroups.add(cgroup);
 					}
 
-					jv.resetForNewExecution(resetTimestamp, globalModVersion);
+					jv.resetForNewExecution(resetTimestamp, expectedGlobalVersion);
 				}
 
 				for (int i = 0; i < stateTimestamps.length; i++) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 9b98de78143..91510d1af54 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -25,6 +25,7 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -57,12 +58,15 @@
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.io.IOException;
 import java.net.InetAddress;
 import java.util.Iterator;
@@ -86,15 +90,20 @@
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.finishAllVertices;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.switchToRunning;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.waitUntilJobStatus;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-import static org.mockito.Mockito.spy;
 
+/**
+ * Tests the restart behaviour of the {@link ExecutionGraph}.
+ */
 public class ExecutionGraphRestartTest extends TestLogger {
 
-	private final static int NUM_TASKS = 31;
+	private static final int NUM_TASKS = 31;
 
 	private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
 
@@ -113,9 +122,7 @@ public void testNoManualRestart() throws Exception {
 
 		eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
 
-		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
-			vertex.getCurrentExecutionAttempt().cancelingComplete();
-		}
+		completeCanceling(eg);
 
 		assertEquals(JobStatus.FAILED, eg.getState());
 
@@ -125,9 +132,19 @@ public void testNoManualRestart() throws Exception {
 		assertEquals(JobStatus.FAILED, eg.getState());
 	}
 
+	private void completeCanceling(ExecutionGraph eg) {
+		executeOperationForAllExecutions(eg, Execution::cancelingComplete);
+	}
+
+	private void executeOperationForAllExecutions(ExecutionGraph eg, Consumer<Execution>
operation) {
+		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+			operation.accept(vertex.getCurrentExecutionAttempt());
+		}
+	}
+
 	@Test
 	public void testRestartAutomatically() throws Exception {
-		RestartStrategy restartStrategy = new FixedDelayRestartStrategy(1, 1000);
+		RestartStrategy restartStrategy = new FixedDelayRestartStrategy(1, 0L);
 		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
 		ExecutionGraph eg = executionGraphInstanceTuple.f0;
 
@@ -207,23 +224,21 @@ public void testFailWhileRestarting() throws Exception {
 		// Kill the instance and wait for the job to restart
 		instance.markDead();
 
-		Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
-
-		while (deadline.hasTimeLeft() &&
-			executionGraph.getState() != JobStatus.RESTARTING) {
-
-			Thread.sleep(100);
-		}
+		waitUntilJobStatus(executionGraph, JobStatus.RESTARTING, TestingUtils.TESTING_DURATION().toMillis());
 
 		assertEquals(JobStatus.RESTARTING, executionGraph.getState());
 
-		// The restarting should not fail with an ordinary exception
-		executionGraph.failGlobal(new Exception("Test exception"));
+		// If we fail when being in RESTARTING, then we should try to restart again
+		final long globalModVersion = executionGraph.getGlobalModVersion();
+		final Exception testException = new Exception("Test exception");
+		executionGraph.failGlobal(testException);
 
+		assertNotEquals(globalModVersion, executionGraph.getGlobalModVersion());
 		assertEquals(JobStatus.RESTARTING, executionGraph.getState());
+		assertEquals(testException, executionGraph.getFailureCause()); // we should have updated
the failure cause
 
 		// but it should fail when sending a SuppressRestartsException
-		executionGraph.failGlobal(new SuppressRestartsException(new Exception("Test exception")));
+		executionGraph.failGlobal(new SuppressRestartsException(new Exception("Suppress restart
exception")));
 
 		assertEquals(JobStatus.FAILED, executionGraph.getState());
 
@@ -254,9 +269,7 @@ public void testCancelWhileFailing() throws Exception {
 		assertEquals(JobStatus.CANCELLING, graph.getState());
 
 		// let all tasks finish cancelling
-		for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices())
{
-			vertex.getCurrentExecutionAttempt().cancelingComplete();
-		}
+		completeCanceling(graph);
 
 		assertEquals(JobStatus.CANCELED, graph.getState());
 	}
@@ -267,11 +280,7 @@ public void testFailWhileCanceling() throws Exception {
 		final ExecutionGraph graph = createExecutionGraph(restartStrategy).f0;
 
 		assertEquals(JobStatus.RUNNING, graph.getState());
-
-		// switch all tasks to running
-		for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices())
{
-			vertex.getCurrentExecutionAttempt().switchToRunning();
-		}
+		switchAllTasksToRunning(graph);
 
 		graph.cancel();
 
@@ -282,13 +291,15 @@ public void testFailWhileCanceling() throws Exception {
 		assertEquals(JobStatus.FAILING, graph.getState());
 
 		// let all tasks finish cancelling
-		for (ExecutionVertex vertex : graph.getVerticesTopologically().iterator().next().getTaskVertices())
{
-			vertex.getCurrentExecutionAttempt().cancelingComplete();
-		}
+		completeCanceling(graph);
 
 		assertEquals(JobStatus.FAILED, graph.getState());
 	}
 
+	private void switchAllTasksToRunning(ExecutionGraph graph) {
+		executeOperationForAllExecutions(graph, Execution::switchToRunning);
+	}
+
 	@Test
 	public void testNoRestartOnSuppressException() throws Exception {
 		final ExecutionGraph eg = createExecutionGraph(new FixedDelayRestartStrategy(Integer.MAX_VALUE,
0)).f0;
@@ -299,9 +310,7 @@ public void testNoRestartOnSuppressException() throws Exception {
 
 		assertEquals(JobStatus.FAILING, eg.getState());
 
-		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
-			vertex.getCurrentExecutionAttempt().cancelingComplete();
-		}
+		completeCanceling(eg);
 
 		eg.waitUntilTerminal();
 		assertEquals(JobStatus.FAILED, eg.getState());
@@ -330,7 +339,7 @@ public void testFailingExecutionAfterRestart() throws Exception {
 		JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task1", 1, NoOpInvokable.class);
 		JobVertex receiver = ExecutionGraphTestUtils.createJobVertex("Task2", 1, NoOpInvokable.class);
 		JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver);
-		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000), scheduler);
+		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 0L), scheduler);
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
 		assertEquals(JobStatus.CREATED, eg.getState());
@@ -766,6 +775,68 @@ public void testRestartWithSlotSharingAndNotEnoughResources() throws
Exception {
 		}
 	}
 
+	/**
+	 * Tests that the {@link ExecutionGraph} can handle concurrent failures while
+	 * being in the RESTARTING state.
+	 */
+	@Test
+	public void testConcurrentFailureWhileRestarting() throws Exception {
+		final long timeout = 5000L;
+
+		final CountDownLatch countDownLatch = new CountDownLatch(2);
+		final CountDownLatchRestartStrategy restartStrategy = new CountDownLatchRestartStrategy(countDownLatch);
+		final ExecutionGraph executionGraph = createSimpleExecutionGraph(restartStrategy, new TestingSlotProvider(ignored
-> new CompletableFuture<>()));
+
+		executionGraph.setQueuedSchedulingAllowed(true);
+		executionGraph.scheduleForExecution();
+
+		assertThat(executionGraph.getState(), is(JobStatus.RUNNING));
+
+		executionGraph.failGlobal(new FlinkException("Test exception"));
+
+		executor.execute(() -> {
+			countDownLatch.countDown();
+			try {
+				countDownLatch.await();
+			} catch (InterruptedException e) {
+				ExceptionUtils.rethrow(e);
+			}
+
+			executionGraph.failGlobal(new FlinkException("Concurrent exception"));
+		});
+
+		waitUntilJobStatus(executionGraph, JobStatus.RUNNING, timeout);
+	}
+
+	private static final class CountDownLatchRestartStrategy implements RestartStrategy {
+
+		private final CountDownLatch countDownLatch;
+
+		private CountDownLatchRestartStrategy(CountDownLatch countDownLatch) {
+			this.countDownLatch = countDownLatch;
+		}
+
+		@Override
+		public boolean canRestart() {
+			return true;
+		}
+
+		@Override
+		public void restart(RestartCallback restarter, ScheduledExecutor executor) {
+			executor.execute(() -> {
+				countDownLatch.countDown();
+
+				try {
+					countDownLatch.await();
+				} catch (InterruptedException e) {
+					ExceptionUtils.rethrow(e);
+				}
+
+				restarter.triggerFullRecovery();
+			});
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
@@ -846,14 +917,6 @@ public void run() {
 	}
 
 	private static Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy
restartStrategy) throws Exception {
-		return createExecutionGraph(restartStrategy, false);
-	}
-
-	private static Tuple2<ExecutionGraph, Instance> createSpyExecutionGraph(RestartStrategy
restartStrategy) throws Exception {
-		return createExecutionGraph(restartStrategy, true);
-	}
-
-	private static Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy
restartStrategy, boolean isSpy) throws Exception {
 		Instance instance = ExecutionGraphTestUtils.getInstance(
 			new ActorTaskManagerGateway(
 				new SimpleActorGateway(TestingUtils.directExecutionContext())),
@@ -862,15 +925,7 @@ public void run() {
 		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 		scheduler.newInstanceAvailable(instance);
 
-		JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task", NUM_TASKS, NoOpInvokable.class);
-
-		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
-
-		ExecutionGraph eg = newExecutionGraph(restartStrategy, scheduler);
-		if (isSpy) {
-			eg = spy(eg);
-		}
-		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+		ExecutionGraph eg = createSimpleExecutionGraph(restartStrategy, scheduler);
 
 		assertEquals(JobStatus.CREATED, eg.getState());
 
@@ -879,7 +934,23 @@ public void run() {
 		return new Tuple2<>(eg, instance);
 	}
 
-	private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy, Scheduler
scheduler) throws IOException {
+	private static ExecutionGraph createSimpleExecutionGraph(RestartStrategy restartStrategy,
SlotProvider slotProvider) throws IOException, JobException {
+		JobGraph jobGraph = createJobGraph(NUM_TASKS);
+
+		ExecutionGraph eg = newExecutionGraph(restartStrategy, slotProvider);
+		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		return eg;
+	}
+
+	@Nonnull
+	private static JobGraph createJobGraph(int parallelism) {
+		JobVertex sender = ExecutionGraphTestUtils.createJobVertex("Task", parallelism, NoOpInvokable.class);
+
+		return new JobGraph("Pointwise job", sender);
+	}
+
+	private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy, SlotProvider
slotProvider) throws IOException {
 		return new ExecutionGraph(
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
@@ -889,7 +960,7 @@ private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy,
 			new SerializedValue<>(new ExecutionConfig()),
 			AkkaUtils.getDefaultTimeout(),
 			restartStrategy,
-			scheduler);
+			slotProvider);
 	}
 
 	private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean
haltAfterRestart) throws InterruptedException, TimeoutException {
@@ -922,8 +993,10 @@ private static void waitForAllResourcesToBeAssignedAfterAsyncRestart(ExecutionGr
 
 	private static void waitForAsyncRestart(ExecutionGraph eg, FiniteDuration timeout) throws
InterruptedException {
 		Deadline deadline = timeout.fromNow();
+		long waitingTime = 10L;
 		while (deadline.hasTimeLeft() && eg.getState() != JobStatus.RUNNING) {
-			Thread.sleep(100);
+			Thread.sleep(waitingTime);
+			waitingTime = Math.min(waitingTime << 1, 100L);
 		}
 	}
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> ExecutionGraph Inconsistency prevents Job from recovering
> ---------------------------------------------------------
>
>                 Key: FLINK-9788
>                 URL: https://issues.apache.org/jira/browse/FLINK-9788
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.6.0
>         Environment: Rev: 4a06160
> Hadoop 2.8.3
>            Reporter: Gary Yao
>            Assignee: Till Rohrmann
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.7.0, 1.6.2, 1.5.5
>
>         Attachments: jobmanager_5000.log
>
>
> Deployment mode: YARN job mode with HA
> After killing many TaskManagers in succession, the ExecutionGraph ran into
an inconsistent state, which prevented job recovery. The following stack trace was logged in
the JobManager log several hundred times per second:
> {noformat}
> 2018-07-08 16:47:18,855 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       
- Job General purpose test job (37a794195840700b98feb23e99f7ea24) switched from state RESTARTING
to RESTARTING.
> 2018-07-08 16:47:18,856 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph
       - Restarting the job General purpose test job (37a794195840700b98feb23e99f7ea24).
> 2018-07-08 16:47:18,857 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph
       - Resetting execution vertex Source: Custom Source -> Timestamps/Watermarks (1/10)
for new execution.
> 2018-07-08 16:47:18,857 WARN  org.apache.flink.runtime.executiongraph.ExecutionGraph
       - Failed to restart the job.
> java.lang.IllegalStateException: Cannot reset a vertex that is in non-terminal state
CREATED
>         at org.apache.flink.runtime.executiongraph.ExecutionVertex.resetForNewExecution(ExecutionVertex.java:610)
>         at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.resetForNewExecution(ExecutionJobVertex.java:573)
>         at org.apache.flink.runtime.executiongraph.ExecutionGraph.restart(ExecutionGraph.java:1251)
>         at org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback.triggerFullRecovery(ExecutionGraphRestartCallback.java:59)
>         at org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy$1.run(FixedDelayRestartStrategy.java:68)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
> {noformat}
> The resulting jobmanager log file was 4.7 GB in size. Find attached the first 5000 lines
of the log file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message