flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-3190] failure rate restart strategy
Date Mon, 11 Jul 2016 22:02:10 GMT
Repository: flink
Updated Branches:
  refs/heads/master 81cf22966 -> a7274d566


[FLINK-3190] failure rate restart strategy

Add toString method to Time

Reintroduce org.apache.flink.streaming.api.windowing.time.Time for backwards compatibility

Remove Duration from FailureRateRestartStrategy

This closes #1954.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7274d56
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7274d56
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7274d56

Branch: refs/heads/master
Commit: a7274d5661421a66dcb998efd72b547301e1dc0f
Parents: 81cf229
Author: Michal Fijolek <michalfijolek91@gmail.com>
Authored: Sun Mar 13 01:40:15 2016 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Jul 12 00:00:38 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/fault_tolerance.md          |  91 ++++-
 docs/setup/config.md                            |  12 +-
 .../restartstrategy/RestartStrategies.java      |  80 +++-
 .../org/apache/flink/api/common/time/Time.java  | 132 +++++++
 .../flink/configuration/ConfigConstants.java    |  26 +-
 .../restart/ExecutionGraphRestarter.java        |  45 +++
 .../restart/FailureRateRestartStrategy.java     | 114 ++++++
 .../restart/FixedDelayRestartStrategy.java      |  33 +-
 .../restart/RestartStrategyFactory.java         |  13 +-
 .../ExecutionGraphRestartTest.java              | 382 +++++++------------
 .../streaming/api/RestartStrategyTest.java      |   2 +-
 .../api/scala/WindowReduceITCase.scala          |   2 +-
 .../WindowCheckpointingITCase.java              |   2 -
 ...SimpleRecoveryFailureRateStrategyITBase.java |  41 ++
 ...RecoveryFixedDelayRestartStrategyITBase.java |  40 ++
 .../test/recovery/SimpleRecoveryITCase.java     | 292 --------------
 .../test/recovery/SimpleRecoveryITCaseBase.java | 263 +++++++++++++
 17 files changed, 980 insertions(+), 590 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/docs/apis/streaming/fault_tolerance.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/fault_tolerance.md b/docs/apis/streaming/fault_tolerance.md
index 89c2557..80b25ef 100644
--- a/docs/apis/streaming/fault_tolerance.md
+++ b/docs/apis/streaming/fault_tolerance.md
@@ -244,6 +244,10 @@ The description of each restart strategy contains more information about the res
         <td>fixed-delay</td>
     </tr>
     <tr>
+        <td>Failure rate</td>
+        <td>failure-rate</td>
+    </tr>
+    <tr>
         <td>No restart</td>
         <td>none</td>
     </tr>
@@ -261,18 +265,18 @@ In case of a failure the system tries to restart the job 3 times and waits 10 se
 <div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.fixedDelay(
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
   3, // number of restart attempts 
-  10000 // delay in milliseconds
+  Time.of(10, TimeUnit.SECONDS) // delay
 ));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val env = ExecutionEnvironment.getExecutionEnvironment()
-env.setRestartStrategy(RestartStrategies.fixedDelay(
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
   3, // number of restart attempts 
-  10000 // delay in milliseconds
+  Time.of(10, TimeUnit.SECONDS) // delay
 ))
 {% endhighlight %}
 </div>
@@ -325,18 +329,18 @@ The fixed delay restart strategy can also be set programmatically:
 <div data-lang="java" markdown="1">
 {% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-env.setRestartStrategy(RestartStrategies.fixedDelay(
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
   3, // number of restart attempts 
-  10000 // delay in milliseconds
+  Time.of(10, TimeUnit.SECONDS) // delay
 ));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
 val env = ExecutionEnvironment.getExecutionEnvironment()
-env.setRestartStrategy(RestartStrategies.fixedDelay(
+env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
   3, // number of restart attempts 
-  10000 // delay in milliseconds
+  Time.of(10, TimeUnit.SECONDS) // delay
 ))
 {% endhighlight %}
 </div>
@@ -358,6 +362,77 @@ The default value is the value of *akka.ask.timeout*.
 
 {% top %}
 
+### Failure Rate Restart Strategy
+
+The failure rate restart strategy restarts the job after a failure, but once the `failure rate` (failures per time interval) is exceeded, the job eventually fails.
+In-between two consecutive restart attempts, the restart strategy waits a fixed amount of time.
+
+This strategy is enabled as the default by setting the following configuration parameter in `flink-conf.yaml`.
+
+~~~
+restart-strategy: failure-rate
+~~~
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 40%">Configuration Parameter</th>
+      <th class="text-left" style="width: 40%">Description</th>
+      <th class="text-left">Default Value</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td><it>restart-strategy.failure-rate.max-failures-per-interval</it></td>
+        <td>Maximum number of restarts in given time interval before failing a job</td>
+        <td>1</td>
+    </tr>
+    <tr>
+        <td><it>restart-strategy.failure-rate.failure-rate-interval</it></td>
+        <td>Time interval for measuring failure rate.</td>
+        <td>1 minute</td>
+    </tr>
+    <tr>
+        <td><it>restart-strategy.failure-rate.delay</it></td>
+        <td>Delay between two consecutive restart attempts</td>
+        <td><it>akka.ask.timeout</it></td>
+    </tr>
+  </tbody>
+</table>
+
+~~~
+restart-strategy.failure-rate.max-failures-per-interval: 3
+restart-strategy.failure-rate.failure-rate-interval: 5 min
+restart-strategy.failure-rate.delay: 10 s
+~~~
+
+The failure rate restart strategy can also be set programmatically:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+env.setRestartStrategy(RestartStrategies.failureRateRestart(
+  3, // max failures per interval
+  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
+  Time.of(10, TimeUnit.SECONDS) // delay
+));
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment()
+env.setRestartStrategy(RestartStrategies.failureRateRestart(
+  3, // max failures per interval
+  Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate
+  Time.of(10, TimeUnit.SECONDS) // delay
+))
+{% endhighlight %}
+</div>
+</div>
+
+{% top %}
+
 ### No Restart Strategy
 
 The job fails directly and no restart is attempted.

http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/docs/setup/config.md
----------------------------------------------------------------------
diff --git a/docs/setup/config.md b/docs/setup/config.md
index 46c6c9a..1e4f83a 100644
--- a/docs/setup/config.md
+++ b/docs/setup/config.md
@@ -132,8 +132,9 @@ If you are on YARN, then it is sufficient to authenticate the client with Kerber
 - `blob.server.port`: Port definition for the blob server (serving user jar's) on the Taskmanagers. By default the port is set to 0, which means that the operating system is picking an ephemeral port. Flink also accepts a list of ports ("50100,50101"), ranges ("50100-50200") or a combination of both. It is recommended to set a range of ports to avoid collisions when multiple JobManagers are running on the same machine.
 
 - `restart-strategy`: Default restart strategy to use in case that no restart strategy has been specified for the submitted job.
-Currently, it can be chosen between using a fixed delay restart strategy and to turn it off.
+Currently, you can choose between the fixed delay restart strategy, the failure rate restart strategy, and no restart strategy.
 To use the fixed delay strategy you have to specify "fixed-delay".
+To use the failure rate strategy you have to specify "failure-rate".
 To turn the restart behaviour off you have to specify "none".
 Default value "none".
 
@@ -143,6 +144,15 @@ Default value is 1.
 - `restart-strategy.fixed-delay.delay`: Delay between restart attempts, used if the default restart strategy is set to "fixed-delay".
 Default value is the `akka.ask.timeout`.
 
+- `restart-strategy.failure-rate.max-failures-per-interval`: Maximum number of restarts in a given time interval before the "failure-rate" strategy fails the job.
+Default value is 1.
+
+- `restart-strategy.failure-rate.failure-rate-interval`: Time interval for measuring failure rate in "failure-rate" strategy.
+Default value is `1 minute`.
+
+- `restart-strategy.failure-rate.delay`: Delay between restart attempts, used if the default restart strategy is set to "failure-rate".
+Default value is the `akka.ask.timeout`.
+
 ## Full Reference
 
 ### HDFS

http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
index 12f9d08..d5db466 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/restartstrategy/RestartStrategies.java
@@ -19,8 +19,10 @@
 package org.apache.flink.api.common.restartstrategy;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.time.Time;
 
 import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This class defines methods to generate RestartStrategyConfigurations. These configurations are
@@ -47,11 +49,31 @@ public class RestartStrategies {
 	 * @param delayBetweenAttempts Delay in-between restart attempts for the FixedDelayRestartStrategy
 	 * @return FixedDelayRestartStrategy
 	 */
-	public static RestartStrategyConfiguration fixedDelayRestart(
-		int restartAttempts,
-		long delayBetweenAttempts) {
+	public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, long delayBetweenAttempts) {
+		return fixedDelayRestart(restartAttempts, Time.of(delayBetweenAttempts, TimeUnit.MILLISECONDS));
+	}
+
+	/**
+	 * Generates a FixedDelayRestartStrategyConfiguration.
+	 *
+	 * @param restartAttempts Number of restart attempts for the FixedDelayRestartStrategy
+	 * @param delayInterval Delay in-between restart attempts for the FixedDelayRestartStrategy
+	 * @return FixedDelayRestartStrategy
+	 */
+	public static RestartStrategyConfiguration fixedDelayRestart(int restartAttempts, Time delayInterval) {
+		return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayInterval);
+	}
 
-		return new FixedDelayRestartStrategyConfiguration(restartAttempts, delayBetweenAttempts);
+	/**
+	 * Generates a FailureRateRestartStrategyConfiguration.
+	 *
+	 * @param failureRate Maximum number of restarts in given interval {@code failureInterval} before failing a job
+	 * @param failureInterval Time interval for failures
+	 * @param delayInterval Delay in-between restart attempts
+	 */
+	public static FailureRateRestartStrategyConfiguration failureRateRestart(
+			int failureRate, Time failureInterval, Time delayInterval) {
+		return new FailureRateRestartStrategyConfiguration(failureRate, failureInterval, delayInterval);
 	}
 
 	public abstract static class RestartStrategyConfiguration implements Serializable {
@@ -80,24 +102,26 @@ public class RestartStrategies {
 		private static final long serialVersionUID = 4149870149673363190L;
 
 		private final int restartAttempts;
-		private final long delayBetweenAttempts;
+		private final Time delayBetweenAttemptsInterval;
 
-		FixedDelayRestartStrategyConfiguration(int restartAttempts, long delayBetweenAttempts) {
+		FixedDelayRestartStrategyConfiguration(int restartAttempts, Time delayBetweenAttemptsInterval) {
 			this.restartAttempts = restartAttempts;
-			this.delayBetweenAttempts = delayBetweenAttempts;
+			this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
 		}
 
 		public int getRestartAttempts() {
 			return restartAttempts;
 		}
 
-		public long getDelayBetweenAttempts() {
-			return delayBetweenAttempts;
+		public Time getDelayBetweenAttemptsInterval() {
+			return delayBetweenAttemptsInterval;
 		}
 
 		@Override
 		public int hashCode() {
-			return 31 * restartAttempts + (int)(delayBetweenAttempts ^ (delayBetweenAttempts >>> 32));
+			int result = restartAttempts;
+			result = 31 * result + (delayBetweenAttemptsInterval != null ? delayBetweenAttemptsInterval.hashCode() : 0);
+			return result;
 		}
 
 		@Override
@@ -105,7 +129,7 @@ public class RestartStrategies {
 			if (obj instanceof FixedDelayRestartStrategyConfiguration) {
 				FixedDelayRestartStrategyConfiguration other = (FixedDelayRestartStrategyConfiguration) obj;
 
-				return restartAttempts == other.restartAttempts && delayBetweenAttempts == other.delayBetweenAttempts;
+				return restartAttempts == other.restartAttempts && delayBetweenAttemptsInterval.equals(other.delayBetweenAttemptsInterval);
 			} else {
 				return false;
 			}
@@ -113,8 +137,40 @@ public class RestartStrategies {
 
 		@Override
 		public String getDescription() {
-			return "Restart with fixed delay (" + delayBetweenAttempts + " ms). #"
+			return "Restart with fixed delay (" + delayBetweenAttemptsInterval + " ms). #"
 				+ restartAttempts + " restart attempts.";
 		}
 	}
+
+	final public static class FailureRateRestartStrategyConfiguration extends RestartStrategyConfiguration {
+		private static final long serialVersionUID = 1195028697539661739L;
+		private final int maxFailureRate;
+
+		private final Time failureInterval;
+		private final Time delayBetweenAttemptsInterval;
+
+		public FailureRateRestartStrategyConfiguration(int maxFailureRate, Time failureInterval, Time delayBetweenAttemptsInterval) {
+			this.maxFailureRate = maxFailureRate;
+			this.failureInterval = failureInterval;
+			this.delayBetweenAttemptsInterval = delayBetweenAttemptsInterval;
+		}
+
+		public int getMaxFailureRate() {
+			return maxFailureRate;
+		}
+
+		public Time getFailureInterval() {
+			return failureInterval;
+		}
+
+		public Time getDelayBetweenAttemptsInterval() {
+			return delayBetweenAttemptsInterval;
+		}
+
+		@Override
+		public String getDescription() {
+			return "Failure rate restart with maximum of " + maxFailureRate + " failures within interval " + failureInterval.toString()
+					+ " and fixed delay " + delayBetweenAttemptsInterval.toString();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java b/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java
new file mode 100644
index 0000000..8883ddb
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/time/Time.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.time;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+import java.util.concurrent.TimeUnit;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The definition of a time interval.
+ *
+ * Note: This class should replace org.apache.flink.streaming.api.windowing.time.Time in Flink 2.0
+ */
+@PublicEvolving
+public final class Time implements Serializable {
+
+	private static final long serialVersionUID = -350254188460915999L;
+
+	/** The time unit in which the length of this time interval is expressed. */
+	private final TimeUnit unit;
+
+	/** The length of this time interval, counted in the configured time unit. */
+	private final long size;
+
+	/** Instantiation only via factory method. */
+	private Time(long size, TimeUnit unit) {
+		this.unit = checkNotNull(unit, "time unit may not be null");
+		this.size = size;
+
+	}
+
+	// ------------------------------------------------------------------------
+	//  Properties
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the time unit in which this time interval is expressed.
+	 * @return The time unit of this time interval.
+	 */
+	public TimeUnit getUnit() {
+		return unit;
+	}
+
+	/**
+	 * Gets the length of this time interval, counted in the unit returned by {@link #getUnit()}.
+	 * @return The length of this time interval.
+	 */
+	public long getSize() {
+		return size;
+	}
+
+	/**
+	 * Converts the time interval to milliseconds.
+	 * @return The time interval in milliseconds.
+	 */
+	public long toMilliseconds() {
+		return unit.toMillis(size);
+	}
+
+	@Override
+	public String toString() {
+		return toMilliseconds() + " ms";
+	}
+
+	// ------------------------------------------------------------------------
+	//  Factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a new {@link Time} of the given duration and {@link TimeUnit}.
+	 *
+	 * @param size The duration of time.
+	 * @param unit The unit of time of the duration, for example {@code TimeUnit.SECONDS}.
+	 * @return The new {@link Time} instance.
+	 */
+	public static Time of(long size, TimeUnit unit) {
+		return new Time(size, unit);
+	}
+
+	/**
+	 * Creates a new {@link Time} that represents the given number of milliseconds.
+	 */
+	public static Time milliseconds(long milliseconds) {
+		return of(milliseconds, TimeUnit.MILLISECONDS);
+	}
+
+	/**
+	 * Creates a new {@link Time} that represents the given number of seconds.
+	 */
+	public static Time seconds(long seconds) {
+		return of(seconds, TimeUnit.SECONDS);
+	}
+
+	/**
+	 * Creates a new {@link Time} that represents the given number of minutes.
+	 */
+	public static Time minutes(long minutes) {
+		return of(minutes, TimeUnit.MINUTES);
+	}
+
+	/**
+	 * Creates a new {@link Time} that represents the given number of hours.
+	 */
+	public static Time hours(long hours) {
+		return of(hours, TimeUnit.HOURS);
+	}
+
+	/**
+	 * Creates a new {@link Time} that represents the given number of days.
+	 */
+	public static Time days(long days) {
+		return of(days, TimeUnit.DAYS);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 2945fff..5715796 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -43,7 +43,8 @@ public final class ConfigConstants {
 
 	/**
 	 * Defines the restart strategy to be used. It can be "off", "none", "disable" to be disabled or
-	 * it can be "fixeddelay", "fixed-delay" to use the FixedDelayRestartStrategy. You can also
+	 * it can be "fixeddelay", "fixed-delay" to use the FixedDelayRestartStrategy or it can
+	 * be "failurerate", "failure-rate" to use FailureRateRestartStrategy. You can also
 	 * specify a class name which implements the RestartStrategy interface and has a static
 	 * create method which takes a Configuration object.
 	 */
@@ -57,13 +58,34 @@ public final class ConfigConstants {
 	public static final String RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS = "restart-strategy.fixed-delay.attempts";
 
 	/**
-	 * Delay between two consecutive restart attempts. It can be specified using Scala's
+	 * Delay between two consecutive restart attempts in FixedDelayRestartStrategy. It can be specified using Scala's
 	 * FiniteDuration notation: "1 min", "20 s"
 	 */
 	@PublicEvolving
 	public static final String RESTART_STRATEGY_FIXED_DELAY_DELAY = "restart-strategy.fixed-delay.delay";
 
 	/**
+	 * Maximum number of restarts in given time interval {@link #RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL} before failing a job
+	 * in FailureRateRestartStrategy.
+	 */
+	@PublicEvolving
+	public static final String RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL = "restart-strategy.failure-rate.max-failures-per-interval";
+
+	/**
+	 * Time interval within which more failures than {@link #RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL} cause
+	 * the job to fail in FailureRateRestartStrategy. It can be specified using Scala's FiniteDuration notation: "1 min", "20 s"
+	 */
+	@PublicEvolving
+	public static final String RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL = "restart-strategy.failure-rate.failure-rate-interval";
+
+	/**
+	 * Delay between two consecutive restart attempts in FailureRateRestartStrategy.
+	 * It can be specified using Scala's FiniteDuration notation: "1 min", "20 s".
+	 */
+	@PublicEvolving
+	public static final String RESTART_STRATEGY_FAILURE_RATE_DELAY = "restart-strategy.failure-rate.delay";
+
+	/**
 	 * Config parameter for the number of re-tries for failed tasks. Setting this
 	 * value to 0 effectively disables fault tolerance.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java
new file mode 100644
index 0000000..9287694
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+
+class ExecutionGraphRestarter {
+	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphRestarter.class);
+	public static Callable<Object> restartWithDelay(final ExecutionGraph executionGraph, final long delayBetweenRestartAttemptsInMillis) {
+		return new Callable<Object>() {
+			@Override
+			public Object call() throws Exception {
+				try {
+					LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttemptsInMillis);
+					// do the delay
+					Thread.sleep(delayBetweenRestartAttemptsInMillis);
+				} catch(InterruptedException e) {
+					// should only happen on shutdown
+				}
+				executionGraph.restart();
+				return null;
+			}
+		};
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
new file mode 100644
index 0000000..528eacb
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.duration.Duration;
+
+import java.util.ArrayDeque;
+import java.util.concurrent.TimeUnit;
+
+import static akka.dispatch.Futures.future;
+
+/**
+ * Restart strategy which restarts the given {@link ExecutionGraph} as long as the failure rate is not exceeded,
+ * waiting a fixed time delay before each restart attempt.
+ */
+public class FailureRateRestartStrategy implements RestartStrategy {
+	private final Time failuresInterval;
+	private final Time delayInterval;
+	private final int maxFailuresPerInterval;
+	private final ArrayDeque<Long> restartTimestampsDeque;
+
+	public FailureRateRestartStrategy(int maxFailuresPerInterval, Time failuresInterval, Time delayInterval) {
+		Preconditions.checkNotNull(failuresInterval, "Failures interval cannot be null.");
+		Preconditions.checkNotNull(delayInterval, "Delay interval cannot be null.");
+		Preconditions.checkArgument(maxFailuresPerInterval > 0, "Maximum number of restart attempts per time unit must be greater than 0.");
+		Preconditions.checkArgument(failuresInterval.getSize() > 0, "Failures interval must be greater than 0 ms.");
+		Preconditions.checkArgument(delayInterval.getSize() >= 0, "Delay interval must be at least 0 ms.");
+
+		this.failuresInterval = failuresInterval;
+		this.delayInterval = delayInterval;
+		this.maxFailuresPerInterval = maxFailuresPerInterval;
+		this.restartTimestampsDeque = new ArrayDeque<>(maxFailuresPerInterval);
+	}
+
+	@Override
+	public boolean canRestart() {
+		if (isRestartTimestampsQueueFull()) {
+			Long now = System.currentTimeMillis();
+			Long earliestFailure = restartTimestampsDeque.peek();
+
+			return (now - earliestFailure) > failuresInterval.toMilliseconds();
+		} else {
+			return true;
+		}
+	}
+
+	@Override
+	public void restart(final ExecutionGraph executionGraph) {
+		if (isRestartTimestampsQueueFull()) {
+			restartTimestampsDeque.remove();
+		}
+		restartTimestampsDeque.add(System.currentTimeMillis());
+		future(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()), executionGraph.getExecutionContext());
+	}
+
+	private boolean isRestartTimestampsQueueFull() {
+		return restartTimestampsDeque.size() == maxFailuresPerInterval;
+	}
+
+	public static FailureRateRestartStrategyFactory createFactory(Configuration configuration) throws Exception {
+		int maxFailuresPerInterval = configuration.getInteger(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 1);
+		String failuresIntervalString = configuration.getString(
+				ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, Duration.apply(1, TimeUnit.MINUTES).toString()
+		);
+		String timeoutString = configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, ConfigConstants.DEFAULT_AKKA_ASK_TIMEOUT);
+		String delayString = configuration.getString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, timeoutString);
+
+		Duration failuresInterval = Duration.apply(failuresIntervalString);
+		Duration delay = Duration.apply(delayString);
+
+
+		return new FailureRateRestartStrategyFactory(maxFailuresPerInterval, Time.milliseconds(failuresInterval.toMillis()), Time.milliseconds(delay.toMillis()));
+	}
+
+	public static class FailureRateRestartStrategyFactory extends RestartStrategyFactory {
+		private static final long serialVersionUID = -373724639430960480L;
+
+		private final int maxFailuresPerInterval;
+		private final Time failuresInterval;
+		private final Time delayInterval;
+
+		public FailureRateRestartStrategyFactory(int maxFailuresPerInterval, Time failuresInterval, Time delayInterval) {
+			this.maxFailuresPerInterval = maxFailuresPerInterval;
+			this.failuresInterval = Preconditions.checkNotNull(failuresInterval);
+			this.delayInterval = Preconditions.checkNotNull(delayInterval);
+		}
+
+		@Override
+		public RestartStrategy createRestartStrategy() {
+			return new FailureRateRestartStrategy(maxFailuresPerInterval, failuresInterval, delayInterval);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index ac06379..8053e95 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -22,12 +22,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import scala.concurrent.duration.Duration;
 
-import java.util.concurrent.Callable;
-
 import static akka.dispatch.Futures.future;
 
 /**
@@ -35,9 +31,6 @@ import static akka.dispatch.Futures.future;
  * with a fixed time delay in between.
  */
 public class FixedDelayRestartStrategy implements RestartStrategy {
-	private static final Logger LOG = LoggerFactory.getLogger(FixedDelayRestartStrategy.class);
-
-
 	private final int maxNumberRestartAttempts;
 	private final long delayBetweenRestartAttempts;
 	private int currentRestartAttempt;
@@ -66,21 +59,7 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 	@Override
 	public void restart(final ExecutionGraph executionGraph) {
 		currentRestartAttempt++;
-
-		future(new Callable<Object>() {
-			@Override
-			public Object call() throws Exception {
-				try {
-					LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttempts);
-					// do the delay
-					Thread.sleep(delayBetweenRestartAttempts);
-				} catch(InterruptedException e) {
-					// should only happen on shutdown
-				}
-				executionGraph.restart();
-				return null;
-			}
-		}, executionGraph.getExecutionContext());
+		future(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts), executionGraph.getExecutionContext());
 	}
 
 	/**
@@ -109,12 +88,12 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 		} catch (NumberFormatException nfe) {
 			if (delayString.equals(timeoutString)) {
 				throw new Exception("Invalid config value for " +
-					ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE + ": " + timeoutString +
-					". Value must be a valid duration (such as '10 s' or '1 min')");
+						ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE + ": " + timeoutString +
+						". Value must be a valid duration (such as '10 s' or '1 min')");
 			} else {
 				throw new Exception("Invalid config value for " +
-					ConfigConstants.EXECUTION_RETRY_DELAY_KEY + ": " + delayString +
-					". Value must be a valid duration (such as '100 milli' or '10 s')");
+						ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY + ": " + delayString +
+						". Value must be a valid duration (such as '100 milli' or '10 s')");
 			}
 		}
 
@@ -146,4 +125,4 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 			return new FixedDelayRestartStrategy(maxAttempts, delay);
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
index e58d775..ae92b3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyFactory.java
@@ -57,7 +57,15 @@ public abstract class RestartStrategyFactory implements Serializable {
 
 			return new FixedDelayRestartStrategy(
 				fixedDelayConfig.getRestartAttempts(),
-				fixedDelayConfig.getDelayBetweenAttempts());
+				fixedDelayConfig.getDelayBetweenAttemptsInterval().toMilliseconds());
+		} else if (restartStrategyConfiguration instanceof RestartStrategies.FailureRateRestartStrategyConfiguration) {
+			RestartStrategies.FailureRateRestartStrategyConfiguration config =
+					(RestartStrategies.FailureRateRestartStrategyConfiguration) restartStrategyConfiguration;
+			return new FailureRateRestartStrategy(
+					config.getMaxFailureRate(),
+					config.getFailureInterval(),
+					config.getDelayBetweenAttemptsInterval()
+			);
 		} else {
 			throw new IllegalArgumentException("Unknown restart strategy configuration " +
 				restartStrategyConfiguration + ".");
@@ -110,6 +118,9 @@ public abstract class RestartStrategyFactory implements Serializable {
 			case "fixeddelay":
 			case "fixed-delay":
 				return FixedDelayRestartStrategy.createFactory(configuration);
+			case "failurerate":
+			case "failure-rate":
+				return FailureRateRestartStrategy.createFactory(configuration);
 			default:
 				try {
 					Class<?> clazz = Class.forName(restartStrategyName);

http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index 12b866e..0d09e38 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -22,12 +22,15 @@ import akka.dispatch.Futures;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.FailureRateRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -48,6 +51,7 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 import scala.concurrent.impl.Promise;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
@@ -69,33 +73,9 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	@Test
 	public void testNoManualRestart() throws Exception {
-		Instance instance = ExecutionGraphTestUtils.getInstance(
-				new SimpleActorGateway(TestingUtils.directExecutionContext()),
-				NUM_TASKS);
-
-		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-		scheduler.newInstanceAvailable(instance);
-
-		JobVertex sender = new JobVertex("Task");
-		sender.setInvokableClass(Tasks.NoOpInvokable.class);
-		sender.setParallelism(NUM_TASKS);
-
-		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
-
-		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				new JobID(),
-				"test job",
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				AkkaUtils.getDefaultTimeout(),
-				new NoRestartStrategy());
-		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-
-		assertEquals(JobStatus.CREATED, eg.getState());
-
-		eg.scheduleForExecution(scheduler);
-		assertEquals(JobStatus.RUNNING, eg.getState());
+		NoRestartStrategy restartStrategy = new NoRestartStrategy();
+		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
+		ExecutionGraph eg = executionGraphInstanceTuple.f0;
 
 		eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
 
@@ -121,14 +101,9 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		
 		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 		scheduler.newInstanceAvailable(instance);
-		
-		JobVertex groupVertex = new JobVertex("Task1");
-		groupVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-		groupVertex.setParallelism(NUM_TASKS);
 
-		JobVertex groupVertex2 = new JobVertex("Task2");
-		groupVertex2.setInvokableClass(Tasks.NoOpInvokable.class);
-		groupVertex2.setParallelism(NUM_TASKS);
+		JobVertex groupVertex = newJobVertex("Task1", NUM_TASKS, Tasks.NoOpInvokable.class);
+		JobVertex groupVertex2 = newJobVertex("Task2", NUM_TASKS, Tasks.NoOpInvokable.class);
 
 		SlotSharingGroup sharingGroup = new SlotSharingGroup();
 		groupVertex.setSlotSharingGroup(sharingGroup);
@@ -137,14 +112,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		
 		//initiate and schedule job
 		JobGraph jobGraph = new JobGraph("Pointwise job", groupVertex, groupVertex2);
-		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			new JobID(),
-			"test job",
-			new Configuration(),
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			new FixedDelayRestartStrategy(1, 0L));
+		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 0L));
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
 		assertEquals(JobStatus.CREATED, eg.getState());
@@ -180,71 +148,48 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	@Test
 	public void testRestartAutomatically() throws Exception {
-		Instance instance = ExecutionGraphTestUtils.getInstance(
-				new SimpleActorGateway(TestingUtils.directExecutionContext()),
-				NUM_TASKS);
-
-		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-		scheduler.newInstanceAvailable(instance);
-
-		JobVertex sender = new JobVertex("Task");
-		sender.setInvokableClass(Tasks.NoOpInvokable.class);
-		sender.setParallelism(NUM_TASKS);
+		RestartStrategy restartStrategy = new FixedDelayRestartStrategy(1, 1000);
+		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
+		ExecutionGraph eg = executionGraphInstanceTuple.f0;
 
-		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
-
-		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				new JobID(),
-				"Test job",
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				AkkaUtils.getDefaultTimeout(),
-				new FixedDelayRestartStrategy(1, 1000));
-		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-
-		assertEquals(JobStatus.CREATED, eg.getState());
+		restartAfterFailure(eg, new FiniteDuration(2, TimeUnit.MINUTES), true);
+	}
 
-		eg.scheduleForExecution(scheduler);
+	@Test
+	public void taskShouldFailWhenFailureRateLimitExceeded() throws Exception {
+		FailureRateRestartStrategy restartStrategy = new FailureRateRestartStrategy(2, Time.of(10, TimeUnit.SECONDS), Time.of(0, TimeUnit.SECONDS));
+		FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS);
+		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
+		ExecutionGraph eg = executionGraphInstanceTuple.f0;
+
+		restartAfterFailure(eg, timeout, false);
+		restartAfterFailure(eg, timeout, false);
+		makeAFailureAndWait(eg, timeout);
+		// failure rate limit exceeded, so the job should have failed
+		assertEquals(JobStatus.FAILED, eg.getState());
+	}
 
+	@Test
+	public void taskShouldNotFailWhenFailureRateLimitWasNotExceeded() throws Exception {
+		FailureRateRestartStrategy restartStrategy = new FailureRateRestartStrategy(1, Time.of(1, TimeUnit.MILLISECONDS), Time.of(0, TimeUnit.SECONDS));
+		FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS);
+		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
+		ExecutionGraph eg = executionGraphInstanceTuple.f0;
+
+		// the task is restarted several times, but the job keeps running because the failure rate limit is never exceeded
+		restartAfterFailure(eg, timeout, false);
+		restartAfterFailure(eg, timeout, false);
+		restartAfterFailure(eg, timeout, false);
 		assertEquals(JobStatus.RUNNING, eg.getState());
-		restartAfterFailure(eg, new FiniteDuration(2, TimeUnit.MINUTES), true);
 	}
 
 	@Test
 	public void testCancelWhileRestarting() throws Exception {
-		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-
-		Instance instance = ExecutionGraphTestUtils.getInstance(
-				new SimpleActorGateway(TestingUtils.directExecutionContext()),
-				NUM_TASKS);
-
-		scheduler.newInstanceAvailable(instance);
-
-		// Blocking program
-		ExecutionGraph executionGraph = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				new JobID(),
-				"TestJob",
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				AkkaUtils.getDefaultTimeout(),
-				// We want to manually control the restart and delay
-				new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
-
-		JobVertex jobVertex = new JobVertex("NoOpInvokable");
-		jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-		jobVertex.setParallelism(NUM_TASKS);
-
-		JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
-
-		executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-
-		assertEquals(JobStatus.CREATED, executionGraph.getState());
-
-		executionGraph.scheduleForExecution(scheduler);
-
-		assertEquals(JobStatus.RUNNING, executionGraph.getState());
+		// We want to manually control the restart and delay
+		FixedDelayRestartStrategy restartStrategy = new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE);
+		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
+		ExecutionGraph executionGraph = executionGraphInstanceTuple.f0;
+		Instance instance = executionGraphInstanceTuple.f1;
 
 		// Kill the instance and wait for the job to restart
 		instance.markDead();
@@ -331,46 +276,13 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	@Test
 	public void testCancelWhileFailing() throws Exception {
-		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-
-		Instance instance = ExecutionGraphTestUtils.getInstance(
-				new SimpleActorGateway(TestingUtils.directExecutionContext()),
-				NUM_TASKS);
-
-		scheduler.newInstanceAvailable(instance);
-
-		// Blocking program
-		ExecutionGraph executionGraph = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				new JobID(),
-				"TestJob",
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				AkkaUtils.getDefaultTimeout(),
-				// We want to manually control the restart and delay
-				new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE));
-
-		// Spy on the graph
-		executionGraph = spy(executionGraph);
-
-		// Do nothing here, because we don't want to transition out of
-		// the FAILING state.
+		// We want to manually control the restart and delay
+		FixedDelayRestartStrategy restartStrategy = new FixedDelayRestartStrategy(Integer.MAX_VALUE, Long.MAX_VALUE);
+		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createSpyExecutionGraph(restartStrategy);
+		ExecutionGraph executionGraph = executionGraphInstanceTuple.f0;
+		Instance instance = executionGraphInstanceTuple.f1;
 		doNothing().when(executionGraph).jobVertexInFinalState();
 
-		JobVertex jobVertex = new JobVertex("NoOpInvokable");
-		jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
-		jobVertex.setParallelism(NUM_TASKS);
-
-		JobGraph jobGraph = new JobGraph("TestJob", jobVertex);
-
-		executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-
-		assertEquals(JobStatus.CREATED, executionGraph.getState());
-
-		executionGraph.scheduleForExecution(scheduler);
-
-		assertEquals(JobStatus.RUNNING, executionGraph.getState());
-
 		// Kill the instance...
 		instance.markDead();
 
@@ -410,35 +322,8 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 	@Test
 	public void testNoRestartOnSuppressException() throws Exception {
-		Instance instance = ExecutionGraphTestUtils.getInstance(
-				new SimpleActorGateway(TestingUtils.directExecutionContext()),
-				NUM_TASKS);
-
-		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
-		scheduler.newInstanceAvailable(instance);
-
-		JobVertex sender = new JobVertex("Task");
-		sender.setInvokableClass(Tasks.NoOpInvokable.class);
-		sender.setParallelism(NUM_TASKS);
-
-		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
-
-		ExecutionGraph eg = spy(new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			new JobID(),
-			"Test job",
-			new Configuration(),
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			new FixedDelayRestartStrategy(1, 1000)));
-
-		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
-
-		assertEquals(JobStatus.CREATED, eg.getState());
-
-		eg.scheduleForExecution(scheduler);
-
-		assertEquals(JobStatus.RUNNING, eg.getState());
+		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createSpyExecutionGraph(new FixedDelayRestartStrategy(1, 1000));
+		ExecutionGraph eg = executionGraphInstanceTuple.f0;
 
 		// Fail with unrecoverable Exception
 		eg.getAllExecutionVertices().iterator().next().fail(
@@ -484,25 +369,10 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 		scheduler.newInstanceAvailable(instance);
 
-		JobVertex sender = new JobVertex("Task1");
-		sender.setInvokableClass(Tasks.NoOpInvokable.class);
-		sender.setParallelism(1);
-
-		JobVertex receiver = new JobVertex("Task2");
-		receiver.setInvokableClass(Tasks.NoOpInvokable.class);
-		receiver.setParallelism(1);
-
+		JobVertex sender = newJobVertex("Task1", 1, Tasks.NoOpInvokable.class);
+		JobVertex receiver = newJobVertex("Task2", 1, Tasks.NoOpInvokable.class);
 		JobGraph jobGraph = new JobGraph("Pointwise job", sender, receiver);
-
-		ExecutionGraph eg = new ExecutionGraph(
-			TestingUtils.defaultExecutionContext(),
-			new JobID(),
-			"test job",
-			new Configuration(),
-			new SerializedValue<>(new ExecutionConfig()),
-			AkkaUtils.getDefaultTimeout(),
-			new FixedDelayRestartStrategy(1, 1000));
-
+		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000));
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
 		assertEquals(JobStatus.CREATED, eg.getState());
@@ -522,30 +392,12 @@ public class ExecutionGraphRestartTest extends TestLogger {
 
 		FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
 
-		Deadline deadline = timeout.fromNow();
-
-		while (deadline.hasTimeLeft() && eg.getState() != JobStatus.RUNNING) {
-			Thread.sleep(100);
-		}
+		waitForAsyncRestart(eg, timeout);
 
 		assertEquals(JobStatus.RUNNING, eg.getState());
 
-		// Wait for deploying after async restart
-		deadline = timeout.fromNow();
-
 		// Wait for all resources to be assigned after async restart
-		boolean success = false;
-		while (deadline.hasTimeLeft() && !success) {
-			success = true;
-
-			for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
-				if (vertex.getCurrentExecutionAttempt().getAssignedResource() == null) {
-					success = false;
-					Thread.sleep(100);
-					break;
-				}
-			}
-		}
+		waitForAllResourcesToBeAssignedAfterAsyncRestart(eg, timeout.fromNow());
 
 		// At this point all resources have been assigned
 		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
@@ -580,9 +432,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 		scheduler.newInstanceAvailable(instance);
 
-		JobVertex vertex = new JobVertex("Test Vertex");
-		vertex.setInvokableClass(Tasks.NoOpInvokable.class);
-		vertex.setParallelism(1);
+		JobVertex vertex = newJobVertex("Test Vertex", 1, Tasks.NoOpInvokable.class);
 
 		ExecutionConfig executionConfig = new ExecutionConfig();
 		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
@@ -590,14 +440,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		JobGraph jobGraph = new JobGraph("Test Job", vertex);
 		jobGraph.setExecutionConfig(executionConfig);
 
-		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				new JobID(),
-				"test job",
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				AkkaUtils.getDefaultTimeout(),
-				new FixedDelayRestartStrategy(1, 1000000));
+		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000000));
 
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
@@ -634,9 +477,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
 		scheduler.newInstanceAvailable(instance);
 
-		JobVertex vertex = new JobVertex("Test Vertex");
-		vertex.setInvokableClass(Tasks.NoOpInvokable.class);
-		vertex.setParallelism(1);
+		JobVertex vertex = newJobVertex("Test Vertex", 1, Tasks.NoOpInvokable.class);
 
 		ExecutionConfig executionConfig = new ExecutionConfig();
 		executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(
@@ -644,14 +485,7 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		JobGraph jobGraph = new JobGraph("Test Job", vertex);
 		jobGraph.setExecutionConfig(executionConfig);
 
-		ExecutionGraph eg = new ExecutionGraph(
-				TestingUtils.defaultExecutionContext(),
-				new JobID(),
-				"test job",
-				new Configuration(),
-				new SerializedValue<>(new ExecutionConfig()),
-				AkkaUtils.getDefaultTimeout(),
-				new FixedDelayRestartStrategy(1, 1000000));
+		ExecutionGraph eg = newExecutionGraph(new FixedDelayRestartStrategy(1, 1000000));
 
 		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
 
@@ -789,25 +623,76 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		}
 	}
 
-	private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {
+	private static Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy restartStrategy) throws Exception {
+		return createExecutionGraph(restartStrategy, false);
+	}
 
-		eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
-		assertEquals(JobStatus.FAILING, eg.getState());
+	private static Tuple2<ExecutionGraph, Instance> createSpyExecutionGraph(RestartStrategy restartStrategy) throws Exception {
+		return createExecutionGraph(restartStrategy, true);
+	}
 
-		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
-			vertex.getCurrentExecutionAttempt().cancelingComplete();
-		}
+	private static Tuple2<ExecutionGraph, Instance> createExecutionGraph(RestartStrategy restartStrategy, boolean isSpy) throws Exception {
+		Instance instance = ExecutionGraphTestUtils.getInstance(
+				new SimpleActorGateway(TestingUtils.directExecutionContext()),
+				NUM_TASKS);
 
-		// Wait for async restart
-		Deadline deadline = timeout.fromNow();
-		while (deadline.hasTimeLeft() && eg.getState() != JobStatus.RUNNING) {
-			Thread.sleep(100);
+		Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
+		scheduler.newInstanceAvailable(instance);
+
+		JobVertex sender = newJobVertex("Task", NUM_TASKS, Tasks.NoOpInvokable.class);
+
+		JobGraph jobGraph = new JobGraph("Pointwise job", sender);
+
+		ExecutionGraph eg = newExecutionGraph(restartStrategy);
+		if (isSpy) {
+			eg = spy(eg);
 		}
+		eg.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
+
+		assertEquals(JobStatus.CREATED, eg.getState());
+
+		eg.scheduleForExecution(scheduler);
+		assertEquals(JobStatus.RUNNING, eg.getState());
+		return new Tuple2<>(eg, instance);
+	}
+
+	private static JobVertex newJobVertex(String task1, int numTasks, Class<Tasks.NoOpInvokable> invokable) {
+		JobVertex groupVertex = new JobVertex(task1);
+		groupVertex.setInvokableClass(invokable);
+		groupVertex.setParallelism(numTasks);
+		return groupVertex;
+	}
+
+	private static ExecutionGraph newExecutionGraph(RestartStrategy restartStrategy) throws IOException {
+		return new ExecutionGraph(
+				TestingUtils.defaultExecutionContext(),
+				new JobID(),
+				"Test job",
+				new Configuration(),
+				new SerializedValue<>(new ExecutionConfig()),
+				AkkaUtils.getDefaultTimeout(),
+				restartStrategy);
+	}
+
+	private static void restartAfterFailure(ExecutionGraph eg, FiniteDuration timeout, boolean haltAfterRestart) throws InterruptedException {
+		makeAFailureAndWait(eg, timeout);
 
 		assertEquals(JobStatus.RUNNING, eg.getState());
 
 		// Wait for deploying after async restart
-		deadline = timeout.fromNow();
+		Deadline deadline = timeout.fromNow();
+		waitForAllResourcesToBeAssignedAfterAsyncRestart(eg, deadline);
+
+		if (haltAfterRestart) {
+			if (deadline.hasTimeLeft()) {
+				haltExecution(eg);
+			} else {
+				fail("Failed to wait until all execution attempts left the state DEPLOYING.");
+			}
+		}
+	}
+
+	private static void waitForAllResourcesToBeAssignedAfterAsyncRestart(ExecutionGraph eg, Deadline deadline) throws InterruptedException {
 		boolean success = false;
 
 		while (deadline.hasTimeLeft() && !success) {
@@ -821,13 +706,24 @@ public class ExecutionGraphRestartTest extends TestLogger {
 				}
 			}
 		}
+	}
 
-		if (haltAfterRestart) {
-			if (deadline.hasTimeLeft()) {
-				haltExecution(eg);
-			} else {
-				fail("Failed to wait until all execution attempts left the state DEPLOYING.");
-			}
+	private static void makeAFailureAndWait(ExecutionGraph eg, FiniteDuration timeout) throws InterruptedException {
+		eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
+		assertEquals(JobStatus.FAILING, eg.getState());
+
+		for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
+			vertex.getCurrentExecutionAttempt().cancelingComplete();
+		}
+
+		// Wait for async restart
+		waitForAsyncRestart(eg, timeout);
+	}
+
+	private static void waitForAsyncRestart(ExecutionGraph eg, FiniteDuration timeout) throws InterruptedException {
+		Deadline deadline = timeout.fromNow();
+		while (deadline.hasTimeLeft() && eg.getState() != JobStatus.RUNNING) {
+			Thread.sleep(100);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
index 7bb726d..c57bea7 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/RestartStrategyTest.java
@@ -95,6 +95,6 @@ public class RestartStrategyTest {
 		Assert.assertNotNull(restartStrategy);
 		Assert.assertTrue(restartStrategy instanceof RestartStrategies.FixedDelayRestartStrategyConfiguration);
 		Assert.assertEquals(42, ((RestartStrategies.FixedDelayRestartStrategyConfiguration)restartStrategy).getRestartAttempts());
-		Assert.assertEquals(1337, ((RestartStrategies.FixedDelayRestartStrategyConfiguration)restartStrategy).getDelayBetweenAttempts());
+		Assert.assertEquals(1337, ((RestartStrategies.FixedDelayRestartStrategyConfiguration)restartStrategy).getDelayBetweenAttemptsInterval().toMilliseconds());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
index ffd94fc..7c414fa 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/WindowReduceITCase.scala
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.api.scala
 
 import java.util.concurrent.TimeUnit
 
-import org.apache.flink.api.common.functions.{ReduceFunction, FoldFunction}
+import org.apache.flink.api.common.functions.ReduceFunction
 import org.apache.flink.api.java.tuple.Tuple
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks

http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
index aa5ff3b..2d634de 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/WindowCheckpointingITCase.java
@@ -23,10 +23,8 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;

http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
new file mode 100644
index 0000000..0c5d14b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFailureRateStrategyITBase.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.BeforeClass;
+
+public class SimpleRecoveryFailureRateStrategyITBase extends SimpleRecoveryITCaseBase {
+	@BeforeClass
+	public static void setupCluster() {
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+		config.setString(ConfigConstants.RESTART_STRATEGY, "failure-rate");
+		config.setInteger(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_MAX_FAILURES_PER_INTERVAL, 1);
+		config.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_FAILURE_RATE_INTERVAL, "1 second");
+		config.setString(ConfigConstants.RESTART_STRATEGY_FAILURE_RATE_DELAY, "100 ms");
+
+		cluster = new ForkableFlinkMiniCluster(config, false);
+
+		cluster.start();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
new file mode 100644
index 0000000..6355a8f
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryFixedDelayRestartStrategyITBase.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.BeforeClass;
+
+public class SimpleRecoveryFixedDelayRestartStrategyITBase extends SimpleRecoveryITCaseBase {
+	@BeforeClass
+	public static void setupCluster() {
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
+		config.setString(ConfigConstants.RESTART_STRATEGY, "fixed-delay");
+		config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
+		config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "100 ms");
+
+		cluster = new ForkableFlinkMiniCluster(config, false);
+
+		cluster.start();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
deleted file mode 100644
index 0a0f451..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCase.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.recovery;
-
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-/**
- * A series of tests (reusing one FlinkMiniCluster) where tasks fail (one or more time)
- * and the recovery should restart them to verify job completion.
- */
-@SuppressWarnings("serial")
-public class SimpleRecoveryITCase {
-
-	private static ForkableFlinkMiniCluster cluster;
-
-	@BeforeClass
-	public static void setupCluster() {
-		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
-		config.setString(ConfigConstants.RESTART_STRATEGY, "fixed-delay");
-		config.setInteger(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, 1);
-		config.setString(ConfigConstants.RESTART_STRATEGY_FIXED_DELAY_DELAY, "100 ms");
-
-		cluster = new ForkableFlinkMiniCluster(config, false);
-
-		cluster.start();
-	}
-
-	@AfterClass
-	public static void teardownCluster() {
-		try {
-			cluster.stop();
-		}
-		catch (Throwable t) {
-			System.err.println("Error stopping cluster on shutdown");
-			t.printStackTrace();
-			fail("ClusterClient shutdown caused an exception: " + t.getMessage());
-		}
-	}
-
-	@Test
-	public void testFailedRunThenSuccessfulRun() {
-
-		try {
-			List<Long> resultCollection = new ArrayList<Long>();
-
-			// attempt 1
-			{
-				ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-						"localhost", cluster.getLeaderRPCPort());
-
-				env.setParallelism(4);
-				env.setRestartStrategy(RestartStrategies.noRestart());
-				env.getConfig().disableSysoutLogging();
-
-				env.generateSequence(1, 10)
-						.rebalance()
-						.map(new FailingMapper1<Long>())
-						.reduce(new ReduceFunction<Long>() {
-							@Override
-							public Long reduce(Long value1, Long value2) {
-								return value1 + value2;
-							}
-						})
-						.output(new LocalCollectionOutputFormat<Long>(resultCollection));
-
-				try {
-					JobExecutionResult res = env.execute();
-					String msg = res == null ? "null result" : "result in " + res.getNetRuntime() + " ms";
-					fail("The program should have failed, but returned " + msg);
-				}
-				catch (ProgramInvocationException e) {
-					// expected
-				}
-			}
-
-			// attempt 2
-			{
-				ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-						"localhost", cluster.getLeaderRPCPort());
-
-				env.setParallelism(4);
-				env.setRestartStrategy(RestartStrategies.noRestart());
-				env.getConfig().disableSysoutLogging();
-
-				env.generateSequence(1, 10)
-						.rebalance()
-						.map(new FailingMapper1<Long>())
-						.reduce(new ReduceFunction<Long>() {
-							@Override
-							public Long reduce(Long value1, Long value2) {
-								return value1 + value2;
-							}
-						})
-						.output(new LocalCollectionOutputFormat<Long>(resultCollection));
-
-				try {
-					JobExecutionResult result = env.execute();
-					assertTrue(result.getNetRuntime() >= 0);
-					assertNotNull(result.getAllAccumulatorResults());
-					assertTrue(result.getAllAccumulatorResults().isEmpty());
-				}
-				catch (JobExecutionException e) {
-					fail("The program should have succeeded on the second run");
-				}
-
-				long sum = 0;
-				for (long l : resultCollection) {
-					sum += l;
-				}
-				assertEquals(55, sum);
-			}
-
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testRestart() {
-		try {
-			List<Long> resultCollection = new ArrayList<Long>();
-
-			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-
-			env.setParallelism(4);
-			// the default restart strategy should be taken
-			env.getConfig().disableSysoutLogging();
-
-			env.generateSequence(1, 10)
-					.rebalance()
-					.map(new FailingMapper2<Long>())
-					.reduce(new ReduceFunction<Long>() {
-						@Override
-						public Long reduce(Long value1, Long value2) {
-							return value1 + value2;
-						}
-					})
-					.output(new LocalCollectionOutputFormat<Long>(resultCollection));
-
-			try {
-				JobExecutionResult result = env.execute();
-				assertTrue(result.getNetRuntime() >= 0);
-				assertNotNull(result.getAllAccumulatorResults());
-				assertTrue(result.getAllAccumulatorResults().isEmpty());
-			}
-			catch (JobExecutionException e) {
-				fail("The program should have succeeded on the second run");
-			}
-
-			long sum = 0;
-			for (long l : resultCollection) {
-				sum += l;
-			}
-			assertEquals(55, sum);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	@Test
-	public void testRestartMultipleTimes() {
-		try {
-			List<Long> resultCollection = new ArrayList<Long>();
-
-			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-					"localhost", cluster.getLeaderRPCPort());
-
-			env.setParallelism(4);
-			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 100));
-			env.getConfig().disableSysoutLogging();
-
-			env.generateSequence(1, 10)
-					.rebalance()
-					.map(new FailingMapper3<Long>())
-					.reduce(new ReduceFunction<Long>() {
-						@Override
-						public Long reduce(Long value1, Long value2) {
-							return value1 + value2;
-						}
-					})
-					.output(new LocalCollectionOutputFormat<Long>(resultCollection));
-
-			try {
-				JobExecutionResult result = env.execute();
-				assertTrue(result.getNetRuntime() >= 0);
-				assertNotNull(result.getAllAccumulatorResults());
-				assertTrue(result.getAllAccumulatorResults().isEmpty());
-			}
-			catch (JobExecutionException e) {
-				fail("The program should have succeeded on the second run");
-			}
-
-			long sum = 0;
-			for (long l : resultCollection) {
-				sum += l;
-			}
-			assertEquals(55, sum);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
-
-	// ------------------------------------------------------------------------------------
-
-	private static class FailingMapper1<T> extends RichMapFunction<T, T> {
-
-		private static volatile int failuresBeforeSuccess = 1;
-
-		@Override
-		public T map(T value) throws Exception {
-			if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) {
-				failuresBeforeSuccess--;
-				throw new Exception("Test Failure");
-			}
-
-			return value;
-		}
-	}
-
-	private static class FailingMapper2<T> extends RichMapFunction<T, T> {
-
-		private static volatile int failuresBeforeSuccess = 1;
-
-		@Override
-		public T map(T value) throws Exception {
-			if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) {
-				failuresBeforeSuccess--;
-				throw new Exception("Test Failure");
-			}
-
-			return value;
-		}
-	}
-
-	private static class FailingMapper3<T> extends RichMapFunction<T, T> {
-
-		private static volatile int failuresBeforeSuccess = 3;
-
-		@Override
-		public T map(T value) throws Exception {
-			if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) {
-				failuresBeforeSuccess--;
-				throw new Exception("Test Failure");
-			}
-
-			return value;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7274d56/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
new file mode 100644
index 0000000..004340c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/SimpleRecoveryITCaseBase.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.recovery;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.test.util.ForkableFlinkMiniCluster;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.*;
+
+/**
+ * A series of tests (reusing one FlinkMiniCluster) where tasks fail (one or more times)
+ * and the recovery should restart them to verify job completion.
+ */
+@SuppressWarnings("serial")
+public abstract class SimpleRecoveryITCaseBase {
+
+	protected static ForkableFlinkMiniCluster cluster;
+
+	@AfterClass
+	public static void teardownCluster() {
+		try {
+			cluster.stop();
+		}
+		catch (Throwable t) {
+			System.err.println("Error stopping cluster on shutdown");
+			t.printStackTrace();
+			fail("ClusterClient shutdown caused an exception: " + t.getMessage());
+		}
+	}
+
+	@Test
+	public void testFailedRunThenSuccessfulRun() {
+
+		try {
+			List<Long> resultCollection = new ArrayList<Long>();
+
+			// attempt 1
+			{
+				ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+						"localhost", cluster.getLeaderRPCPort());
+
+				env.setParallelism(4);
+				env.setRestartStrategy(RestartStrategies.noRestart());
+				env.getConfig().disableSysoutLogging();
+
+				env.generateSequence(1, 10)
+						.rebalance()
+						.map(new FailingMapper1<Long>())
+						.reduce(new ReduceFunction<Long>() {
+							@Override
+							public Long reduce(Long value1, Long value2) {
+								return value1 + value2;
+							}
+						})
+						.output(new LocalCollectionOutputFormat<Long>(resultCollection));
+
+				try {
+					JobExecutionResult res = env.execute();
+					String msg = res == null ? "null result" : "result in " + res.getNetRuntime() + " ms";
+					fail("The program should have failed, but returned " + msg);
+				}
+				catch (ProgramInvocationException e) {
+					// expected
+				}
+			}
+
+			// attempt 2
+			{
+				ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+						"localhost", cluster.getLeaderRPCPort());
+
+				env.setParallelism(4);
+				env.setRestartStrategy(RestartStrategies.noRestart());
+				env.getConfig().disableSysoutLogging();
+
+				env.generateSequence(1, 10)
+						.rebalance()
+						.map(new FailingMapper1<Long>())
+						.reduce(new ReduceFunction<Long>() {
+							@Override
+							public Long reduce(Long value1, Long value2) {
+								return value1 + value2;
+							}
+						})
+						.output(new LocalCollectionOutputFormat<Long>(resultCollection));
+
+				executeAndRunAssertions(env);
+
+				long sum = 0;
+				for (long l : resultCollection) {
+					sum += l;
+				}
+				assertEquals(55, sum);
+			}
+
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	private void executeAndRunAssertions(ExecutionEnvironment env) throws Exception {
+		try {
+            JobExecutionResult result = env.execute();
+            assertTrue(result.getNetRuntime() >= 0);
+            assertNotNull(result.getAllAccumulatorResults());
+            assertTrue(result.getAllAccumulatorResults().isEmpty());
+        }
+        catch (JobExecutionException e) {
+            fail("The program should have succeeded on the second run");
+        }
+	}
+
+	@Test
+	public void testRestart() {
+		try {
+			List<Long> resultCollection = new ArrayList<Long>();
+
+			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getLeaderRPCPort());
+
+			env.setParallelism(4);
+			// the default restart strategy should be taken
+			env.getConfig().disableSysoutLogging();
+
+			env.generateSequence(1, 10)
+					.rebalance()
+					.map(new FailingMapper2<Long>())
+					.reduce(new ReduceFunction<Long>() {
+						@Override
+						public Long reduce(Long value1, Long value2) {
+							return value1 + value2;
+						}
+					})
+					.output(new LocalCollectionOutputFormat<Long>(resultCollection));
+
+			executeAndRunAssertions(env);
+
+			long sum = 0;
+			for (long l : resultCollection) {
+				sum += l;
+			}
+			assertEquals(55, sum);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testRestartMultipleTimes() {
+		try {
+			List<Long> resultCollection = new ArrayList<Long>();
+
+			ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
+					"localhost", cluster.getLeaderRPCPort());
+
+			env.setParallelism(4);
+			env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 100));
+			env.getConfig().disableSysoutLogging();
+
+			env.generateSequence(1, 10)
+					.rebalance()
+					.map(new FailingMapper3<Long>())
+					.reduce(new ReduceFunction<Long>() {
+						@Override
+						public Long reduce(Long value1, Long value2) {
+							return value1 + value2;
+						}
+					})
+					.output(new LocalCollectionOutputFormat<Long>(resultCollection));
+
+			executeAndRunAssertions(env);
+
+			long sum = 0;
+			for (long l : resultCollection) {
+				sum += l;
+			}
+			assertEquals(55, sum);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// ------------------------------------------------------------------------------------
+
+	private static class FailingMapper1<T> extends RichMapFunction<T, T> {
+
+		private static volatile int failuresBeforeSuccess = 1;
+
+		@Override
+		public T map(T value) throws Exception {
+			if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) {
+				failuresBeforeSuccess--;
+				throw new Exception("Test Failure");
+			}
+
+			return value;
+		}
+	}
+
+	private static class FailingMapper2<T> extends RichMapFunction<T, T> {
+
+		private static volatile int failuresBeforeSuccess = 1;
+
+		@Override
+		public T map(T value) throws Exception {
+			if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) {
+				failuresBeforeSuccess--;
+				throw new Exception("Test Failure");
+			}
+
+			return value;
+		}
+	}
+
+	private static class FailingMapper3<T> extends RichMapFunction<T, T> {
+
+		private static volatile int failuresBeforeSuccess = 3;
+
+		@Override
+		public T map(T value) throws Exception {
+			if (failuresBeforeSuccess > 0 && getRuntimeContext().getIndexOfThisSubtask() == 1) {
+				failuresBeforeSuccess--;
+				throw new Exception("Test Failure");
+			}
+
+			return value;
+		}
+	}
+}


Mime
View raw message