flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/3] flink git commit: [FLINK-6665] [FLINK-6667] [distributed coordination] Use a callback and a ScheduledExecutor for ExecutionGraph restarts
Date Thu, 20 Jul 2017 13:52:19 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.3 129a82fba -> 39f5b1144


[FLINK-6665] [FLINK-6667] [distributed coordination] Use a callback and a ScheduledExecutor
for ExecutionGraph restarts

Initial work by zjureel@gmail.com , improved by sewen@apache.org.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c0803d5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c0803d5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c0803d5

Branch: refs/heads/release-1.3
Commit: 6c0803d5e607d8e0e647a2559570fedf7968a9dc
Parents: 129a82f
Author: zjureel <zjureel@gmail.com>
Authored: Tue Jul 18 19:27:56 2017 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Jul 20 15:10:17 2017 +0200

----------------------------------------------------------------------
 .../runtime/executiongraph/ExecutionGraph.java  |   7 +-
 .../restart/ExecutionGraphRestartCallback.java  |  51 ++++++++
 .../restart/ExecutionGraphRestarter.java        |  45 -------
 .../restart/FailureRateRestartStrategy.java     |  15 ++-
 .../restart/FixedDelayRestartStrategy.java      |  15 ++-
 .../restart/NoRestartStrategy.java              |   5 +-
 .../executiongraph/restart/RestartCallback.java |  32 +++++
 .../executiongraph/restart/RestartStrategy.java |  12 +-
 .../ExecutionGraphMetricsTest.java              |  17 +--
 .../ExecutionGraphRestartTest.java              |  51 ++------
 .../executiongraph/ExecutionGraphTestUtils.java |  11 +-
 .../restart/FailureRateRestartStrategyTest.java | 128 +++++++++++++++++++
 .../restart/FixedDelayRestartStrategyTest.java  |  77 ++++++++---
 .../restart/InfiniteDelayRestartStrategy.java   |  10 +-
 .../restart/LatchedRestarter.java               |  38 ++++++
 .../executiongraph/restart/NoOpRestarter.java   |  28 ++++
 16 files changed, 407 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 7c13936..9f29faf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -44,11 +44,14 @@ import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
+import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
+import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -1388,7 +1391,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 				if (isRestartable && transitionState(currentState, JobStatus.RESTARTING)) {
 					LOG.info("Restarting the job {} ({}).", getJobName(), getJobID());
-					restartStrategy.restart(this);
+
+					RestartCallback restarter = new ExecutionGraphRestartCallback(this);
+					restartStrategy.restart(restarter, new ScheduledExecutorServiceAdapter(futureExecutor));
 
 					return true;
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
new file mode 100644
index 0000000..5874f91
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestartCallback.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link RestartCallback} that abstracts restart calls on an {@link ExecutionGraph}. 
+ * 
+ * <p>This callback implementation is one-shot; it can only be used once.
+ */
+public class ExecutionGraphRestartCallback implements RestartCallback {
+
+	/** The ExecutionGraph to restart */
+	private final ExecutionGraph execGraph;
+
+	/** Atomic flag to make sure this is used only once */
+	private final AtomicBoolean used;
+
+	public ExecutionGraphRestartCallback(ExecutionGraph execGraph) {
+		this.execGraph = checkNotNull(execGraph);
+		this.used = new AtomicBoolean(false);
+	}
+
+	@Override
+	public void triggerFullRecovery() {
+		if (used.compareAndSet(false, true)) {
+			execGraph.restart();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java
deleted file mode 100644
index 9287694..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/ExecutionGraphRestarter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph.restart;
-
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Callable;
-
-class ExecutionGraphRestarter {
-	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphRestarter.class);
-	public static Callable<Object> restartWithDelay(final ExecutionGraph executionGraph,
final long delayBetweenRestartAttemptsInMillis) {
-		return new Callable<Object>() {
-			@Override
-			public Object call() throws Exception {
-				try {
-					LOG.info("Delaying retry of job execution for {} ms ...", delayBetweenRestartAttemptsInMillis);
-					// do the delay
-					Thread.sleep(delayBetweenRestartAttemptsInMillis);
-				} catch(InterruptedException e) {
-					// should only happen on shutdown
-				}
-				executionGraph.restart();
-				return null;
-			}
-		};
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
index d95e1c3..6580ec3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategy.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.executiongraph.restart;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.util.Preconditions;
 
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
  * with a fixed time delay in between.
  */
 public class FailureRateRestartStrategy implements RestartStrategy {
+
 	private final Time failuresInterval;
 	private final Time delayInterval;
 	private final int maxFailuresPerInterval;
@@ -66,16 +67,22 @@ public class FailureRateRestartStrategy implements RestartStrategy {
 	}
 
 	@Override
-	public void restart(final ExecutionGraph executionGraph) {
+	public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
 		if (isRestartTimestampsQueueFull()) {
 			restartTimestampsDeque.remove();
 		}
 		restartTimestampsDeque.add(System.currentTimeMillis());
-		FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayInterval.toMilliseconds()),
executionGraph.getFutureExecutor());
+
+		executor.schedule(new Runnable() {
+			@Override
+			public void run() {
+				restarter.triggerFullRecovery();
+			}
+		}, delayInterval.getSize(), delayInterval.getUnit());
 	}
 
 	private boolean isRestartTimestampsQueueFull() {
-		return restartTimestampsDeque.size() == maxFailuresPerInterval;
+		return restartTimestampsDeque.size() >= maxFailuresPerInterval;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
index f51ea7c..0dd700a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategy.java
@@ -20,16 +20,19 @@ package org.apache.flink.runtime.executiongraph.restart;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.util.Preconditions;
 import scala.concurrent.duration.Duration;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * Restart strategy which tries to restart the given {@link ExecutionGraph} a fixed number
of times
  * with a fixed time delay in between.
  */
 public class FixedDelayRestartStrategy implements RestartStrategy {
+
 	private final int maxNumberRestartAttempts;
 	private final long delayBetweenRestartAttempts;
 	private int currentRestartAttempt;
@@ -56,9 +59,15 @@ public class FixedDelayRestartStrategy implements RestartStrategy {
 	}
 
 	@Override
-	public void restart(final ExecutionGraph executionGraph) {
+	public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
 		currentRestartAttempt++;
-		FlinkFuture.supplyAsync(ExecutionGraphRestarter.restartWithDelay(executionGraph, delayBetweenRestartAttempts),
executionGraph.getFutureExecutor());
+
+		executor.schedule(new Runnable() {
+			@Override
+			public void run() {
+				restarter.triggerFullRecovery();
+			}
+		}, delayBetweenRestartAttempts, TimeUnit.MILLISECONDS);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
index 958d9ac..5502d2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/NoRestartStrategy.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.executiongraph.restart;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 
 /**
@@ -32,8 +33,8 @@ public class NoRestartStrategy implements RestartStrategy {
 	}
 
 	@Override
-	public void restart(ExecutionGraph executionGraph) {
-		throw new RuntimeException("NoRestartStrategy does not support restart.");
+	public void restart(RestartCallback restarter, ScheduledExecutor executor) {
+		throw new UnsupportedOperationException("NoRestartStrategy does not support restart.");
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartCallback.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartCallback.java
new file mode 100644
index 0000000..6499be3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartCallback.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+/**
+ * A callback to trigger restarts, passed to the {@link RestartStrategy} to
+ * trigger recovery on the ExecutionGraph. 
+ */
+public interface RestartCallback {
+
+	/**
+	 * Triggers a full recovery in the target ExecutionGraph.
+	 * A full recovery resets all vertices to the state of the latest checkpoint.
+	 */
+	void triggerFullRecovery();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
index 2880c01..60e2e8b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph.restart;
 
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 
 /**
@@ -33,9 +34,14 @@ public interface RestartStrategy {
 	boolean canRestart();
 
 	/**
-	 * Restarts the given {@link ExecutionGraph}.
+	 * Called by the ExecutionGraph to eventually trigger a full recovery.
+	 * The recovery must be triggered on the given callback object, and may be delayed
+	 * with the help of the given scheduled executor.
+	 * 
+	 * <p>The thread that calls this method is not supposed to block/sleep.
 	 *
-	 * @param executionGraph The ExecutionGraph to be restarted
+	 * @param restarter The hook to restart the ExecutionGraph
+	 * @param executor A scheduled executor to delay the restart
 	 */
-	void restart(ExecutionGraph executionGraph);
+	void restart(RestartCallback restarter, ScheduledExecutor executor);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 6b5ceae..0785a26 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -24,11 +24,13 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
+import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -255,25 +257,20 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
 	static class TestingRestartStrategy implements RestartStrategy {
 
-		private boolean restartable = true;
-		private ExecutionGraph executionGraph = null;
+		private RestartCallback restarter;
 
 		@Override
 		public boolean canRestart() {
-			return restartable;
+			return true;
 		}
 
 		@Override
-		public void restart(ExecutionGraph executionGraph) {
-			this.executionGraph = executionGraph;
-		}
-
-		public void setRestartable(boolean restartable) {
-			this.restartable = restartable;
+		public void restart(RestartCallback restarter, ScheduledExecutor executor) {
+			this.restarter = restarter;
 		}
 
 		public void restartExecutionGraph() {
-			executionGraph.restart();
+			restarter.triggerFullRecovery();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
index eeb6c69..81702a2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRestartTest.java
@@ -18,21 +18,24 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import akka.dispatch.Futures;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
-import org.apache.flink.runtime.executiongraph.restart.FailureRateRestartStrategy;
+import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -41,6 +44,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.SerializedValue;
@@ -56,11 +60,9 @@ import scala.concurrent.impl.Promise;
 
 import java.io.IOException;
 import java.util.Iterator;
-import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -159,34 +161,6 @@ public class ExecutionGraphRestartTest extends TestLogger {
 	}
 
 	@Test
-	public void taskShouldFailWhenFailureRateLimitExceeded() throws Exception {
-		FailureRateRestartStrategy restartStrategy = new FailureRateRestartStrategy(2, Time.of(10,
TimeUnit.SECONDS), Time.of(0, TimeUnit.SECONDS));
-		FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS);
-		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
-		ExecutionGraph eg = executionGraphInstanceTuple.f0;
-
-		restartAfterFailure(eg, timeout, false);
-		restartAfterFailure(eg, timeout, false);
-		makeAFailureAndWait(eg, timeout);
-		//failure rate limit exceeded, so task should be failed
-		assertEquals(JobStatus.FAILED, eg.getState());
-	}
-
-	@Test
-	public void taskShouldNotFailWhenFailureRateLimitWasNotExceeded() throws Exception {
-		FailureRateRestartStrategy restartStrategy = new FailureRateRestartStrategy(1, Time.of(1,
TimeUnit.MILLISECONDS), Time.of(0, TimeUnit.SECONDS));
-		FiniteDuration timeout = new FiniteDuration(2, TimeUnit.SECONDS);
-		Tuple2<ExecutionGraph, Instance> executionGraphInstanceTuple = createExecutionGraph(restartStrategy);
-		ExecutionGraph eg = executionGraphInstanceTuple.f0;
-
-		//task restarted many times, but after all job is still running, because rate limit was
not exceeded
-		restartAfterFailure(eg, timeout, false);
-		restartAfterFailure(eg, timeout, false);
-		restartAfterFailure(eg, timeout, false);
-		assertEquals(JobStatus.RUNNING, eg.getState());
-	}
-
-	@Test
 	public void testCancelWhileRestarting() throws Exception {
 		// We want to manually control the restart and delay
 		RestartStrategy restartStrategy = new InfiniteDelayRestartStrategy();
@@ -618,23 +592,20 @@ public class ExecutionGraphRestartTest extends TestLogger {
 		}
 
 		@Override
-		public void restart(final ExecutionGraph executionGraph) {
-			Futures.future(new Callable<Object>() {
+		public void restart(final RestartCallback restarter, ScheduledExecutor executor) {
+			executor.execute(new Runnable() {
 				@Override
-				public Object call() throws Exception {
+				public void run() {
 					try {
-
 						Await.ready(doRestart.future(), timeout);
-						executionGraph.restart();
+						restarter.triggerFullRecovery();
 					} catch (Exception e) {
 						exception = e;
 					}
 
 					restartDone.success(true);
-
-					return null;
 				}
-			}, TestingUtils.defaultExecutionContext());
+			});
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 140e984..51b5c7f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -254,9 +254,7 @@ public class ExecutionGraphTestUtils {
 	 * restart strategy.
 	 */
 	public static ExecutionGraph createSimpleTestGraph(RestartStrategy restartStrategy) throws
Exception {
-		JobVertex vertex = new JobVertex("vertex");
-		vertex.setInvokableClass(NoOpInvokable.class);
-		vertex.setParallelism(10);
+		JobVertex vertex = createNoOpVertex(10);
 
 		return createSimpleTestGraph(new JobID(), restartStrategy, vertex);
 	}
@@ -313,6 +311,13 @@ public class ExecutionGraphTestUtils {
 				1,
 				TEST_LOGGER);
 	}
+	
+	public static JobVertex createNoOpVertex(int parallelism) {
+		JobVertex vertex = new JobVertex("vertex");
+		vertex.setInvokableClass(NoOpInvokable.class);
+		vertex.setParallelism(parallelism);
+		return vertex;
+	}
 
 	// ------------------------------------------------------------------------
 	//  utility mocking methods

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java
new file mode 100644
index 0000000..b42dd77
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FailureRateRestartStrategyTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+
+import org.junit.After;
+import org.junit.Test;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for the {@link FailureRateRestartStrategy}.
+ */
+public class FailureRateRestartStrategyTest {
+
+	public final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
+
+	public final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
+
+	@After
+	public void shutdownExecutor() {
+		executorService.shutdownNow();
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testManyFailuresWithinRate() throws Exception {
+		final int numAttempts = 10;
+		final int intervalMillis = 1;
+
+		final FailureRateRestartStrategy restartStrategy =
+				new FailureRateRestartStrategy(1, Time.milliseconds(intervalMillis), Time.milliseconds(0));
+
+		for (int attempsLeft = numAttempts; attempsLeft > 0; --attempsLeft) {
+			assertTrue(restartStrategy.canRestart());
+			restartStrategy.restart(new NoOpRestarter(), executor);
+			sleepGuaranteed(2 * intervalMillis);
+		}
+
+		assertTrue(restartStrategy.canRestart());
+	}
+
+	@Test
+	public void testFailuresExceedingRate() throws Exception {
+		final int numFailures = 3;
+		final int intervalMillis = 10_000;
+
+		final FailureRateRestartStrategy restartStrategy =
+				new FailureRateRestartStrategy(numFailures, Time.milliseconds(intervalMillis), Time.milliseconds(0));
+
+		for (int failuresLeft = numFailures; failuresLeft > 0; --failuresLeft) {
+			assertTrue(restartStrategy.canRestart());
+			restartStrategy.restart(new NoOpRestarter(), executor);
+		}
+
+		// now the rate should be exceeded
+		assertFalse(restartStrategy.canRestart());
+	}
+
+	@Test
+	public void testDelay() throws Exception {
+		final long restartDelay = 2;
+		final int numberRestarts = 10;
+
+		final FailureRateRestartStrategy strategy =
+			new FailureRateRestartStrategy(numberRestarts + 1, Time.milliseconds(1), Time.milliseconds(restartDelay));
+
+		for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) {
+			assertTrue(strategy.canRestart());
+
+			final OneShotLatch sync = new OneShotLatch();
+			final RestartCallback restarter = new LatchedRestarter(sync);
+
+			final long time = System.nanoTime();
+			strategy.restart(restarter, executor);
+			sync.await();
+
+			final long elapsed = System.nanoTime() - time;
+			assertTrue("Not enough delay", elapsed >= restartDelay * 1_000_000);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Sleeps for at least the given number of milliseconds, sleeping again if the thread wakes up before the deadline.
+	 */
+	private static void sleepGuaranteed(long millis) throws InterruptedException {
+		final long deadline = System.nanoTime() + millis * 1_000_000;
+
+		long nanosToSleep;
+		while ((nanosToSleep = deadline - System.nanoTime()) > 0) {
+			long millisToSleep = nanosToSleep / 1_000_000;
+			if (nanosToSleep % 1_000_000 != 0) {
+				millisToSleep++;
+			}
+
+			Thread.sleep(millisToSleep);
+		}
+	}
+	
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
index 4beedb0..17504d8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/FixedDelayRestartStrategyTest.java
@@ -18,34 +18,75 @@
 
 package org.apache.flink.runtime.executiongraph.restart;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 
-import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.junit.After;
 import org.junit.Test;
-import org.mockito.Mockito;
-import scala.concurrent.ExecutionContext$;
 
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit test for the {@link FixedDelayRestartStrategy}.
+ */
 public class FixedDelayRestartStrategyTest {
 
+	public final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
+
+	public final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
+
+	@After
+	public void shutdownExecutor() {
+		executorService.shutdownNow();
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testNumberOfRestarts() throws Exception {
+		final int numberRestarts = 10;
+
+		final FixedDelayRestartStrategy strategy =
+				new FixedDelayRestartStrategy(numberRestarts, 0L);
+
+		for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) {
+			// two calls to 'canRestart()' to make sure this is not used to maintain the counter
+			assertTrue(strategy.canRestart());
+			assertTrue(strategy.canRestart());
+
+			strategy.restart(new NoOpRestarter(), executor);
+		}
+
+		assertFalse(strategy.canRestart());
+	}
+
 	@Test
-	public void testFixedDelayRestartStrategy() {
-		int numberRestarts = 10;
-		long restartDelay = 10;
+	public void testDelay() throws Exception {
+		final long restartDelay = 10;
+		final int numberRestarts = 10;
+
+		final FixedDelayRestartStrategy strategy =
+				new FixedDelayRestartStrategy(numberRestarts, restartDelay);
+
+		for (int restartsLeft = numberRestarts; restartsLeft > 0; --restartsLeft) {
+			assertTrue(strategy.canRestart());
 
-		FixedDelayRestartStrategy fixedDelayRestartStrategy = new FixedDelayRestartStrategy(
-			numberRestarts,
-			restartDelay);
+			final OneShotLatch sync = new OneShotLatch();
+			final RestartCallback restarter = new LatchedRestarter(sync);
 
-		ExecutionGraph executionGraph = mock(ExecutionGraph.class);
-		when(executionGraph.getFutureExecutor())
-			.thenReturn(ExecutionContext$.MODULE$.fromExecutor(MoreExecutors.directExecutor()));
+			final long time = System.nanoTime();
+			strategy.restart(restarter, executor);
+			sync.await();
 
-		while(fixedDelayRestartStrategy.canRestart()) {
-			fixedDelayRestartStrategy.restart(executionGraph);
+			final long elapsed = System.nanoTime() - time;
+			assertTrue("Not enough delay", elapsed >= restartDelay * 1_000_000);
 		}
 
-		Mockito.verify(executionGraph, Mockito.times(numberRestarts)).restart();
+		assertFalse(strategy.canRestart());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
index c1cbdd3..d145298 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/InfiniteDelayRestartStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph.restart;
 
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -27,6 +28,7 @@ import org.slf4j.LoggerFactory;
  * Actually {@link ExecutionGraph} will never be restarted. No additional threads will be used.
  */
 public class InfiniteDelayRestartStrategy implements RestartStrategy {
+
 	private static final Logger LOG = LoggerFactory.getLogger(InfiniteDelayRestartStrategy.class);
 
 	private final int maxRestartAttempts;
@@ -43,15 +45,11 @@ public class InfiniteDelayRestartStrategy implements RestartStrategy {
 
 	@Override
 	public boolean canRestart() {
-		if (maxRestartAttempts >= 0) {
-			return restartAttemptCounter < maxRestartAttempts;
-		} else {
-			return true;
-		}
+		return maxRestartAttempts < 0 || restartAttemptCounter < maxRestartAttempts;
 	}
 
 	@Override
-	public void restart(ExecutionGraph executionGraph) {
+	public void restart(RestartCallback restarter, ScheduledExecutor executor) {
 		LOG.info("Delaying retry of job execution forever");
 
 		if (maxRestartAttempts >= 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/LatchedRestarter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/LatchedRestarter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/LatchedRestarter.java
new file mode 100644
index 0000000..7682fab
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/LatchedRestarter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+
+/**
+ * A testing RestartCallback that triggers a latch when restart is triggered.
+ */
+class LatchedRestarter implements RestartCallback {
+
+	private final OneShotLatch latch;
+
+	LatchedRestarter(OneShotLatch latch) {
+		this.latch = latch;
+	}
+
+	@Override
+	public void triggerFullRecovery() {
+		latch.trigger();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6c0803d5/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/NoOpRestarter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/NoOpRestarter.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/NoOpRestarter.java
new file mode 100644
index 0000000..04a7c10
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/restart/NoOpRestarter.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.restart;
+
+/**
+ * A testing RestartCallback that does nothing. 
+ */
+class NoOpRestarter implements RestartCallback {
+
+	@Override
+	public void triggerFullRecovery() {}
+}


Mime
View raw message