flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [1/2] flink git commit: [hotfix] Rename UnrecoverableException to SuppressRestartsException
Date Wed, 09 Mar 2016 11:15:06 GMT
Repository: flink
Updated Branches:
  refs/heads/master ab17b7224 -> cdc8f0a9b


[hotfix] Rename UnrecoverableException to SuppressRestartsException


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec6ee043
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec6ee043
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec6ee043

Branch: refs/heads/master
Commit: ec6ee0430cb1bbf2f5ee954d39ee78d22a182ac2
Parents: ab17b72
Author: Ufuk Celebi <uce@apache.org>
Authored: Fri Feb 12 22:09:29 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Mar 9 12:12:55 2016 +0100

----------------------------------------------------------------------
 .../execution/SuppressRestartsException.java    | 37 ++++++++++++++++++++
 .../execution/UnrecoverableException.java       | 37 --------------------
 .../runtime/executiongraph/ExecutionGraph.java  |  8 ++---
 .../flink/runtime/jobmanager/JobManager.scala   |  6 ++--
 .../ExecutionGraphRestartTest.java              |  6 ++--
 .../test/checkpointing/SavepointITCase.java     |  4 +--
 6 files changed, 48 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ec6ee043/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java
new file mode 100644
index 0000000..61a9064
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/SuppressRestartsException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.execution;
+
+/**
+ * Exception thrown in order to suppress job restarts.
+ *
+ * <p>This exception acts as a wrapper around the real cause and suppresses
+ * job restarts. The JobManager will <strong>not</strong> restart a job, which
+ * fails with this Exception.
+ */
+public class SuppressRestartsException extends RuntimeException {
+
+	private static final long serialVersionUID = 221873676920848349L;
+
+	public SuppressRestartsException(Throwable cause) {
+		super("Unrecoverable failure. This suppresses job restarts. Please check the " +
+				"stack trace for the root cause.", cause);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec6ee043/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java
deleted file mode 100644
index 5a6cd7e..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/UnrecoverableException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.execution;
-
-/**
- * Exception thrown on unrecoverable failures.
- *
- * <p>This exception acts as a wrapper around the real cause and suppresses
- * job restarts. The JobManager will <strong>not</strong> restart a job, which
- * fails with this Exception.
- */
-public class UnrecoverableException extends RuntimeException {
-
-	private static final long serialVersionUID = 221873676920848349L;
-
-	public UnrecoverableException(Throwable cause) {
-		super("Unrecoverable failure. This suppresses job restarts. Please check the " +
-				"stack trace for the root cause.", cause);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec6ee043/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index ed50bea..c17cf15 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.UnrecoverableException;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -1030,14 +1030,14 @@ public class ExecutionGraph implements Serializable {
 						}
 					}
 					else if (current == JobStatus.FAILING) {
-						boolean isRecoverable = !(failureCause instanceof UnrecoverableException);
+						boolean allowRestart = !(failureCause instanceof SuppressRestartsException);
 
-						if (isRecoverable && restartStrategy.canRestart() &&
+						if (allowRestart && restartStrategy.canRestart() &&
 								transitionState(current, JobStatus.RESTARTING)) {
 							restartStrategy.restart(this);
 							break;
 
-						} else if ((!isRecoverable || !restartStrategy.canRestart()) &&
+						} else if ((!allowRestart || !restartStrategy.canRestart()) &&
 							transitionState(current, JobStatus.FAILED, failureCause)) {
 							postRunCleanup();
 							break;

http://git-wip-us.apache.org/repos/asf/flink/blob/ec6ee043/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1c6fce8..6bda862 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -38,7 +38,7 @@ import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint._
 import org.apache.flink.runtime.client._
-import org.apache.flink.runtime.execution.UnrecoverableException
+import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.{RestartStrategy, RestartStrategyFactory}
 import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionJobVertex}
@@ -1122,12 +1122,10 @@ class JobManager(
                   executionGraph.restoreSavepoint(savepointPath)
                 } catch {
                   case e: Exception =>
-                    throw new UnrecoverableException(e)
+                    throw new SuppressRestartsException(e)
                 }
               }
             }
-
-            submittedJobGraphs.putJobGraph(new SubmittedJobGraph(jobGraph, jobInfo))
           }
 
           jobInfo.client ! decorateMessage(JobSubmitSuccess(jobGraph.getJobID))

http://git-wip-us.apache.org/repos/asf/flink/blob/ec6ee043/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index b1f11fb..646e195 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.execution.UnrecoverableException;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
@@ -337,7 +337,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	}
 
 	@Test
-	public void testNoRestartOnUnrecoverableException() throws Exception {
+	public void testNoRestartOnSuppressException() throws Exception {
 		Instance instance = ExecutionGraphTestUtils.getInstance(
 				new SimpleActorGateway(TestingUtils.directExecutionContext()),
 				NUM_TASKS);
@@ -369,7 +369,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		// Fail with unrecoverable Exception
 		eg.getAllExecutionVertices().iterator().next().fail(
-				new UnrecoverableException(new Exception("Test Exception")));
+				new SuppressRestartsException(new Exception("Test Exception")));
 
 		assertEquals(JobStatus.FAILING, eg.getState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ec6ee043/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 5386353..f98f4ab 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.SavepointStoreFactory;
 import org.apache.flink.runtime.checkpoint.StateForTask;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
-import org.apache.flink.runtime.execution.UnrecoverableException;
+import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -726,7 +726,7 @@ public class SavepointITCase extends TestLogger {
 				flink.submitJobAndWait(jobGraph, false);
 			}
 			catch (Exception e) {
-				assertEquals(UnrecoverableException.class, e.getCause().getClass());
+				assertEquals(SuppressRestartsException.class, e.getCause().getClass());
 				assertEquals(IllegalArgumentException.class, e.getCause().getCause().getClass());
 			}
 		}


Mime
View raw message