flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [7/9] flink git commit: [FLINK-3410] [restart] Choose NoRestart strategy if the number of retries is set to 0
Date Tue, 23 Feb 2016 14:21:50 GMT
[FLINK-3410] [restart] Choose NoRestart strategy if the number of retries is set to 0

Add test case

This closes #1643.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6323ed44
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6323ed44
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6323ed44

Branch: refs/heads/master
Commit: 6323ed44d6dd051b09f9de9f8fbb95495b82cad6
Parents: 77c1dee
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Feb 16 01:15:39 2016 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Feb 23 15:20:08 2016 +0100

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       | 16 ++--
 .../streaming/api/RestartStrategyTest.java      | 96 ++++++++++++++++++++
 2 files changed, 106 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6323ed44/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index b31106f..a592680 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -68,6 +68,8 @@ public class ExecutionConfig implements Serializable {
 	 */
 	public static final int PARALLELISM_AUTO_MAX = Integer.MAX_VALUE;
 
+	private static final long DEFAULT_RESTART_DELAY = 10000L;
+
 	// --------------------------------------------------------------------------------------------
 
 	/** Defines how data exchange happens - batch or pipelined */
@@ -81,7 +83,7 @@ public class ExecutionConfig implements Serializable {
 	 * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
 	 */
 	@Deprecated
-	private int numberOfExecutionRetries = 0;
+	private int numberOfExecutionRetries = -1;
 
 	private boolean forceKryo = false;
 
@@ -106,7 +108,7 @@ public class ExecutionConfig implements Serializable {
 	 * @deprecated Should no longer be used because it is subsumed by RestartStrategyConfiguration
 	 */
 	@Deprecated
-	private long executionRetryDelay = 0;
+	private long executionRetryDelay = DEFAULT_RESTART_DELAY;
 
 	private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration;
 	
@@ -284,6 +286,8 @@ public class ExecutionConfig implements Serializable {
 			// support the old API calls by creating a restart strategy from them
 			if (getNumberOfExecutionRetries() > 0 && getExecutionRetryDelay() >= 0)
{
 				return RestartStrategies.fixedDelayRestart(getNumberOfExecutionRetries(), getExecutionRetryDelay());
+			} else if (getNumberOfExecutionRetries() == 0) {
+				return RestartStrategies.noRestart();
 			} else {
 				return null;
 			}
@@ -342,8 +346,8 @@ public class ExecutionConfig implements Serializable {
 	}
 
 	/**
-	 * Sets the delay between executions. A value of {@code -1} indicates that the default value
-	 * should be used.
+	 * Sets the delay between executions.
+	 *
 	 * @param executionRetryDelay The number of milliseconds the system will wait to retry.
 	 *
 	 * @return The current execution configuration
@@ -354,9 +358,9 @@ public class ExecutionConfig implements Serializable {
 	 */
 	@Deprecated
 	public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) {
-		if (executionRetryDelay < -1 ) {
+		if (executionRetryDelay < 0 ) {
 			throw new IllegalArgumentException(
-				"The delay between reties must be non-negative, or -1 (use system default)");
+				"The delay between retries must be non-negative.");
 		}
 		this.executionRetryDelay = executionRetryDelay;
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/6323ed44/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
new file mode 100644
index 0000000..9f75727
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RestartStrategyTest {
+
+	/**
+	 * Tests that in a streaming use case where checkpointing is enabled, a
+	 * fixed delay with Integer.MAX_VALUE retries is instantiated if no other restart
+	 * strategy has been specified
+	 */
+	@Test
+	public void testAutomaticRestartingWhenCheckpointing() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.enableCheckpointing();
+
+		env.fromElements(1).print();
+
+		StreamGraph graph = env.getStreamGraph();
+		JobGraph jobGraph = graph.getJobGraph();
+
+		RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getRestartStrategyConfiguration();
+
+		Assert.assertNotNull(restartStrategy);
+		Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration);
+		Assert.assertEquals(Integer.MAX_VALUE, ((RestartStrategies.FixedDelayRestartStrategyConfiguration)
restartStrategy).getRestartAttempts());
+	}
+
+	/**
+	 * Checks that in a streaming use case where checkpointing is enabled and the number
+	 * of execution retries is set to 0, restarting is deactivated
+	 */
+	@Test
+	public void testNoRestartingWhenCheckpointingAndExplicitExecutionRetriesZero() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.enableCheckpointing();
+		env.setNumberOfExecutionRetries(0);
+
+		env.fromElements(1).print();
+
+		StreamGraph graph = env.getStreamGraph();
+		JobGraph jobGraph = graph.getJobGraph();
+
+		RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getRestartStrategyConfiguration();
+
+		Assert.assertNotNull(restartStrategy);
+		Assert.assertTrue(restartStrategy instanceof RestartStrategies.NoRestartStrategyConfiguration);
+	}
+
+	/**
+	 * Checks that in a streaming use case where checkpointing is enabled and the number
+	 * of execution retries is set to 42 and the delay to 1337, fixed delay restarting is used.
+	 */
+	@Test
+	public void testFixedRestartingWhenCheckpointingAndExplicitExecutionRetriesNonZero() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.enableCheckpointing();
+		env.setNumberOfExecutionRetries(42);
+		env.getConfig().setExecutionRetryDelay(1337);
+
+		env.fromElements(1).print();
+
+		StreamGraph graph = env.getStreamGraph();
+		JobGraph jobGraph = graph.getJobGraph();
+
+		RestartStrategies.RestartStrategyConfiguration restartStrategy = jobGraph.getRestartStrategyConfiguration();
+
+		Assert.assertNotNull(restartStrategy);
+		Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration);
+		Assert.assertEquals(42, ((RestartStrategies.FixedDelayRestartStrategyConfiguration)restartStrategy).getRestartAttempts());
+		Assert.assertEquals(1337, ((RestartStrategies.FixedDelayRestartStrategyConfiguration)restartStrategy).getDelayBetweenAttempts());
+	}
+}


Mime
View raw message