flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [1/2] flink git commit: [FLINK-2066][core] Add configuration of delay between execution retries at job level
Date Wed, 07 Oct 2015 14:11:13 GMT
Repository: flink
Updated Branches:
  refs/heads/master b489c3673 -> 8d62033c2


[FLINK-2066][core] Add configuration of delay between execution retries at job level

This closes #1223


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a437a2b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a437a2b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a437a2b3

Branch: refs/heads/master
Commit: a437a2b396eb473db5e649c3879c34b67fffc943
Parents: b489c36
Author: wangchx <wcxzjtz@gmail.com>
Authored: Sat Oct 3 17:53:31 2015 -0700
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Wed Oct 7 12:56:46 2015 +0200

----------------------------------------------------------------------
 docs/apis/programming_guide.md                  |  2 ++
 .../flink/api/common/ExecutionConfig.java       | 25 +++++++++++++++++-
 .../java/org/apache/flink/api/common/Plan.java  |  7 +++++
 .../flink/api/java/ExecutionEnvironment.java    | 25 ++++++++++++++++++
 .../plantranslate/JobGraphGenerator.java        |  1 +
 .../apache/flink/runtime/jobgraph/JobGraph.java | 27 ++++++++++++++++++++
 .../flink/api/scala/ExecutionEnvironment.scala  | 18 ++++++++++++-
 .../environment/StreamExecutionEnvironment.java | 24 +++++++++++++++++
 .../api/graph/StreamingJobGraphGenerator.java   | 15 +++++++++++
 .../api/scala/StreamExecutionEnvironment.scala  | 17 ++++++++++++
 10 files changed, 159 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 3959dc9..da141a9 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -1992,6 +1992,8 @@ With the closure cleaner disabled, it might happen that an anonymous
user functi
 
 - `getNumberOfExecutionRetries()` / `setNumberOfExecutionRetries(int numberOfExecutionRetries)`
Sets the number of times that failed tasks are re-executed. A value of zero effectively disables
fault tolerance. A value of `-1` indicates that the system default value (as defined in the
configuration) should be used.
 
+- `getExecutionRetryDelay()` / `setExecutionRetryDelay(long executionRetryDelay)` Sets the
delay in milliseconds that the system waits after a job has failed, before re-executing it.
The delay starts after all tasks have been successfully stopped on the TaskManagers,
and once the delay is past, the tasks are re-started. This parameter is useful to delay re-execution
in order to let certain time-out related failures surface fully (like broken connections that
have not fully timed out), before attempting a re-execution and immediately failing again
due to the same problem. This parameter only has an effect if the number of execution re-tries
is one or more.
+
 - `getExecutionMode()` / `setExecutionMode()`. The default execution mode is PIPELINED. Sets
the execution mode to execute the program. The execution mode defines whether data exchanges
are performed in a batch or on a pipelined manner. 
 
 - `enableForceKryo()` / **`disableForceKryo`**. Kryo is not forced by default. Forces the
GenericTypeInformation to use the Kryo serializer for POJOS even though we could analyze them
as a POJO. In some cases this might be preferable. For example, when Flink's internal serializers
fail to handle a POJO properly.

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index df0248a..9ed3e92 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -34,6 +34,7 @@ import java.util.Objects;
  *     <li>The default parallelism of the program, i.e., how many parallel tasks to
use for
  *         all functions that do not define a specific value directly.</li>
  *     <li>The number of retries in the case of failed executions.</li>
+ *     <li>The delay between execution retries.</li>
  *     <li>The {@link ExecutionMode} of the program: Batch or Pipelined.
  *         The default execution mode is {@link ExecutionMode#PIPELINED}</li>
  *     <li>Enabling or disabling the "closure cleaner". The closure cleaner pre-processes
@@ -92,6 +93,8 @@ public class ExecutionConfig implements Serializable {
 	private long autoWatermarkInterval = 0;
 
 	private boolean timestampsEnabled = false;
+	
+	private long executionRetryDelay = -1;
 
 	// Serializers and types registered with Kryo and the PojoSerializer
 	// we store them in linked maps/sets to ensure they are registered in order in all kryo
instances.
@@ -242,6 +245,13 @@ public class ExecutionConfig implements Serializable {
 	public int getNumberOfExecutionRetries() {
 		return numberOfExecutionRetries;
 	}
+	
+	/**
+	 * @return The delay between retries.
+	 */
+	public long getExecutionRetryDelay() {
+		return executionRetryDelay;
+	}
 
 	/**
 	 * Sets the number of times that failed tasks are re-executed. A value of zero
@@ -258,7 +268,20 @@ public class ExecutionConfig implements Serializable {
 		this.numberOfExecutionRetries = numberOfExecutionRetries;
 		return this;
 	}
-
+	
+	/**
+	 * Sets the delay between executions. A value of {@code -1} indicates that the default value

+	 * should be used.
+	 * @param executionRetryDelay The number of milliseconds the system will wait to retry.
+	 */
+	public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) {
+		if (executionRetryDelay < -1 ) {
+			throw new IllegalArgumentException(
+					"The delay between reties must be non-negative, or -1 (use system default)");
+		}
+		this.executionRetryDelay = executionRetryDelay;
+		return this;
+	}
 	/**
 	 * Sets the execution mode to execute the program. The execution mode defines whether
 	 * data exchanges are performed in a batch or on a pipelined manner.

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index e0d1eb8..dc8d152 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -303,6 +303,13 @@ public class Plan implements Visitable<Operator<?>> {
 	}
 	
 	/**
+	 * Gets the delay before a failed task is retried.
+	 * @return The delay the system will wait to retry.
+	 */
+	public long getExecutionRetryDelay() {
+		return getExecutionConfig().getExecutionRetryDelay();
+	}
+	/**
 	 * Gets the optimizer post-pass class for this job. The post-pass typically creates utility
classes
 	 * for data types and is specific to a particular data model (record, tuple, Scala, ...)
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index a596765..01fb15c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -199,6 +199,31 @@ public abstract class ExecutionEnvironment {
 	}
 	
 	/**
+	 * Sets the delay, in milliseconds, that the system waits before re-executing
+	 * failed tasks. A value of zero means failed tasks are re-executed without delay.
+	 * A value of {@code -1} indicates that the system default value (as defined in the
+	 * configuration) should be used.
+	 *
+	 * @param executionRetryDelay
+	 * 		The delay of time the system will wait to re-execute failed
+	 * 		tasks.
+	 */
+	public void setExecutionRetryDelay(long executionRetryDelay) {
+		config.setExecutionRetryDelay(executionRetryDelay);
+	}
+	
+	/**
+	 * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
+	 * A value of {@code -1} indicates that the system default value (as defined
+	 * in the configuration) should be used.
+	 *
+	 * @return The delay time the system will wait to re-execute failed tasks.
+	 */
+	public long getExecutionRetryDelay() {
+		return config.getExecutionRetryDelay();
+	}
+	
+	/**
 	 * Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed
job.
 	 * 
 	 * @return The execution result from the latest job execution.

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index c15e47a..afd0682 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -219,6 +219,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		// create the job graph object
 		JobGraph graph = new JobGraph(jobId, program.getJobName());
 		graph.setNumberOfExecutionRetries(program.getOriginalPlan().getNumberOfExecutionRetries());
+		graph.setExecutionRetryDelay(program.getOriginalPlan().getExecutionRetryDelay());
 		graph.setAllowQueuedScheduling(false);
 		graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index e4a0209..4014a76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -79,6 +79,8 @@ public class JobGraph implements Serializable {
 	
 	/** The number of times that failed tasks should be re-executed */
 	private int numExecutionRetries;
+	
+	private long executionRetryDelay;
 
 	/** The number of seconds after which the corresponding ExecutionGraph is removed at the
 	 * job manager after it has been executed. */
@@ -211,6 +213,31 @@ public class JobGraph implements Serializable {
 	public int getNumberOfExecutionRetries() {
 		return numExecutionRetries;
 	}
+	
+	/**
+	 * Gets the delay the system waits before re-executing failed tasks. A value of
+	 * {@code -1} indicates the system default value (as defined in the configuration)
+	 * should be used.
+	 * @return The delay in milliseconds the system waits before re-executing failed tasks.
+	 */
+	public long getExecutionRetryDelay() {
+		return executionRetryDelay;
+	}
+	
+	/**
+	 * Sets the delay that the system waits before re-executing failed tasks. A value of zero
+	 * means failed tasks are re-executed without delay. A value of {@code -1} indicates that the
+	 * system default value (as defined in the configuration) should be used.
+	 * 
+	 * @param executionRetryDelay The delay of time the system will wait to re-execute failed
tasks.
+	 */
+	public void setExecutionRetryDelay(long executionRetryDelay){
+		if (executionRetryDelay < -1) {
+			throw new IllegalArgumentException(
+					"The delay between reties must be non-negative, or -1 (use system default)");
+		}
+		this.executionRetryDelay = executionRetryDelay;
+	}
 
 	/**
 	 * Gets the timeout after which the corresponding ExecutionGraph is removed at the

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 3427225..e27d55a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -60,7 +60,7 @@ import scala.reflect.ClassTag
  *  - [[ExecutionEnvironment#createRemoteEnvironment]]
  *
  *  Use [[ExecutionEnvironment#getExecutionEnvironment]] to get the correct environment depending
- *  on where the program is executed. If it is run inside an IDE a loca environment will
be
+ *  on where the program is executed. If it is run inside an IDE a local environment will
be
  *  created. If the program is submitted to a cluster a remote execution environment will
  *  be created.
  */
@@ -110,6 +110,22 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
 
   /**
+   * Sets the delay that the system waits before re-executing failed tasks.
+   * A value of zero means failed tasks are re-executed without delay. A value of "-1"
+   * indicates that the system default value (as defined in the configuration)
+   * should be used.
+   */
+  def setExecutionRetryDelay(executionRetryDelay: Long): Unit = {
+    javaEnv.setExecutionRetryDelay(executionRetryDelay)
+  }
+
+  /**
+   * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
+   * A value of "-1" indicates that the system default value (as defined
+   * in the configuration) should be used.
+   */
+  def getExecutionRetryDelay = javaEnv.getExecutionRetryDelay
+  /**
    * Gets the UUID by which this environment is identified. The UUID sets the execution context
    * in the cluster or local environment.
    */

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 598d0df..c2e2880 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -414,6 +414,30 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Sets the delay that the system waits before re-executing failed tasks.
+	 * A value of zero means failed tasks are re-executed without delay. A value of {@code -1}
+	 * indicates that the system default value (as defined in the configuration)
+	 * should be used.
+	 *
+	 * @param executionRetryDelay
+	 * 		The delay of time the system will wait to re-execute failed
+	 * 		tasks.
+	 */
+	public void setExecutionRetryDelay(long executionRetryDelay){
+		config.setExecutionRetryDelay(executionRetryDelay);
+	}
+	
+	/**
+	 * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
+	 * A value of {@code -1} indicates that the system default value (as defined
+	 * in the configuration) should be used.
+	 *
+	 * @return The delay time the system will wait to re-execute failed tasks.
+	 */
+	public long getExecutionRetryDelay(){
+		return config.getExecutionRetryDelay();
+	}
+	/**
 	 * Sets the default parallelism that will be used for the local execution
 	 * environment created by {@link #createLocalEnvironment()}.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 8eb91a2..d8e81cf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -99,6 +99,8 @@ public class StreamingJobGraphGenerator {
 		configureCheckpointing();
 
 		configureExecutionRetries();
+		
+		configureExecutionRetryDelay();
 
 		try {
 			InstantiationUtil.writeObjectToConfig(this.streamGraph.getExecutionConfig(), this.jobGraph.getJobConfiguration(),
ExecutionConfig.CONFIG_KEY);
@@ -419,6 +421,10 @@ public class StreamingJobGraphGenerator {
 			if(executionRetries == -1) {
 				streamGraph.getExecutionConfig().setNumberOfExecutionRetries(Integer.MAX_VALUE);
 			}
+			long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
+			if(executionRetryDelay == -1) {
+				streamGraph.getExecutionConfig().setExecutionRetryDelay(100 * 1000);
+			}
 		}
 	}
 
@@ -431,4 +437,13 @@ public class StreamingJobGraphGenerator {
 			jobGraph.setNumberOfExecutionRetries(0);
 		}
 	}
+	
+	private void configureExecutionRetryDelay() {
+		long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
+		if (executionRetryDelay != -1) {
+			jobGraph.setExecutionRetryDelay(executionRetryDelay);
+		} else {
+			jobGraph.setExecutionRetryDelay(100 * 1000);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 2474d8c..7492e48 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -210,6 +210,23 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
 
+  /**
+   * Sets the delay that the system waits before re-executing failed tasks.
+   * A value of zero means failed tasks are re-executed without delay. A value of "-1"
+   * indicates that the system default value (as defined in the configuration)
+   * should be used.
+   */
+  def setExecutionRetryDelay(executionRetryDelay: Long): Unit = {
+    javaEnv.setExecutionRetryDelay(executionRetryDelay)
+  }
+
+  /**
+   * Gets the delay time in milliseconds the system will wait to re-execute failed tasks.
+   * A value of "-1" indicates that the system default value (as defined
+   * in the configuration) should be used.
+   */
+  def getExecutionRetryDelay = javaEnv.getExecutionRetryDelay
+
   // --------------------------------------------------------------------------------------------
   // Registry for types and serializers
   // --------------------------------------------------------------------------------------------


Mime
View raw message