From: fhueske@apache.org
To: commits@flink.apache.org
Date: Wed, 07 Oct 2015 14:11:13 -0000
Subject: [1/2] flink git commit: [FLINK-2066][core] Add configuration of delay between execution retries at job level

Repository: flink
Updated Branches:
  refs/heads/master b489c3673 -> 8d62033c2

[FLINK-2066][core] Add configuration of delay between execution retries at job level

This closes #1223

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a437a2b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a437a2b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a437a2b3

Branch: refs/heads/master
Commit: a437a2b396eb473db5e649c3879c34b67fffc943
Parents: b489c36
Author: wangchx
Authored: Sat Oct 3 17:53:31 2015 -0700
Committer: Fabian Hueske
Committed: Wed Oct 7 12:56:46 2015 +0200

----------------------------------------------------------------------
 docs/apis/programming_guide.md                  |  2 ++
 .../flink/api/common/ExecutionConfig.java       | 25 +++++++++++++++++-
 .../java/org/apache/flink/api/common/Plan.java  |  7 +++++
 .../flink/api/java/ExecutionEnvironment.java    | 25 ++++++++++++++++++
 .../plantranslate/JobGraphGenerator.java        |  1 +
 .../apache/flink/runtime/jobgraph/JobGraph.java | 27 ++++++++++++++++++++
 .../flink/api/scala/ExecutionEnvironment.scala  | 18 ++++++++++++-
 .../environment/StreamExecutionEnvironment.java | 24 +++++++++++++++++
 .../api/graph/StreamingJobGraphGenerator.java   | 15 +++++++++++
 .../api/scala/StreamExecutionEnvironment.scala  | 17 ++++++++++++
 10 files changed, 159 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 3959dc9..da141a9 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -1992,6 +1992,8 @@ With the closure cleaner disabled, it might happen that an anonymous user functi
 
 - `getNumberOfExecutionRetries()` / `setNumberOfExecutionRetries(int numberOfExecutionRetries)` Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of `-1` indicates that the system default value (as defined in the configuration) should be used.
 
+- `getExecutionRetryDelay()` / `setExecutionRetryDelay(long executionRetryDelay)` Sets the delay, in milliseconds, that the system waits after a job has failed before re-executing it. The delay starts once all tasks have been successfully stopped on the TaskManagers; when it has elapsed, the tasks are restarted. This parameter is useful for delaying re-execution so that certain timeout-related failures (like broken connections that have not fully timed out yet) can surface fully, instead of re-executing right away and immediately failing again due to the same problem. A value of `-1` indicates that the system default value should be used. This parameter only has an effect if the number of execution retries is one or more.
+
 - `getExecutionMode()` / `setExecutionMode()`. The default execution mode is PIPELINED. Sets the execution mode to execute the program. The execution mode defines whether data exchanges are performed in a batch or on a pipelined manner.
 
 - `enableForceKryo()` / **`disableForceKryo`**. Kryo is not forced by default. Forces the GenericTypeInformation to use the Kryo serializer for POJOS even though we could analyze them as a POJO. In some cases this might be preferable. For example, when Flink's internal serializers fail to handle a POJO properly.


http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index df0248a..9ed3e92 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -34,6 +34,7 @@ import java.util.Objects;
  *     <li>The default parallelism of the program, i.e., how many parallel tasks to use for
  *         all functions that do not define a specific value directly.</li>
  *     <li>The number of retries in the case of failed executions.</li>
+ *     <li>The delay between execution retries.</li>
  *     <li>The {@link ExecutionMode} of the program: Batch or Pipelined.
  *         The default execution mode is {@link ExecutionMode#PIPELINED}</li>
  *     <li>Enabling or disabling the "closure cleaner". The closure cleaner pre-processes
@@ -92,6 +93,8 @@ public class ExecutionConfig implements Serializable {
 	private long autoWatermarkInterval = 0;
 
 	private boolean timestampsEnabled = false;
+	
+	private long executionRetryDelay = -1;
 
 	// Serializers and types registered with Kryo and the PojoSerializer
 	// we store them in linked maps/sets to ensure they are registered in order in all kryo instances.
@@ -242,6 +245,13 @@ public class ExecutionConfig implements Serializable {
 	public int getNumberOfExecutionRetries() {
 		return numberOfExecutionRetries;
 	}
+	
+	/**
+	 * @return The delay between execution retries in milliseconds, or {@code -1} for the system default.
+	 */
+	public long getExecutionRetryDelay() {
+		return executionRetryDelay;
+	}
 
 	/**
 	 * Sets the number of times that failed tasks are re-executed. A value of zero
@@ -258,7 +268,20 @@ public class ExecutionConfig implements Serializable {
 		this.numberOfExecutionRetries = numberOfExecutionRetries;
 		return this;
 	}
-	
+
+	/**
+	 * Sets the delay, in milliseconds, between execution retries. A value of {@code -1} indicates
+	 * that the system default value should be used.
+	 * @param executionRetryDelay The number of milliseconds the system will wait before retrying.
+	 */
+	public ExecutionConfig setExecutionRetryDelay(long executionRetryDelay) {
+		if (executionRetryDelay < -1) {
+			throw new IllegalArgumentException(
+				"The delay between retries must be non-negative, or -1 (use system default)");
+		}
+		this.executionRetryDelay = executionRetryDelay;
+		return this;
+	}
 	/**
 	 * Sets the execution mode to execute the program. The execution mode defines whether
 	 * data exchanges are performed in a batch or on a pipelined manner.
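
The `-1` sentinel and the range check added to `ExecutionConfig.setExecutionRetryDelay` can be tried out without a Flink dependency. The sketch below is illustrative only: `RetryDelayConfig` and `USE_SYSTEM_DEFAULT` are hypothetical names that copy the validation logic shown in the diff; they are not Flink classes.

```java
// Hypothetical stand-in for the retry-delay part of ExecutionConfig; not a Flink class.
class RetryDelayConfig {

    // Sentinel meaning "use the system default", as in the diff above.
    static final long USE_SYSTEM_DEFAULT = -1L;

    private long executionRetryDelay = USE_SYSTEM_DEFAULT;

    long getExecutionRetryDelay() {
        return executionRetryDelay;
    }

    // Accepts any non-negative number of milliseconds or -1; rejects everything else.
    RetryDelayConfig setExecutionRetryDelay(long executionRetryDelay) {
        if (executionRetryDelay < -1) {
            throw new IllegalArgumentException(
                "The delay between retries must be non-negative, or -1 (use system default)");
        }
        this.executionRetryDelay = executionRetryDelay;
        return this; // fluent style, like ExecutionConfig
    }

    public static void main(String[] args) {
        RetryDelayConfig config = new RetryDelayConfig();
        System.out.println(config.getExecutionRetryDelay()); // -1 (not set)

        // Chained calls work because the setter returns this; the last value wins.
        config.setExecutionRetryDelay(5000L).setExecutionRetryDelay(0L);
        System.out.println(config.getExecutionRetryDelay()); // 0

        try {
            config.setExecutionRetryDelay(-2L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because the check is `< -1`, a delay of `0` is accepted and simply means re-executing without waiting; it does not disable retries.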


http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
index e0d1eb8..dc8d152 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/Plan.java
@@ -303,6 +303,13 @@ public class Plan implements Visitable<Operator<?>> {
 	}
 
 	/**
+	 * Gets the delay between retries of failed tasks.
+	 * @return The delay, in milliseconds, the system will wait before retrying.
+	 */
+	public long getExecutionRetryDelay() {
+		return getExecutionConfig().getExecutionRetryDelay();
+	}
+	/**
 	 * Gets the optimizer post-pass class for this job. The post-pass typically creates utility classes
 	 * for data types and is specific to a particular data model (record, tuple, Scala, ...)
 	 *


http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index a596765..01fb15c 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -199,6 +199,31 @@ public abstract class ExecutionEnvironment {
 	}
 
 	/**
+	 * Sets the delay, in milliseconds, that the system waits before re-executing failed
+	 * tasks. A value of zero re-executes them without waiting. A value of {@code -1}
+	 * indicates that the system default value (as defined in the configuration)
+	 * should be used.
+	 *
+	 * @param executionRetryDelay
+	 *        The time, in milliseconds, the system will wait before re-executing failed
+	 *        tasks.
+	 */
+	public void setExecutionRetryDelay(long executionRetryDelay) {
+		config.setExecutionRetryDelay(executionRetryDelay);
+	}
+
+	/**
+	 * Gets the delay, in milliseconds, that the system waits before re-executing failed tasks.
+	 * A value of {@code -1} indicates that the system default value (as defined
+	 * in the configuration) should be used.
+	 *
+	 * @return The delay, in milliseconds, before failed tasks are re-executed.
+	 */
+	public long getExecutionRetryDelay() {
+		return config.getExecutionRetryDelay();
+	}
+
+	/**
 	 * Returns the {@link org.apache.flink.api.common.JobExecutionResult} of the last executed job.
 	 *
 	 * @return The execution result from the latest job execution.


http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index c15e47a..afd0682 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -219,6 +219,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		// create the job graph object
 		JobGraph graph = new JobGraph(jobId, program.getJobName());
 		graph.setNumberOfExecutionRetries(program.getOriginalPlan().getNumberOfExecutionRetries());
+		graph.setExecutionRetryDelay(program.getOriginalPlan().getExecutionRetryDelay());
 		graph.setAllowQueuedScheduling(false);
 		graph.setSessionTimeout(program.getOriginalPlan().getSessionTimeout());
 		


http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index e4a0209..4014a76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -79,6 +79,8 @@ public class JobGraph implements Serializable {
 
 	/** The number of times that failed tasks should be re-executed */
 	private int numExecutionRetries;
+	
+	private long executionRetryDelay;
 
 	/** The number of seconds after which the corresponding ExecutionGraph is removed at the
 	 * job manager after it has been executed. */
@@ -211,6 +213,31 @@ public class JobGraph implements Serializable {
 	public int getNumberOfExecutionRetries() {
 		return numExecutionRetries;
 	}
+	
+	/**
+	 * Gets the delay, in milliseconds, that the system waits before re-executing failed tasks.
+	 * A value of {@code -1} indicates that the system default value (as defined in the
+	 * configuration) should be used.
+	 * @return The delay, in milliseconds, before failed tasks are re-executed.
+	 */
+	public long getExecutionRetryDelay() {
+		return executionRetryDelay;
+	}
+	
+	/**
+	 * Sets the delay, in milliseconds, before failed tasks are re-executed. A value of zero
+	 * re-executes them without waiting. A value of {@code -1} indicates that the system
+	 * default value (as defined in the configuration) should be used.
+	 *
+	 * @param executionRetryDelay The time the system will wait before re-executing failed tasks.
+	 */
+	public void setExecutionRetryDelay(long executionRetryDelay) {
+		if (executionRetryDelay < -1) {
+			throw new IllegalArgumentException(
+				"The delay between retries must be non-negative, or -1 (use system default)");
+		}
+		this.executionRetryDelay = executionRetryDelay;
+	}
 
 	/**
 	 * Gets the timeout after which the corresponding ExecutionGraph is removed at the


http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index 3427225..e27d55a 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -60,7 +60,7 @@ import scala.reflect.ClassTag
  * - [[ExecutionEnvironment#createRemoteEnvironment]]
  *
  * Use [[ExecutionEnvironment#getExecutionEnvironment]] to get the correct environment depending
- * on where the program is executed. If it is run inside an IDE a loca environment will be
+ * on where the program is executed. If it is run inside an IDE a local environment will be
  * created. If the program is submitted to a cluster a remote execution environment will
  * be created.
  */
@@ -110,6 +110,22 @@ class ExecutionEnvironment(javaEnv: JavaEnv) {
   def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
 
   /**
+   * Sets the delay, in milliseconds, before failed tasks are re-executed. A value of
+   * zero re-executes them without waiting. A value of "-1"
+   * indicates that the system default value (as defined in the configuration)
+   * should be used.
+   */
+  def setExecutionRetryDelay(executionRetryDelay: Long): Unit = {
+    javaEnv.setExecutionRetryDelay(executionRetryDelay)
+  }
+
+  /**
+   * Gets the delay, in milliseconds, that the system waits before re-executing failed tasks.
+   * A value of "-1" indicates that the system default value (as defined
+   * in the configuration) should be used.
+   */
+  def getExecutionRetryDelay = javaEnv.getExecutionRetryDelay
+  /**
    * Gets the UUID by which this environment is identified. The UUID sets the execution context
    * in the cluster or local environment.
    */


http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 598d0df..c2e2880 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -414,6 +414,30 @@ public abstract class StreamExecutionEnvironment {
 	}
 
 	/**
+	 * Sets the delay, in milliseconds, before failed tasks are re-executed. A value of
+	 * zero re-executes them without waiting. A value of {@code -1}
+	 * indicates that the system default value (as defined in the configuration)
+	 * should be used.
+	 *
+	 * @param executionRetryDelay
+	 *        The time, in milliseconds, the system will wait before re-executing failed
+	 *        tasks.
+	 */
+	public void setExecutionRetryDelay(long executionRetryDelay) {
+		config.setExecutionRetryDelay(executionRetryDelay);
+	}
+
+	/**
+	 * Gets the delay, in milliseconds, that the system waits before re-executing failed tasks.
+	 * A value of {@code -1} indicates that the system default value (as defined
+	 * in the configuration) should be used.
+	 *
+	 * @return The delay, in milliseconds, before failed tasks are re-executed.
+	 */
+	public long getExecutionRetryDelay() {
+		return config.getExecutionRetryDelay();
+	}
+	/**
 	 * Sets the default parallelism that will be used for the local execution
 	 * environment created by {@link #createLocalEnvironment()}.
 	 *


http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 8eb91a2..d8e81cf 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -99,6 +99,8 @@ public class StreamingJobGraphGenerator {
 		configureCheckpointing();
 
 		configureExecutionRetries();
+		
+		configureExecutionRetryDelay();
 
 		try {
 			InstantiationUtil.writeObjectToConfig(this.streamGraph.getExecutionConfig(), this.jobGraph.getJobConfiguration(), ExecutionConfig.CONFIG_KEY);
@@ -419,6 +421,10 @@ public class StreamingJobGraphGenerator {
 			if(executionRetries == -1) {
 				streamGraph.getExecutionConfig().setNumberOfExecutionRetries(Integer.MAX_VALUE);
 			}
+			long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
+			if(executionRetryDelay == -1) {
+				streamGraph.getExecutionConfig().setExecutionRetryDelay(100 * 1000);
+			}
 		}
 	}
 
@@ -431,4 +437,13 @@ public class StreamingJobGraphGenerator {
 			jobGraph.setNumberOfExecutionRetries(0);
 		}
 	}
+	
+	private void configureExecutionRetryDelay() {
+		long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
+		if (executionRetryDelay != -1) {
+			jobGraph.setExecutionRetryDelay(executionRetryDelay);
+		} else {
+			jobGraph.setExecutionRetryDelay(100 * 1000);
+		}
+	}
 }


http://git-wip-us.apache.org/repos/asf/flink/blob/a437a2b3/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 2474d8c..7492e48 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -210,6 +210,23 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   def getNumberOfExecutionRetries = javaEnv.getNumberOfExecutionRetries
 
+  /**
+   * Sets the delay, in milliseconds, before failed tasks are re-executed. A value of
+   * zero re-executes them without waiting. A value of "-1"
+   * indicates that the system default value (as defined in the configuration)
+   * should be used.
+   */
+  def setExecutionRetryDelay(executionRetryDelay: Long): Unit = {
+    javaEnv.setExecutionRetryDelay(executionRetryDelay)
+  }
+
+  /**
+   * Gets the delay, in milliseconds, that the system waits before re-executing failed tasks.
+   * A value of "-1" indicates that the system default value (as defined
+   * in the configuration) should be used.
+   */
+  def getExecutionRetryDelay = javaEnv.getExecutionRetryDelay
+
   // --------------------------------------------------------------------------------------------
   // Registry for types and serializers
   // --------------------------------------------------------------------------------------------