flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-3534] [runtime] Prevent canceling Execution from failing
Date Mon, 29 Feb 2016 19:06:58 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.0 702627116 -> 70ce072a4


[FLINK-3534] [runtime] Prevent canceling Execution from failing

This closes #1735.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/70ce072a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/70ce072a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/70ce072a

Branch: refs/heads/release-1.0
Commit: 70ce072a484b0e4372f80f47440fdca702bb5042
Parents: 7026271
Author: Ufuk Celebi <uce@apache.org>
Authored: Mon Feb 29 15:37:45 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Feb 29 20:06:50 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |   5 +
 .../runtime/executiongraph/ExecutionGraph.java  |   2 +-
 .../ExecutionGraphRestartTest.java              | 105 ++++++++++++++++++-
 .../ExecutionVertexCancelTest.java              |  11 +-
 4 files changed, 114 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/70ce072a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index bc75664..6d5832b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -797,6 +797,11 @@ public class Execution implements Serializable {
 				return false;
 			}
 
+			if (current == CANCELING) {
+				cancelingComplete();
+				return false;
+			}
+
 			if (transitionState(current, FAILED, t)) {
 				// success (in a manner of speaking)
 				this.failureCause = t;

http://git-wip-us.apache.org/repos/asf/flink/blob/70ce072a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0d6de98..ed50bea 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -809,7 +809,7 @@ public class ExecutionGraph implements Serializable {
 	public void fail(Throwable t) {
 		while (true) {
 			JobStatus current = state;
-			if (current == JobStatus.FAILED || current == JobStatus.FAILING) {
+			if (current == JobStatus.FAILING || current.isTerminalState()) {
 				return;
 			}
 			else if (transitionState(current, JobStatus.FAILING, t)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/70ce072a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 925b574..b1f11fb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -310,7 +311,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		while (deadline.hasTimeLeft() && !success) {
 			success = true;
 			for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
-				if (vertex.getExecutionState() != ExecutionState.FAILED) {
+				ExecutionState state = vertex.getExecutionState();
+				if (state != ExecutionState.FAILED && state != ExecutionState.CANCELED) {
 					success = false;
 					Thread.sleep(100);
 					break;
@@ -490,6 +492,107 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		assertEquals(JobStatus.FINISHED, eg.getState());
 	}
 
+	/**
+	 * Tests that a graph is not restarted after cancellation via a call to
+	 * {@link ExecutionGraph#fail(Throwable)}. This can happen when a slot is
+	 * released concurrently with cancellation.
+	 */
+	@Test
+	public void testFailExecutionAfterCancel() throws Exception {
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+				new SimpleActorGateway(TestingUtils.directExecutionContext()),
+				2);
+
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+		scheduler.newInstanceAvailable(instance);
+
+		JobVertex vertex = new JobVertex("Test Vertex");
+		vertex.setInvokableClass(Tasks.NoOpInvokable.class);
+		vertex.setParallelism(1);
+
+		JobGraph jobGraph = new JobGraph("Test Job", vertex);
+		jobGraph.setRestartStrategyConfiguration(RestartStrategies.fixedDelayRestart(
+				Integer.MAX_VALUE, Integer.MAX_VALUE));
+
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				new JobID(),
+				"test job",
+				new Configuration(),
+				AkkaUtils.getDefaultTimeout(),
+				new FixedDelayRestartStrategy(1, 1000000));
+
+		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		assertEquals(JobStatus.CREATED, eg.getState());
+
+		eg.scheduleForExecution(scheduler);
+		assertEquals(JobStatus.RUNNING, eg.getState());
+
+		// Fail right after cancel (for example with concurrent slot release)
+		eg.cancel();
+
+		for (ExecutionVertex v : eg.getAllExecutionVertices()) {
+			v.getCurrentExecutionAttempt().fail(new Exception("Test Exception"));
+		}
+
+		assertEquals(JobStatus.CANCELED, eg.getState());
+
+		Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
+
+		execution.cancelingComplete();
+		assertEquals(JobStatus.CANCELED, eg.getState());
+	}
+
+	/**
+	 * Tests that it is possible to fail a graph via a call to
+	 * {@link ExecutionGraph#fail(Throwable)} after cancellation.
+	 */
+	@Test
+	public void testFailExecutionGraphAfterCancel() throws Exception {
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+				new SimpleActorGateway(TestingUtils.directExecutionContext()),
+				2);
+
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+		scheduler.newInstanceAvailable(instance);
+
+		JobVertex vertex = new JobVertex("Test Vertex");
+		vertex.setInvokableClass(Tasks.NoOpInvokable.class);
+		vertex.setParallelism(1);
+
+		JobGraph jobGraph = new JobGraph("Test Job", vertex);
+		jobGraph.setRestartStrategyConfiguration(RestartStrategies.fixedDelayRestart(
+				Integer.MAX_VALUE, Integer.MAX_VALUE));
+
+		ExecutionGraph eg = new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				new JobID(),
+				"test job",
+				new Configuration(),
+				AkkaUtils.getDefaultTimeout(),
+				new FixedDelayRestartStrategy(1, 1000000));
+
+		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		assertEquals(JobStatus.CREATED, eg.getState());
+
+		eg.scheduleForExecution(scheduler);
+		assertEquals(JobStatus.RUNNING, eg.getState());
+
+		// Fail right after cancel (for example with concurrent slot release)
+		eg.cancel();
+		assertEquals(JobStatus.CANCELLING, eg.getState());
+
+		eg.fail(new Exception("Test Exception"));
+		assertEquals(JobStatus.FAILING, eg.getState());
+
+		Execution execution = eg.getAllExecutionVertices().iterator().next().getCurrentExecutionAttempt();
+
+		execution.cancelingComplete();
+		assertEquals(JobStatus.RESTARTING, eg.getState());
+	}
+
 	private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean
haltAfterRestart) throws InterruptedException {
 
 		eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));

http://git-wip-us.apache.org/repos/asf/flink/blob/70ce072a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 2daa62e..5c87315 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -399,15 +399,13 @@ public class ExecutionVertexCancelTest {
 
 			vertex.cancel();
 
-			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
+			// Callback fails, leading to CANCELED
+			assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 
 			assertTrue(slot.isReleased());
 
-			assertNotNull(vertex.getFailureCause());
-
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CREATED) > 0);
 			assertTrue(vertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
-			assertTrue(vertex.getStateTimestamp(ExecutionState.FAILED) > 0);
 		} catch (Exception e) {
 			e.printStackTrace();
 			fail(e.getMessage());
@@ -551,8 +549,7 @@ public class ExecutionVertexCancelTest {
 				Exception failureCause = new Exception("test exception");
 
 				vertex.fail(failureCause);
-				assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
-				assertEquals(failureCause, vertex.getFailureCause());
+				assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 
 				assertTrue(slot.isReleased());
 			}
@@ -592,4 +589,4 @@ public class ExecutionVertexCancelTest {
 			return result;
 		}
 	}
-}
\ No newline at end of file
+}


Mime
View raw message