flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [01/11] flink git commit: [FLINK-3050] [runtime] Add UnrecoverableException to suppress job restarts
Date Mon, 11 Jan 2016 15:31:22 GMT
Repository: flink
Updated Branches:
  refs/heads/master 009146c7e -> 77348858f


[FLINK-3050] [runtime] Add UnrecoverableException to suppress job restarts

This closes #1461.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ebbc85da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ebbc85da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ebbc85da

Branch: refs/heads/master
Commit: ebbc85da7b78c1acdac4d184bddd6dfecf1338b2
Parents: 009146c
Author: Ufuk Celebi <uce@apache.org>
Authored: Wed Dec 16 14:09:22 2015 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Jan 11 16:30:25 2016 +0100

----------------------------------------------------------------------
 .../execution/UnrecoverableException.java       | 37 ++++++++++++
 .../runtime/executiongraph/ExecutionGraph.java  | 11 +++-
 .../ExecutionGraphRestartTest.java              | 63 +++++++++++++++++++-
 3 files changed, 108 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ebbc85da/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java
new file mode 100644
index 0000000..5a6cd7e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution;
+
+/**
+ * Exception thrown on unrecoverable failures.
+ *
+ * <p>This exception acts as a wrapper around the real cause and suppresses
+ * job restarts. The JobManager will <strong>not</strong> restart a job, which
+ * fails with this Exception.
+ */
+public class UnrecoverableException extends RuntimeException {
+
+	private static final long serialVersionUID = 221873676920848349L;
+
+	public UnrecoverableException(Throwable cause) {
+		super("Unrecoverable failure. This suppresses job restarts. Please check the " +
+				"stack trace for the root cause.", cause);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebbc85da/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 663f588..9767968 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.UnrecoverableException;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -937,7 +938,11 @@ public class ExecutionGraph implements Serializable {
 						}
 					}
 					else if (current == JobStatus.FAILING) {
-						if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) {
+						boolean isRecoverable = !(failureCause instanceof UnrecoverableException);
+
+						if (isRecoverable && numberOfRetriesLeft > 0 &&
+								transitionState(current, JobStatus.RESTARTING)) {
+
 							numberOfRetriesLeft--;
 							
 							if (delayBeforeRetrying > 0) {
@@ -966,7 +971,9 @@ public class ExecutionGraph implements Serializable {
 							}
 							break;
 						}
-						else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
+						else if ((!isRecoverable || numberOfRetriesLeft <= 0) &&
+								transitionState(current, JobStatus.FAILED, failureCause)) {
+
 							postRunCleanup();
 							break;
 						}

http://git-wip-us.apache.org/repos/asf/flink/blob/ebbc85da/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index a50aa2e..127ae33 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.UnrecoverableException;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -37,17 +38,20 @@ import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 public class ExecutionGraphRestartTest {
 
 	private final static int NUM_TASKS = 31;
 
 	@Test
-	public void testNotRestartManually() throws Exception {
+	public void testNoManualRestart() throws Exception {
 		Instance instance = ExecutionGraphTestUtils.getInstance(
 				new SimpleActorGateway(TestingUtils.directExecutionContext()),
 				NUM_TASKS);
@@ -83,6 +87,7 @@ public class ExecutionGraphRestartTest {
 
 		assertEquals(JobStatus.FAILED, eg.getState());
 
+		// This should not restart the graph.
 		eg.restart();
 
 		assertEquals(JobStatus.FAILED, eg.getState());
@@ -299,4 +304,60 @@ public class ExecutionGraphRestartTest {
 
 		assertEquals(JobStatus.CANCELED, executionGraph.getState());
 	}
+
+	@Test
+	public void testNoRestartOnUnrecoverableException() throws Exception {
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+				new SimpleActorGateway(TestingUtils.directExecutionContext()),
+				NUM_TASKS);
+
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+		scheduler.newInstanceAvailable(instance);
+
+		JobVertex sender = new JobVertex("Task");
+		sender.setInvokableClass(Tasks.NoOpInvokable.class);
+		sender.setParallelism(NUM_TASKS);
+
+		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
+
+		ExecutionGraph eg = spy(new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				new JobID(),
+				"Test job",
+				new Configuration(),
+				AkkaUtils.getDefaultTimeout()));
+
+		eg.setNumberOfRetriesLeft(1);
+		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		assertEquals(JobStatus.CREATED, eg.getState());
+
+		eg.scheduleForExecution(scheduler);
+
+		assertEquals(JobStatus.RUNNING, eg.getState());
+
+		// Fail with unrecoverable Exception
+		eg.getAllExecutionVertices().iterator().next().fail(
+				new UnrecoverableException(new Exception("Test Exception")));
+
+		assertEquals(JobStatus.FAILING, eg.getState());
+
+		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
+		}
+
+		FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
+
+		// Wait for async restart
+		Deadline deadline = timeout.fromNow();
+		while (deadline.hasTimeLeft() && eg.getState() != JobStatus.FAILED) {
+			Thread.sleep(100);
+		}
+
+		assertEquals(JobStatus.FAILED, eg.getState());
+
+		// No restart
+		verify(eg, never()).restart();
+		assertEquals(1, eg.getNumberOfRetriesLeft());
+	}
 }


Mime
View raw message