flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/5] flink git commit: [FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to ExecutionGraph
Date Wed, 03 May 2017 18:07:53 GMT
[FLINK-5869] [flip-1] Add basic abstraction for Failover Strategies to ExecutionGraph

  - Rename 'ExecutionGraph.fail()' to 'ExecutionGraph.failGlobal()' to differentiate from fine grained failures/recovery
  - Add base class for FailoverStrategy
  - Add default implementation (restart all tasks)
  - Add logic to load the failover strategy from the configuration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ed85fe4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ed85fe4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ed85fe4

Branch: refs/heads/master
Commit: 8ed85fe49b7595546a8f968e0faa1fa7d4da47ec
Parents: e006127
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Mar 21 19:13:34 2017 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed May 3 19:17:23 2017 +0200

----------------------------------------------------------------------
 .../flink/configuration/JobManagerOptions.java  |   7 +
 .../java/org/apache/flink/util/StringUtils.java |  20 +
 .../flink/runtime/execution/ExecutionState.java |  33 +-
 .../flink/runtime/executiongraph/Execution.java |  56 ++-
 .../runtime/executiongraph/ExecutionGraph.java  | 315 +++++++++++----
 .../executiongraph/ExecutionGraphBuilder.java   |  10 +
 .../executiongraph/ExecutionJobVertex.java      | 172 +++-----
 .../runtime/executiongraph/ExecutionVertex.java |  87 +++-
 .../GlobalModVersionMismatch.java               |  46 +++
 .../failover/FailoverStrategy.java              |  92 +++++
 .../failover/FailoverStrategyLoader.java        |  72 ++++
 .../failover/RestartAllStrategy.java            |  80 ++++
 .../failover/RestartIndividualStrategy.java     | 173 ++++++++
 .../metrics/NumberOfFullRestartsGauge.java      |  47 +++
 .../flink/runtime/jobmaster/JobMaster.java      |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   6 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |   1 +
 .../checkpoint/CoordinatorShutdownTest.java     |   4 +-
 ...ExecutionGraphCheckpointCoordinatorTest.java |   4 +-
 .../ExecutionGraphDeploymentTest.java           | 114 ++----
 .../ExecutionGraphMetricsTest.java              |  12 +-
 .../ExecutionGraphRestartTest.java              |  33 +-
 .../ExecutionGraphSchedulingTest.java           |   9 +-
 .../ExecutionGraphSignalsTest.java              | 399 -------------------
 .../executiongraph/ExecutionGraphStopTest.java  | 176 ++++++++
 .../ExecutionGraphSuspendTest.java              | 307 ++++++++++++++
 .../executiongraph/ExecutionGraphTestUtils.java | 263 ++++++++++--
 .../ExecutionGraphVariousFailuesTest.java       | 118 ++++++
 .../ExecutionVertexCancelTest.java              |  45 +--
 .../ExecutionVertexLocalityTest.java            |   2 +-
 .../executiongraph/ExecutionVertexStopTest.java | 129 ------
 .../executiongraph/FinalizeOnMasterTest.java    |  96 +++++
 .../executiongraph/GlobalModVersionTest.java    | 212 ++++++++++
 .../IndividualRestartsConcurrencyTest.java      | 332 +++++++++++++++
 .../TerminalJobStatusListener.java              |  45 ---
 .../utils/SimpleAckingTaskManagerGateway.java   | 121 ++++++
 .../utils/SimpleSlotProvider.java               | 106 +++++
 .../runtime/jobmanager/JobManagerTest.java      |   2 +-
 .../LeaderChangeJobRecoveryTest.java            |   9 +-
 .../partitioner/RescalePartitionerTest.java     |   2 +
 .../ManuallyTriggeredDirectExecutor.java        |  70 ++++
 41 files changed, 2808 insertions(+), 1021 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index 10d9e16..5481d7a 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -73,6 +73,13 @@ public class JobManagerOptions {
 			.withDeprecatedKeys("job-manager.max-attempts-history-size");
 
 	/**
+	 * The failover strategy used to recover from task failures (defaults to "full").
+	 */
+	public static final ConfigOption<String> EXECUTION_FAILOVER_STRATEGY =
+		key("jobmanager.execution.failover-strategy")
+			.defaultValue("full");
+
+	/**
 	 * This option specifies the interval in order to trigger a resource manager reconnection if the connection
 	 * to the resource manager has been lost.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
index abd6ba6..6638062 100644
--- a/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/StringUtils.java
@@ -330,6 +330,26 @@ public final class StringUtils {
 		return true;
 	}
 
+	/**
+	 * If both string arguments are non-null, this method concatenates them with ' and '.
+	 * If only one of the arguments is non-null, this method returns the non-null argument.
+	 * If both arguments are null, this method returns null.
+	 * 
+	 * @param s1 The first string argument
+	 * @param s2 The second string argument
+	 * 
+	 * @return The concatenated string, or non-null argument, or null 
+	 */
+	@Nullable
+	public static String concatenateWithAnd(@Nullable String s1, @Nullable String s2) {
+		if (s1 != null) {
+			return s2 == null ? s1 : s1 + " and " + s2;
+		}
+		else {
+			return s2 != null ? s2 : null;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 
 	/** Prevent instantiation of this utility class */

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
index d6ff0cd..53ca8b3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionState.java
@@ -25,27 +25,27 @@ package org.apache.flink.runtime.execution;
  * <pre>{@code
  *
  *     CREATED  -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
- *            |         |            |          |
- *            |         |            |   +------+
- *            |         |            V   V
- *            |         |         CANCELLING -----+----> CANCELED
- *            |         |                         |
- *            |        +-------------------------+
- *            |
- *            |                                   ... -> FAILED
- *           V
+ *        |            |            |          |
+ *        |            |            |   +------+
+ *        |            |            V   V
+ *        |            |         CANCELLING -----+----> CANCELED
+ *        |            |                         |
+ *        |            +-------------------------+
+ *        |
+ *        |                                   ... -> FAILED
+ *        V
  *    RECONCILING  -> RUNNING | FINISHED | CANCELED | FAILED
  *
  * }</pre>
  *
  * <p>It is possible to enter the {@code RECONCILING} state from {@code CREATED}
  * state if job manager fail over, and the {@code RECONCILING} state can switch into
- * any existing task state.</p>
+ * any existing task state.
  *
- * <p>It is possible to enter the {@code FAILED} state from any other state.</p>
+ * <p>It is possible to enter the {@code FAILED} state from any other state.
  *
  * <p>The states {@code FINISHED}, {@code CANCELED}, and {@code FAILED} are
- * considered terminal states.</p>
+ * considered terminal states.
  */
 public enum ExecutionState {
 
@@ -56,7 +56,14 @@ public enum ExecutionState {
 	DEPLOYING,
 	
 	RUNNING,
-	
+
+	/**
+	 * This state marks "successfully completed". It can only be reached when a
+	 * program reaches the "end of its input". The "end of input" can be reached
+	 * when consuming a bounded input (fixed set of files, bounded query, etc.) or
+	 * when stopping a program (not cancelling!), which makes the input look like
+	 * it reached its end at a specific point.
+	 */
 	FINISHED,
 	
 	CANCELING,

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 2680849..c0f1f39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -74,12 +74,13 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times (for recovery,
- * or other re-computation), this class tracks the state of a single execution of that vertex and the resources.
+ * A single execution of a vertex. While an {@link ExecutionVertex} can be executed multiple times
+ * (for recovery, re-computation, re-configuration), this class tracks the state of a single execution
+ * of that vertex and the resources.
  * 
- * <p>NOTE ABOUT THE DESIGN RATIONAL:
+ * <h2>Lock free state transitions</h2>
  * 
- * <p>In several points of the code, we need to deal with possible concurrent state changes and actions.
+ * In several points of the code, we need to deal with possible concurrent state changes and actions.
  * For example, while the call to deploy a task (send it to the TaskManager) happens, the task gets cancelled.
  * 
  * <p>We could lock the entire portion of the code (decision to deploy, deploy, set state to running) such that
@@ -113,6 +114,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	/** The unique ID marking the specific execution instant of the task */
 	private final ExecutionAttemptID attemptId;
 
+	/** The global modification version of the execution graph when this execution was created.
+	 * This version is bumped in the ExecutionGraph whenever a global failover happens. It is used
+	 * to resolve conflicts between concurrent modification by global and local failover actions. */
+	private final long globalModVersion;
+
+	/** The timestamps when state transitions occurred, indexed by {@link ExecutionState#ordinal()} */ 
 	private final long[] stateTimestamps;
 
 	private final int attemptNumber;
@@ -146,10 +153,27 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Creates a new Execution attempt.
+	 * 
+	 * @param executor
+	 *             The executor used to dispatch callbacks from futures and asynchronous RPC calls.
+	 * @param vertex
+	 *             The execution vertex to which this Execution belongs
+	 * @param attemptNumber
+	 *             The execution attempt number.
+	 * @param globalModVersion
+	 *             The global modification version of the execution graph when this execution was created
+	 * @param startTimestamp
+	 *             The timestamp that marks the creation of this Execution
+	 * @param timeout
+	 *             The timeout for RPC calls like deploy/cancel/stop.
+	 */
 	public Execution(
 			Executor executor,
 			ExecutionVertex vertex,
 			int attemptNumber,
+			long globalModVersion,
 			long startTimestamp,
 			Time timeout) {
 
@@ -158,6 +182,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		this.attemptId = new ExecutionAttemptID();
 		this.timeout = checkNotNull(timeout);
 
+		this.globalModVersion = globalModVersion;
 		this.attemptNumber = attemptNumber;
 
 		this.stateTimestamps = new long[ExecutionState.values().length];
@@ -190,6 +215,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		return state;
 	}
 
+	/**
+	 * Gets the global modification version of the execution graph when this execution was created.
+	 * 
+	 * <p>This version is bumped in the ExecutionGraph whenever a global failover happens. It is used
+	 * to resolve conflicts between concurrent modification by global and local failover actions.
+	 */
+	public long getGlobalModVersion() {
+		return globalModVersion;
+	}
+
 	public SimpleSlot getAssignedResource() {
 		return assignedResource;
 	}
@@ -252,6 +287,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	//  Actions
 	// --------------------------------------------------------------------------------------------
 
+	public boolean scheduleForExecution() {
+		SlotProvider resourceProvider = getVertex().getExecutionGraph().getSlotProvider();
+		boolean allowQueued = getVertex().getExecutionGraph().isQueuedSchedulingAllowed();
+		return scheduleForExecution(resourceProvider, allowQueued);
+	}
+
 	/**
 	 * NOTE: This method only throws exceptions if it is in an illegal state to be scheduled, or if the tasks needs
 	 *       to be scheduled immediately and no resource is available. If the task is accepted by the schedule, any
@@ -381,9 +422,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				taskState,
 				attemptNumber);
 
-			// register this execution at the execution graph, to receive call backs
-			vertex.getExecutionGraph().registerExecution(this);
-			
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
 			final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);
@@ -823,7 +861,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				if (current != FAILED) {
 					String message = String.format("Asynchronous race: Found state %s after successful cancel call.", state);
 					LOG.error(message);
-					vertex.getExecutionGraph().fail(new Exception(message));
+					vertex.getExecutionGraph().failGlobal(new Exception(message));
 				}
 				return;
 			}
@@ -1069,7 +1107,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			// make sure that the state transition completes normally.
 			// potential errors (in listeners may not affect the main logic)
 			try {
-				vertex.notifyStateTransition(attemptId, targetState, error);
+				vertex.notifyStateTransition(this, targetState, error);
 			}
 			catch (Throwable t) {
 				LOG.error("Error while notifying execution graph of execution state transition.", t);

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index fff1ea2..5eaa637 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import org.apache.commons.lang3.StringUtils;
-
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
@@ -42,11 +40,15 @@ import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils.ConjunctFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -65,10 +67,12 @@ import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
+import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -91,6 +95,7 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -100,9 +105,9 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * The execution graph is the central data structure that coordinates the distributed
  * execution of a data flow. It keeps representations of each parallel task, each
- * intermediate result, and the communication between them.
+ * intermediate stream, and the communication between them.
  *
- * The execution graph consists of the following constructs:
+ * <p>The execution graph consists of the following constructs:
  * <ul>
  *     <li>The {@link ExecutionJobVertex} represents one vertex from the JobGraph (usually one operation like
  *         "map" or "join") during execution. It holds the aggregated state of all parallel subtasks.
@@ -118,12 +123,41 @@ import static org.apache.flink.util.Preconditions.checkState;
  *         about deployment of tasks and updates in the task status always use the ExecutionAttemptID to
  *         address the message receiver.</li>
  * </ul>
+ * 
+ * <h2>Global and local failover</h2>
+ * 
+ * The Execution Graph has two failover modes: <i>global failover</i> and <i>local failover</i>.
+ * 
+ * <p>A <b>global failover</b> aborts the task executions for all vertices and restarts the whole
+ * data flow graph from the last completed checkpoint. Global failover is considered the
+ * "fallback strategy" that is used when a local failover is unsuccessful, or when an issue is
+ * found in the state of the ExecutionGraph that could mark it as inconsistent (caused by a bug).
+ * 
+ * <p>A <b>local failover</b> is triggered when an individual vertex execution (a task) fails.
+ * The local failover is coordinated by the {@link FailoverStrategy}. A local failover typically
+ * attempts to restart as little as possible, but as much as necessary.
+ * 
+ * <p>Between local- and global failover, the global failover always takes precedence, because it
+ * is the core mechanism that the ExecutionGraph relies on to bring back consistency. To
+ * guard that, the ExecutionGraph maintains a <i>global modification version</i>, which is incremented
+ * with every global failover (and other global actions, like job cancellation, or terminal
+ * failure). Local failover is always scoped by the modification version that the execution graph
+ * had when the failover was triggered. If a new global modification version is reached during
+ * local failover (meaning there is a concurrent global failover), the failover strategy has to
+ * yield before the global failover.  
  */
 public class ExecutionGraph implements AccessExecutionGraph, Archiveable<ArchivedExecutionGraph> {
 
+	/** In place updater for the execution graph's current state. Avoids having to use an
+	 * AtomicReference and thus makes the frequent read access a bit faster */
 	private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");
 
+	/** In place updater for the execution graph's current global recovery version.
+	 * Avoids having to use an AtomicLong and thus makes the frequent read access a bit faster */
+	private static final AtomicLongFieldUpdater<ExecutionGraph> GLOBAL_VERSION_UPDATER =
+			AtomicLongFieldUpdater.newUpdater(ExecutionGraph.class, "globalModVersion");
+
 	/** The log object used for debugging. */
 	static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
 
@@ -169,6 +203,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** Listeners that receive messages whenever a single task execution changes its status */
 	private final List<ExecutionStatusListener> executionListeners;
 
+	/** The implementation that decides how to recover the failures of tasks */
+	private final FailoverStrategy failoverStrategy;
+
 	/** Timestamps (in milliseconds as returned by {@code System.currentTimeMillis()} when
 	 * the execution graph transitioned into a certain state. The index into this array is the
 	 * ordinal of the enum value, i.e. the timestamp when the graph went into state "RUNNING" is
@@ -178,6 +215,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** The timeout for all messages that require a response/acknowledgement */
 	private final Time rpcCallTimeout;
 
+	/** The timeout for bulk slot allocation (eager scheduling mode). After this timeout,
+	 * slots are released and a recovery is triggered */
+	private final Time scheduleAllocationTimeout;
+
 	/** Strategy to use for restarts */
 	private final RestartStrategy restartStrategy;
 
@@ -190,6 +231,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** Registered KvState instances reported by the TaskManagers. */
 	private final KvStateLocationRegistry kvStateLocationRegistry;
 
+	/** The total number of vertices currently in the execution graph */
 	private int numVerticesTotal;
 
 	// ------ Configuration of the Execution -------
@@ -203,8 +245,6 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * from results than need to be materialized. */
 	private ScheduleMode scheduleMode = ScheduleMode.LAZY_FROM_SOURCES;
 
-	private final Time scheduleAllocationTimeout;
-
 	// ------ Execution status and progress. These values are volatile, and accessed under the lock -------
 
 	private final AtomicInteger verticesFinished;
@@ -212,6 +252,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	/** Current status of the job execution */
 	private volatile JobStatus state = JobStatus.CREATED;
 
+	/** A future that completes once the job has reached a terminal state */
+	private volatile CompletableFuture<JobStatus> terminationFuture;
+
+	/** On each global recovery, this version is incremented. The version breaks conflicts
+	 * between concurrent restart attempts by local failover strategies */
+	private volatile long globalModVersion;
+
 	/** The exception that caused the job to fail. This is set to the first root exception
 	 * that was not recoverable and triggered job failure */
 	private volatile Throwable failureCause;
@@ -233,7 +280,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * This constructor is for tests only, because it does not include class loading information.
+	 * This constructor is for tests only, because it sets default values for many fields.
 	 */
 	@VisibleForTesting
 	ExecutionGraph(
@@ -255,6 +302,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			serializedConfig,
 			timeout,
 			restartStrategy,
+			new RestartAllStrategy.Factory(),
 			Collections.<BlobKey>emptyList(),
 			Collections.<URL>emptyList(),
 			slotProvider,
@@ -270,6 +318,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			SerializedValue<ExecutionConfig> serializedConfig,
 			Time timeout,
 			RestartStrategy restartStrategy,
+			FailoverStrategy.Factory failoverStrategyFactory,
 			List<BlobKey> requiredJarFiles,
 			List<URL> requiredClasspaths,
 			SlotProvider slotProvider,
@@ -322,6 +371,13 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		this.kvStateLocationRegistry = new KvStateLocationRegistry(jobId, getAllVertices());
 
 		this.verticesFinished = new AtomicInteger();
+
+		this.globalModVersion = 1L;
+
+		// the failover strategy must be instantiated last, so that the execution graph
+		// is ready by the time the failover strategy sees it
+		this.failoverStrategy = checkNotNull(failoverStrategyFactory.create(this), "null failover strategy");
+		LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName());
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -541,6 +597,18 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return failureCause;
 	}
 
+	/**
+	 * Gets the number of full restarts that the execution graph went through.
+	 * If a full restart recovery is currently pending, this recovery is included in the
+	 * count.
+	 * 
+	 * @return The number of full restarts so far
+	 */
+	public long getNumberOfFullRestarts() {
+		// subtract one, because the version starts at one
+		return globalModVersion - 1;
+	}
+
 	@Override
 	public String getFailureCauseAsString() {
 		return ExceptionUtils.stringifyException(failureCause);
@@ -689,11 +757,12 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	// --------------------------------------------------------------------------------------------
 
 	public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d "
-					+ "vertices and %d intermediate results.", topologiallySorted.size(), tasks.size(), intermediateResults.size()));
-		}
 
+		LOG.debug("Attaching {} topologically sorted vertices to existing job graph with {} " +
+				"vertices and {} intermediate results.",
+				topologiallySorted.size(), tasks.size(), intermediateResults.size());
+
+		final ArrayList<ExecutionJobVertex> newExecJobVertices = new ArrayList<>(topologiallySorted.size());
 		final long createTimestamp = System.currentTimeMillis();
 
 		for (JobVertex jobVertex : topologiallySorted) {
@@ -704,7 +773,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 			// create the execution job vertex and attach it to the graph
 			ExecutionJobVertex ejv =
-					new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, createTimestamp);
+					new ExecutionJobVertex(this, jobVertex, 1, rpcCallTimeout, globalModVersion, createTimestamp);
 			ejv.connectToPredecessors(this.intermediateResults);
 
 			ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
@@ -723,7 +792,11 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 			this.verticesInCreationOrder.add(ejv);
 			this.numVerticesTotal += ejv.getParallelism();
+			newExecJobVertices.add(ejv);
 		}
+
+		terminationFuture = new FlinkCompletableFuture<>();
+		failoverStrategy.notifyNewVertices(newExecJobVertices);
 	}
 
 	public void scheduleForExecution() throws JobException {
@@ -856,7 +929,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 						// ExecutionGraph notices
 						// we need to go into recovery and make sure to release all slots
 						try {
-							fail(t);
+							failGlobal(t);
 						}
 						finally {
 							ExecutionGraphUtils.releaseAllSlotsSilently(resources);
@@ -889,6 +962,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			if (current == JobStatus.RUNNING || current == JobStatus.CREATED) {
 				if (transitionState(current, JobStatus.CANCELLING)) {
 
+					// make sure no concurrent local actions interfere with the cancellation
+					incrementGlobalModVersion();
+
 					final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
 
 					// cancel all tasks (that still need cancelling)
@@ -920,8 +996,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			else if (current == JobStatus.RESTARTING) {
 				synchronized (progressLock) {
 					if (transitionState(current, JobStatus.CANCELED)) {
-						postRunCleanup();
-						progressLock.notifyAll();
+						onTerminalState(JobStatus.CANCELED);
 
 						LOG.info("Canceled during restart.");
 						return;
@@ -951,7 +1026,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	 * Suspends the current ExecutionGraph.
 	 *
 	 * The JobStatus will be directly set to SUSPENDED iff the current state is not a terminal
-	 * state. All ExecutionJobVertices will be canceled and the postRunCleanup is executed.
+	 * state. All ExecutionJobVertices will be canceled and the onTerminalState() is executed.
 	 *
 	 * The SUSPENDED state is a local terminal state which stops the execution of the job but does
 	 * not remove the job from the HA job store so that it can be recovered by another JobManager.
@@ -962,21 +1037,23 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		while (true) {
 			JobStatus currentState = state;
 
-			if (currentState.isGloballyTerminalState()) {
+			if (currentState.isTerminalState()) {
 				// stay in a terminal state
 				return;
 			} else if (transitionState(currentState, JobStatus.SUSPENDED, suspensionCause)) {
 				this.failureCause = suspensionCause;
 
+				// make sure no concurrent local actions interfere with the cancellation
+				incrementGlobalModVersion();
+
 				for (ExecutionJobVertex ejv: verticesInCreationOrder) {
 					ejv.cancel();
 				}
 
 				synchronized (progressLock) {
-						postRunCleanup();
-						progressLock.notifyAll();
+					onTerminalState(JobStatus.SUSPENDED);
 
-						LOG.info("Job {} has been suspended.", getJobID());
+					LOG.info("Job {} has been suspended.", getJobID());
 				}
 
 				return;
@@ -984,7 +1061,18 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	public void fail(Throwable t) {
+	/**
+	 * Fails the execution graph globally. This failure will not be recovered by a specific
+	 * failover strategy, but results in a full restart of all tasks.
+	 * 
+	 * <p>This global failure is meant to be triggered in cases where the consistency of the
+	 * execution graph's state cannot be guaranteed any more (for example when catching unexpected
+	 * exceptions that indicate a bug or an unexpected call race), and where a full restart is the
+	 * safe way to get consistency back.
+	 * 
+	 * @param t The exception that caused the failure.
+	 */
+	public void failGlobal(Throwable t) {
 		while (true) {
 			JobStatus current = state;
 			// stay in these states
@@ -1003,6 +1091,9 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			else if (transitionState(current, JobStatus.FAILING, t)) {
 				this.failureCause = t;
 
+				// make sure no concurrent local actions interfere with the cancellation
+				incrementGlobalModVersion();
+
 				// we build a future that is complete once all vertices have reached a terminal state
 				final ArrayList<Future<?>> futures = new ArrayList<>(verticesInCreationOrder.size());
 
@@ -1044,23 +1135,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					throw new IllegalStateException("Can only restart job from state restarting.");
 				}
 
-				if (slotProvider == null) {
-					throw new IllegalStateException("The execution graph has not been scheduled before - slotProvider is null.");
-				}
-
 				this.currentExecutions.clear();
 
-				Collection<CoLocationGroup> colGroups = new HashSet<>();
+				final Collection<CoLocationGroup> colGroups = new HashSet<>();
+				final long resetTimestamp = System.currentTimeMillis();
 
 				for (ExecutionJobVertex jv : this.verticesInCreationOrder) {
 
 					CoLocationGroup cgroup = jv.getCoLocationGroup();
-					if(cgroup != null && !colGroups.contains(cgroup)){
+					if (cgroup != null && !colGroups.contains(cgroup)){
 						cgroup.resetConstraints();
 						colGroups.add(cgroup);
 					}
 
-					jv.resetForNewExecution();
+					jv.resetForNewExecution(resetTimestamp, globalModVersion);
 				}
 
 				for (int i = 0; i < stateTimestamps.length; i++) {
@@ -1083,7 +1171,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 		catch (Throwable t) {
 			LOG.warn("Failed to restart the job.", t);
-			fail(t);
+			failGlobal(t);
 		}
 	}
 
@@ -1124,32 +1212,45 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		return null;
 	}
 
+	@VisibleForTesting
+	public Future<JobStatus> getTerminationFuture() {
+		return terminationFuture;
+	}
+
+	@VisibleForTesting
+	public JobStatus waitUntilTerminal() throws InterruptedException {
+		try {
+			return terminationFuture.get();
+		}
+		catch (ExecutionException e) {
+			// this should never happen
+			// it would be a bug, so we don't expect this to be handled and throw
+			// an unchecked exception here
+			throw new RuntimeException(e);
+		}
+	}
+
 	/**
-	 * For testing: This waits until the job execution has finished.
+	 * Gets the failover strategy used by the execution graph to recover from failures of tasks.
 	 */
-	public void waitUntilFinished() throws InterruptedException {
-		// we may need multiple attempts in the presence of failures / recovery
-		while (true) {
-			for (ExecutionJobVertex ejv : verticesInCreationOrder) {
-				for (ExecutionVertex ev : ejv.getTaskVertices()) {
-					try {
-						ev.getCurrentExecutionAttempt().getTerminationFuture().get();
-					}
-					catch (ExecutionException e) {
-						// this should never happen
-						throw new RuntimeException(e);
-					}
-				}
-			}
+	public FailoverStrategy getFailoverStrategy() {
+		return this.failoverStrategy;
+	}
 
-			// now that all vertices have been (at some point) in a terminal state,
-			// we need to check if the job as a whole has entered a final state
-			if (state.isTerminalState()) {
-				return;
-			}
-		}
+	/**
+	 * Gets the current global modification version of the ExecutionGraph.
+	 * The global modification version is incremented with each global action (cancel/fail/restart)
+	 * and is used to disambiguate concurrent modifications between local and global
+	 * failover actions.
+	 */
+	long getGlobalModVersion() {
+		return globalModVersion;
 	}
 
+	// ------------------------------------------------------------------------
+	//  State Transitions
+	// ------------------------------------------------------------------------
+
 	private boolean transitionState(JobStatus current, JobStatus newState) {
 		return transitionState(current, newState, null);
 	}
@@ -1175,11 +1276,45 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
+	private long incrementGlobalModVersion() {
+		return GLOBAL_VERSION_UPDATER.incrementAndGet(this);
+	}
+
+	// ------------------------------------------------------------------------
+	//  Job Status Progress
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Called whenever a vertex reaches state FINISHED (completed successfully).
+	 * Once all vertices are in the FINISHED state, the program is successfully done.
+	 */
 	void vertexFinished() {
-		int numFinished = verticesFinished.incrementAndGet();
+		final int numFinished = verticesFinished.incrementAndGet();
 		if (numFinished == numVerticesTotal) {
 			// done :-)
-			allVerticesInTerminalState();
+
+			// check whether we are still in "RUNNING" and trigger the final cleanup
+			if (state == JobStatus.RUNNING) {
+				// we do the final cleanup in the I/O executor, because it may involve
+				// some heavier work
+
+				try {
+					for (ExecutionJobVertex ejv : verticesInCreationOrder) {
+						ejv.getJobVertex().finalizeOnMaster(getUserClassLoader());
+					}
+				}
+				catch (Throwable t) {
+					ExceptionUtils.rethrowIfFatalError(t);
+					failGlobal(new Exception("Failed to finalize execution on master", t));
+					return;
+				}
+
+				// if we do not make this state transition, then a concurrent
+				// cancellation or failure happened
+				if (transitionState(JobStatus.RUNNING, JobStatus.FINISHED)) {
+					onTerminalState(JobStatus.FINISHED);
+				}
+			}
 		}
 	}
 
@@ -1187,6 +1322,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		verticesFinished.getAndDecrement();
 	}
 
+	/**
+	 * This method is a callback during cancellation/failover and called when all tasks
+	 * have reached a terminal state (cancelled/failed/finished).
+	 */
 	private void allVerticesInTerminalState() {
 		// we are done, transition to the final state
 		JobStatus current;
@@ -1194,14 +1333,11 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 			current = this.state;
 
 			if (current == JobStatus.RUNNING) {
-				if (transitionState(current, JobStatus.FINISHED)) {
-					postRunCleanup();
-					break;
-				}
+				failGlobal(new Exception("ExecutionGraph went into allVerticesInTerminalState() from RUNNING"));
 			}
 			else if (current == JobStatus.CANCELLING) {
 				if (transitionState(current, JobStatus.CANCELED)) {
-					postRunCleanup();
+					onTerminalState(JobStatus.CANCELED);
 					break;
 				}
 			}
@@ -1221,7 +1357,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				break;
 			}
 			else {
-				fail(new Exception("ExecutionGraph went into final state from state " + current));
+				failGlobal(new Exception("ExecutionGraph went into final state from state " + current));
 				break;
 			}
 		}
@@ -1255,18 +1391,16 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 					restartStrategy.restart(this);
 
 					return true;
-				} else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
-					final List<String> reasonsForNoRestart = new ArrayList<>(2);
-					if (!isFailureCauseAllowingRestart) {
-						reasonsForNoRestart.add("a type of SuppressRestartsException was thrown");
-					}
-					if (!isRestartStrategyAllowingRestart) {
-						reasonsForNoRestart.add("the restart strategy prevented it");
-					}
+				}
+				else if (!isRestartable && transitionState(currentState, JobStatus.FAILED, failureCause)) {
+					final String cause1 = isFailureCauseAllowingRestart ? null :  
+							"a type of SuppressRestartsException was thrown";
+					final String cause2 = isRestartStrategyAllowingRestart ? null :
+						"the restart strategy prevented it";
 
 					LOG.info("Could not restart the job {} ({}) because {}.", getJobName(), getJobID(),
-						StringUtils.join(reasonsForNoRestart, " and "), failureCause);
-					postRunCleanup();
+						StringUtils.concatenateWithAnd(cause1, cause2), failureCause);
+					onTerminalState(JobStatus.FAILED);
 
 					return true;
 				} else {
@@ -1280,16 +1414,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	private void postRunCleanup() {
+	private void onTerminalState(JobStatus status) {
 		try {
 			CheckpointCoordinator coord = this.checkpointCoordinator;
 			this.checkpointCoordinator = null;
 			if (coord != null) {
-				coord.shutdown(state);
+				coord.shutdown(status);
 			}
-		} catch (Exception e) {
+		}
+		catch (Exception e) {
 			LOG.error("Error while cleaning up after execution", e);
 		}
+		finally {
+			terminationFuture.complete(status);
+		}
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -1343,7 +1481,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 				ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
 
 				// failures during updates leave the ExecutionGraph inconsistent
-				fail(t);
+				failGlobal(t);
 				return false;
 			}
 		}
@@ -1405,7 +1543,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	void registerExecution(Execution exec) {
 		Execution previous = currentExecutions.putIfAbsent(exec.getAttemptId(), exec);
 		if (previous != null) {
-			fail(new Exception("Trying to register execution " + exec + " for already used ID " + exec.getAttemptId()));
+			failGlobal(new Exception("Trying to register execution " + exec + " for already used ID " + exec.getAttemptId()));
 		}
 	}
 
@@ -1413,7 +1551,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		Execution contained = currentExecutions.remove(exec.getAttemptId());
 
 		if (contained != null && contained != exec) {
-			fail(new Exception("De-registering execution " + exec + " failed. Found for same ID execution " + contained));
+			failGlobal(new Exception("De-registering execution " + exec + " failed. Found for same ID execution " + contained));
 		}
 	}
 
@@ -1471,21 +1609,21 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 	}
 
 	void notifyExecutionChange(
-			JobVertexID vertexId, int subtask, ExecutionAttemptID executionID,
-			ExecutionState newExecutionState, Throwable error)
-	{
-		ExecutionJobVertex vertex = getJobVertex(vertexId);
+			final Execution execution,
+			final ExecutionState newExecutionState,
+			final Throwable error) {
 
 		if (executionListeners.size() > 0) {
+			final ExecutionJobVertex vertex = execution.getVertex().getJobVertex();
 			final String message = error == null ? null : ExceptionUtils.stringifyException(error);
 			final long timestamp = System.currentTimeMillis();
 
 			for (ExecutionStatusListener listener : executionListeners) {
 				try {
 					listener.executionStatusChanged(
-							getJobID(), vertexId, vertex.getJobVertex().getName(),
-							vertex.getParallelism(), subtask, executionID, newExecutionState,
-							timestamp, message);
+							getJobID(), vertex.getJobVertexId(), vertex.getJobVertex().getName(),
+							vertex.getParallelism(), execution.getParallelSubtaskIndex(),
+							execution.getAttemptId(), newExecutionState, timestamp, message);
 				} catch (Throwable t) {
 					LOG.warn("Error while notifying ExecutionStatusListener", t);
 				}
@@ -1494,7 +1632,20 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 
 		// see what this means for us. currently, the first FAILED state means -> FAILED
 		if (newExecutionState == ExecutionState.FAILED) {
-			fail(error);
+			final Throwable ex = error != null ? error : new FlinkException("Unknown Error (missing cause)");
+
+			// by filtering out late failure calls, we can save some work in
+			// avoiding redundant local failover
+			if (execution.getGlobalModVersion() == globalModVersion) {
+				try {
+					failoverStrategy.onTaskFailure(execution, ex);
+				}
+				catch (Throwable t) {
+					// bug in the failover strategy - fall back to global failover
+					LOG.warn("Error in failover strategy - falling back to global restart", t);
+					failGlobal(ex);
+				}
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index b40817f..88863e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -34,7 +34,10 @@ import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
+import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
 import org.apache.flink.runtime.executiongraph.metrics.DownTimeGauge;
+import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge;
 import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
 import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
@@ -90,6 +93,9 @@ public class ExecutionGraphBuilder {
 		final String jobName = jobGraph.getName();
 		final JobID jobId = jobGraph.getJobID();
 
+		final FailoverStrategy.Factory failoverStrategy = 
+				FailoverStrategyLoader.loadFailoverStrategy(jobManagerConfig, log);
+
 		// create a new execution graph, if none exists so far
 		final ExecutionGraph executionGraph = (prior != null) ? prior :
 				new ExecutionGraph(
@@ -101,6 +107,7 @@ public class ExecutionGraphBuilder {
 						jobGraph.getSerializedExecutionConfig(),
 						timeout,
 						restartStrategy,
+						failoverStrategy,
 						jobGraph.getUserJarBlobKeys(),
 						jobGraph.getClasspaths(),
 						slotProvider,
@@ -269,6 +276,9 @@ public class ExecutionGraphBuilder {
 		metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
 		metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
 		metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));
+		metrics.gauge(NumberOfFullRestartsGauge.METRIC_NAME, new NumberOfFullRestartsGauge(executionGraph));
+
+		executionGraph.getFailoverStrategy().registerMetrics(metrics);
 
 		return executionGraph;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 3197e65..3a98e0a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
@@ -28,7 +29,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
-import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.concurrent.Future;
@@ -58,6 +58,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * An {@code ExecutionJobVertex} is part of the {@link ExecutionGraph}, and the peer 
+ * to the {@link JobVertex}.
+ * 
+ * <p>The {@code ExecutionJobVertex} corresponds to a parallelized operation. It
+ * contains an {@link ExecutionVertex} for each parallel instance of that operation.
+ */
 public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable<ArchivedExecutionJobVertex> {
 
 	/** Use the same log for all ExecutionGraph classes */
@@ -115,21 +122,26 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	private SerializedValue<TaskInformation> serializedTaskInformation;
 
 	private InputSplitAssigner splitAssigner;
-	
-	public ExecutionJobVertex(
+
+	/**
+	 * Convenience constructor for testing.
+	 */
+	@VisibleForTesting
+	ExecutionJobVertex(
 		ExecutionGraph graph,
 		JobVertex jobVertex,
 		int defaultParallelism,
 		Time timeout) throws JobException {
 
-		this(graph, jobVertex, defaultParallelism, timeout, System.currentTimeMillis());
+		this(graph, jobVertex, defaultParallelism, timeout, 1L, System.currentTimeMillis());
 	}
-	
+
 	public ExecutionJobVertex(
 		ExecutionGraph graph,
 		JobVertex jobVertex,
 		int defaultParallelism,
 		Time timeout,
+		long initialGlobalModVersion,
 		long createTimestamp) throws JobException {
 
 		if (graph == null || jobVertex == null) {
@@ -190,18 +202,24 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		// create all task vertices
 		for (int i = 0; i < numTaskVertices; i++) {
 			ExecutionVertex vertex = new ExecutionVertex(
-					this, i, this.producedDataSets, timeout, createTimestamp, maxPriorAttemptsHistoryLength);
+					this,
+					i,
+					producedDataSets,
+					timeout,
+					initialGlobalModVersion,
+					createTimestamp,
+					maxPriorAttemptsHistoryLength);
 
 			this.taskVertices[i] = vertex;
 		}
-		
+
 		// sanity check for the double referencing between intermediate result partitions and execution vertices
 		for (IntermediateResult ir : this.producedDataSets) {
 			if (ir.getNumberOfAssignedPartitions() != parallelism) {
 				throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
 			}
 		}
-		
+
 		// set up the input splits, if the vertex has any
 		try {
 			@SuppressWarnings("unchecked")
@@ -508,7 +526,8 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		}
 	}
 
-	public void resetForNewExecution() {
+	public void resetForNewExecution(final long timestamp, final long expectedGlobalModVersion)
+			throws GlobalModVersionMismatch {
 
 		synchronized (stateMonitor) {
 			// check and reset the sharing groups with scheduler hints
@@ -517,7 +536,7 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 			}
 
 			for (int i = 0; i < parallelism; i++) {
-				taskVertices[i].resetForNewExecution();
+				taskVertices[i].resetForNewExecution(timestamp, expectedGlobalModVersion);
 			}
 
 			// set up the input splits again
@@ -558,112 +577,36 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	}
 
 	// --------------------------------------------------------------------------------------------
-	//  Static / pre-assigned input splits
+	//  Archiving
 	// --------------------------------------------------------------------------------------------
 
-	private List<LocatableInputSplit>[] computeLocalInputSplitsPerTask(InputSplit[] splits) throws JobException {
-		
-		final int numSubTasks = getParallelism();
-		
-		// sanity check
-		if (numSubTasks > splits.length) {
-			throw new JobException("Strictly local assignment requires at least as many splits as subtasks.");
-		}
-		
-		// group the splits by host while preserving order per host
-		Map<String, List<LocatableInputSplit>> splitsByHost = new HashMap<String, List<LocatableInputSplit>>();
-		
-		for (InputSplit split : splits) {
-			// check that split has exactly one local host
-			if(!(split instanceof LocatableInputSplit)) {
-				throw new JobException("Invalid InputSplit type " + split.getClass().getCanonicalName() + ". " +
-						"Strictly local assignment requires LocatableInputSplit");
-			}
-			LocatableInputSplit lis = (LocatableInputSplit) split;
-
-			if (lis.getHostnames() == null) {
-				throw new JobException("LocatableInputSplit has no host information. " +
-						"Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
-			}
-			else if (lis.getHostnames().length != 1) {
-				throw new JobException("Strictly local assignment requires exactly one hostname for each LocatableInputSplit.");
-			}
-			String hostName = lis.getHostnames()[0];
-			
-			if (hostName == null) {
-				throw new JobException("For strictly local input split assignment, no null host names are allowed.");
-			}
-
-			List<LocatableInputSplit> hostSplits = splitsByHost.get(hostName);
-			if (hostSplits == null) {
-				hostSplits = new ArrayList<LocatableInputSplit>();
-				splitsByHost.put(hostName, hostSplits);
-			}
-			hostSplits.add(lis);
-		}
-		
-		
-		int numHosts = splitsByHost.size();
-		
-		if (numSubTasks < numHosts) {
-			throw new JobException("Strictly local split assignment requires at least as " +
-					"many parallel subtasks as distinct split hosts. Please increase the parallelism " +
-					"of DataSource "+this.getJobVertex().getName()+" to at least "+numHosts+".");
-		}
-
-		// get list of hosts in deterministic order
-		List<String> hosts = new ArrayList<String>(splitsByHost.keySet());
-		Collections.sort(hosts);
-		
-		@SuppressWarnings("unchecked")
-		List<LocatableInputSplit>[] subTaskSplitAssignment = (List<LocatableInputSplit>[]) new List<?>[numSubTasks];
-		
-		final int subtasksPerHost = numSubTasks / numHosts;
-		final int hostsWithOneMore = numSubTasks % numHosts;
-		
-		int subtaskNum = 0;
-		
-		// we go over all hosts and distribute the hosts' input splits
-		// over the subtasks
-		for (int hostNum = 0; hostNum < numHosts; hostNum++) {
-			String host = hosts.get(hostNum);
-			List<LocatableInputSplit> splitsOnHost = splitsByHost.get(host);
-			
-			int numSplitsOnHost = splitsOnHost.size();
-			
-			// the number of subtasks to split this over.
-			// NOTE: if the host has few splits, some subtasks will not get anything.
-			int subtasks = Math.min(numSplitsOnHost, 
-							hostNum < hostsWithOneMore ? subtasksPerHost + 1 : subtasksPerHost);
-			
-			int splitsPerSubtask = numSplitsOnHost / subtasks;
-			int subtasksWithOneMore = numSplitsOnHost % subtasks;
-			
-			int splitnum = 0;
-			
-			// go over the subtasks and grab a subrange of the input splits
-			for (int i = 0; i < subtasks; i++) {
-				int numSplitsForSubtask = (i < subtasksWithOneMore ? splitsPerSubtask + 1 : splitsPerSubtask);
-				
-				List<LocatableInputSplit> splitList;
-				
-				if (numSplitsForSubtask == numSplitsOnHost) {
-					splitList = splitsOnHost;
-				}
-				else {
-					splitList = new ArrayList<LocatableInputSplit>(numSplitsForSubtask);
-					for (int k = 0; k < numSplitsForSubtask; k++) {
-						splitList.add(splitsOnHost.get(splitnum++));
-					}
-				}
-				
-				subTaskSplitAssignment[subtaskNum++] = splitList;
-			}
-		}
-		
-		return subTaskSplitAssignment;
+	@Override
+	public ArchivedExecutionJobVertex archive() {
+		return new ArchivedExecutionJobVertex(this);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Static Utilities
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A utility function that computes an "aggregated" state for the vertex.
+	 * 
+	 * <p>This state is not used anywhere in the coordination, but can be used for display
+	 * in dashboards as a summary for how the particular parallel operation represented by
+	 * this ExecutionJobVertex is currently behaving.
+	 * 
+	 * <p>For example, if at least one parallel task is failed, the aggregate state is failed.
+	 * If not, and at least one parallel task is cancelling (or cancelled), the aggregate state
+	 * is cancelling (or cancelled). If all tasks are finished, the aggregate state is finished,
+	 * and so on.
+	 * 
+	 * @param verticesPerState The number of vertices in each state (indexed by the ordinal of
+	 *                         the ExecutionState values).
+	 * @param parallelism The parallelism of the ExecutionJobVertex
+	 * 
+	 * @return The aggregate state of this ExecutionJobVertex. 
+	 */
 	public static ExecutionState getAggregateJobVertexState(int[] verticesPerState, int parallelism) {
 		if (verticesPerState == null || verticesPerState.length != ExecutionState.values().length) {
 			throw new IllegalArgumentException("Must provide an array as large as there are execution states.");
@@ -739,9 +682,4 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 
 		return expanded;
 	}
-
-	@Override
-	public ArchivedExecutionJobVertex archive() {
-		return new ArchivedExecutionJobVertex(this);
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index bcf7a7c..e8c1984 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
@@ -97,7 +98,11 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 
 	// --------------------------------------------------------------------------------------------
 
-	public ExecutionVertex(
+	/**
+	 * Convenience constructor for tests. Sets various fields to default values.
+	 */
+	@VisibleForTesting
+	ExecutionVertex(
 			ExecutionJobVertex jobVertex,
 			int subTaskIndex,
 			IntermediateResult[] producedDataSets,
@@ -108,24 +113,28 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 				subTaskIndex,
 				producedDataSets,
 				timeout,
+				1L,
 				System.currentTimeMillis(),
 				JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE.defaultValue());
 	}
 
+	/**
+	 * 
+	 * @param timeout
+	 *            The RPC timeout to use for deploy / cancel calls
+	 * @param initialGlobalModVersion
+	 *            The global modification version to initialize the first Execution with.
+	 * @param createTimestamp
+	 *            The timestamp for the vertex creation, used to initialize the first Execution with.
+	 * @param maxPriorExecutionHistoryLength
+	 *            The number of prior Executions (= execution attempts) to keep.
+	 */
 	public ExecutionVertex(
 			ExecutionJobVertex jobVertex,
 			int subTaskIndex,
 			IntermediateResult[] producedDataSets,
 			Time timeout,
-			int maxPriorExecutionHistoryLength) {
-		this(jobVertex, subTaskIndex, producedDataSets, timeout, System.currentTimeMillis(), maxPriorExecutionHistoryLength);
-	}
-
-	public ExecutionVertex(
-			ExecutionJobVertex jobVertex,
-			int subTaskIndex,
-			IntermediateResult[] producedDataSets,
-			Time timeout,
+			long initialGlobalModVersion,
 			long createTimestamp,
 			int maxPriorExecutionHistoryLength) {
 
@@ -151,6 +160,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			getExecutionGraph().getFutureExecutor(),
 			this,
 			0,
+			initialGlobalModVersion,
 			createTimestamp,
 			timeout);
 
@@ -163,6 +173,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			this.locationConstraint = null;
 		}
 
+		getExecutionGraph().registerExecution(currentExecution);
+
 		this.timeout = timeout;
 	}
 
@@ -508,11 +520,40 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	//   Actions
 	// --------------------------------------------------------------------------------------------
 
-	public Execution resetForNewExecution() {
-
+	/**
+	 * Archives the current Execution and creates a new Execution for this vertex.
+	 * 
+	 * <p>This method atomically checks if the ExecutionGraph is still of an expected
+	 * global mod. version and replaces the execution if that is the case. If the ExecutionGraph
+	 * has increased its global mod. version in the meantime, this operation fails.
+	 * 
+	 * <p>This mechanism can be used to prevent conflicts between various concurrent recovery and
+	 * reconfiguration actions in a similar way as "optimistic concurrency control".
+	 * 
+	 * @param timestamp
+	 *             The creation timestamp for the new Execution
+	 * @param originatingGlobalModVersion
+	 *             The global modification version of the action that originally triggered this reset.
+	 * 
+	 * @return Returns the newly created Execution.
+	 * 
+	 * @throws GlobalModVersionMismatch Thrown, if the execution graph has a newer global mod
+	 *                                  version than the one passed to this method.
+	 */
+	public Execution resetForNewExecution(final long timestamp, final long originatingGlobalModVersion)
+			throws GlobalModVersionMismatch
+	{
 		LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex());
 
 		synchronized (priorExecutions) {
+			// check if another global modification has been triggered since the
+			// action that originally caused this reset/restart happened
+			final long actualModVersion = getExecutionGraph().getGlobalModVersion();
+			if (actualModVersion > originatingGlobalModVersion) {
+				// global change happened since, reject this action
+				throw new GlobalModVersionMismatch(originatingGlobalModVersion, actualModVersion);
+			}
+
 			final Execution oldExecution = currentExecution;
 			final ExecutionState oldState = oldExecution.getState();
 
@@ -522,8 +563,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 				final Execution newExecution = new Execution(
 					getExecutionGraph().getFutureExecutor(),
 					this,
-						oldExecution.getAttemptNumber()+1,
-					System.currentTimeMillis(),
+					oldExecution.getAttemptNumber() + 1,
+					originatingGlobalModVersion,
+					timestamp,
 					timeout);
 
 				this.currentExecution = newExecution;
@@ -533,6 +575,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 					this.locationConstraint = grp.getLocationConstraint(subTaskIndex);
 				}
 
+				// register this execution at the execution graph, to receive call backs
+				getExecutionGraph().registerExecution(newExecution);
+
 				// if the execution was 'FINISHED' before, tell the ExecutionGraph that
 				// we take one step back on the road to reaching global FINISHED
 				if (oldState == FINISHED) {
@@ -640,9 +685,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	// --------------------------------------------------------------------------------------------
 
 	void executionFinished(Execution execution) {
-		if (execution == currentExecution) {
-			getExecutionGraph().vertexFinished();
-		}
+		getExecutionGraph().vertexFinished();
 	}
 
 	void executionCanceled(Execution execution) {
@@ -658,10 +701,14 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Simply forward this notification. This is for logs and event archivers.
+	 * Forwards this notification to the ExecutionGraph, but only if the given execution is still the current one.
 	 */
-	void notifyStateTransition(ExecutionAttemptID executionId, ExecutionState newState, Throwable error) {
-		getExecutionGraph().notifyExecutionChange(getJobvertexId(), subTaskIndex, executionId, newState, error);
+	void notifyStateTransition(Execution execution, ExecutionState newState, Throwable error) {
+		// only forward this notification if the execution is still the current execution
+		// otherwise we have an outdated execution
+		if (currentExecution == execution) {
+			getExecutionGraph().notifyExecutionChange(execution, newState, error);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GlobalModVersionMismatch.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GlobalModVersionMismatch.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GlobalModVersionMismatch.java
new file mode 100644
index 0000000..bc96805
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/GlobalModVersionMismatch.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+/**
+ * An exception that indicates a mismatch between the expected global modification version
+ * of the execution graph, and the actual modification version.
+ */
+public class GlobalModVersionMismatch extends Exception {
+
+	private static final long serialVersionUID = 6643688395797045098L;
+
+	private final long expectedModVersion;
+
+	private final long actualModVersion;
+
+	public GlobalModVersionMismatch(long expectedModVersion, long actualModVersion) {
+		super("expected=" + expectedModVersion + ", actual=" + actualModVersion);
+		this.expectedModVersion = expectedModVersion;
+		this.actualModVersion = actualModVersion;
+	}
+
+	public long expectedModVersion() {
+		return expectedModVersion;
+	}
+
+	public long actualModVersion() {
+		return actualModVersion;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategy.java
new file mode 100644
index 0000000..2c4313f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategy.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+
+import java.util.List;
+
+/**
+ * A {@code FailoverStrategy} describes how the job computation recovers from task
+ * failures.
+ * 
+ * <p>Failover strategies implement recovery logic for failures of tasks. The execution
+ * graph still implements "global failure / recovery" (which restarts all tasks) as
+ * a fallback plan or safety net in cases where it deems that the state of the graph
+ * may have become inconsistent.
+ */
+public abstract class FailoverStrategy {
+
+
+	// ------------------------------------------------------------------------
+	//  failover implementation
+	// ------------------------------------------------------------------------ 
+
+	/**
+	 * Called by the execution graph when a task failure occurs.
+	 * 
+	 * @param taskExecution The execution attempt of the failed task. 
+	 * @param cause The exception that caused the task failure.
+	 */
+	public abstract void onTaskFailure(Execution taskExecution, Throwable cause);
+
+	/**
+	 * Called whenever new vertices are added to the ExecutionGraph.
+	 * 
+	 * @param newJobVerticesTopological The newly added vertices, in topological order.
+	 */
+	public abstract void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological);
+
+	/**
+	 * Gets the name of the failover strategy, for logging purposes.
+	 */
+	public abstract String getStrategyName();
+
+	/**
+	 * Tells the FailoverStrategy to register its metrics.
+	 * 
+	 * <p>The default implementation does nothing
+	 * 
+	 * @param metricGroup The metric group to register the metrics at
+	 */
+	public void registerMetrics(MetricGroup metricGroup) {}
+
+	// ------------------------------------------------------------------------
+	//  factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * This factory is a necessary indirection when creating the FailoverStrategy so that
+	 * we can have both the FailoverStrategy final in the ExecutionGraph, and the
+	 * ExecutionGraph final in the FailoverStrategy.
+	 */
+	public interface Factory {
+
+		/**
+		 * Instantiates the {@code FailoverStrategy}.
+		 * 
+		 * @param executionGraph The execution graph for which the strategy implements failover.
+		 * @return The instantiated failover strategy.
+		 */
+		FailoverStrategy create(ExecutionGraph executionGraph);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
new file mode 100644
index 0000000..f18a90f
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverStrategyLoader.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nullable;
+
+/**
+ * A utility class to load failover strategies from the configuration. 
+ */
+public class FailoverStrategyLoader {
+
+	/** Config name for the {@link RestartAllStrategy} */
+	public static final String FULL_RESTART_STRATEGY_NAME = "full";
+
+	/** Config name for the strategy that restarts individual tasks */
+	public static final String INDIVIDUAL_RESTART_STRATEGY_NAME = "individual";
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Loads a FailoverStrategy Factory for the strategy named in the configuration (case-insensitive); blank values fall back to full restarts.
+	 */
+	public static FailoverStrategy.Factory loadFailoverStrategy(Configuration config, @Nullable Logger logger) {
+		final String strategyParam = config.getString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY);
+
+		if (StringUtils.isNullOrWhitespaceOnly(strategyParam)) {
+			if (logger != null) {
+				logger.warn("Null config value for {} ; using default failover strategy (full restarts).",
+						JobManagerOptions.EXECUTION_FAILOVER_STRATEGY.key());
+			}
+
+			return new RestartAllStrategy.Factory();
+		}
+		else {
+			switch (strategyParam.toLowerCase(java.util.Locale.ROOT)) { // locale-independent: default-locale lowercasing breaks e.g. under Turkish ('I' -> dotless i)
+				case FULL_RESTART_STRATEGY_NAME:
+					return new RestartAllStrategy.Factory();
+
+				case INDIVIDUAL_RESTART_STRATEGY_NAME:
+					return new RestartIndividualStrategy.Factory();
+
+				default:
+					// we could interpret the parameter as a factory class name and instantiate that
+					// for now we simply do not support this
+					throw new IllegalConfigurationException("Unknown failover strategy: " + strategyParam);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartAllStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartAllStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartAllStrategy.java
new file mode 100644
index 0000000..21f42b9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartAllStrategy.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Simple failover strategy that triggers a restart of all tasks in the
+ * execution graph, via {@link ExecutionGraph#failGlobal(Throwable)}.
+ */
+public class RestartAllStrategy extends FailoverStrategy {
+
+	/** The execution graph to recover */
+	private final ExecutionGraph executionGraph;
+
+	/**
+	 * Creates a new failover strategy that recovers from failures by restarting all tasks
+	 * of the execution graph.
+	 * 
+	 * @param executionGraph The execution graph to handle.
+	 */
+	public RestartAllStrategy(ExecutionGraph executionGraph) {
+		this.executionGraph = checkNotNull(executionGraph);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void onTaskFailure(Execution taskExecution, Throwable cause) {
+		// this strategy makes every task failure a global failure
+		executionGraph.failGlobal(cause);
+	}
+
+	@Override
+	public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
+		// nothing to do
+	}
+
+	@Override
+	public String getStrategyName() {
+		return "full graph restart";
+	}
+
+	// ------------------------------------------------------------------------
+	//  factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory that instantiates the RestartAllStrategy.
+	 */
+	public static class Factory implements FailoverStrategy.Factory {
+
+		@Override
+		public FailoverStrategy create(ExecutionGraph executionGraph) {
+			return new RestartAllStrategy(executionGraph);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
new file mode 100644
index 0000000..0a449b8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/RestartIndividualStrategy.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.failover;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Simple failover strategy that restarts each task individually.
+ * This strategy is only applicable if the entire job consists of unconnected
+ * tasks, meaning each task is its own component.
+ */
+public class RestartIndividualStrategy extends FailoverStrategy {
+
+	private static final Logger LOG = LoggerFactory.getLogger(RestartIndividualStrategy.class);
+
+	// ------------------------------------------------------------------------
+
+	/** The execution graph to recover */
+	private final ExecutionGraph executionGraph;
+
+	/** The executor that executes restart callbacks */
+	private final Executor callbackExecutor;
+
+	private final SimpleCounter numTaskFailures;
+
+	/**
+	 * Creates a new failover strategy that recovers from failures by restarting only the
+	 * failed task of the execution graph.
+	 * 
+	 * <p>The strategy will use the ExecutionGraph's future executor for callbacks.
+	 * 
+	 * @param executionGraph The execution graph to handle.
+	 */
+	public RestartIndividualStrategy(ExecutionGraph executionGraph) {
+		this(executionGraph, executionGraph.getFutureExecutor());
+	}
+
+	/**
+	 * Creates a new failover strategy that recovers from failures by restarting only the
+	 * failed task of the execution graph.
+	 *
+	 * @param executionGraph The execution graph to handle.
+	 * @param callbackExecutor The executor that executes restart callbacks
+	 */
+	public RestartIndividualStrategy(ExecutionGraph executionGraph, Executor callbackExecutor) {
+		this.executionGraph = checkNotNull(executionGraph);
+		this.callbackExecutor = checkNotNull(callbackExecutor);
+
+		this.numTaskFailures = new SimpleCounter();
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void onTaskFailure(Execution taskExecution, Throwable cause) {
+
+		// to better handle the lack of resources (potentially by a scale-in), we
+		// make failures due to missing resources global failures 
+		if (cause instanceof NoResourceAvailableException) {
+			LOG.info("Not enough resources to schedule {} - triggering full recovery.", taskExecution);
+			executionGraph.failGlobal(cause);
+			return;
+		}
+
+		LOG.info("Recovering task failure for {} (#{}) via individual restart.", 
+				taskExecution.getVertex().getTaskNameWithSubtaskIndex(), taskExecution.getAttemptNumber());
+
+		numTaskFailures.inc();
+
+		// trigger the restart once the task has reached its terminal state
+		// Note: currently all tasks passed here are already in their terminal state,
+		//       so we could actually avoid the future. We use it anyways because it is cheap and
+		//       it helps to support better testing
+		final Future<ExecutionState> terminationFuture = taskExecution.getTerminationFuture();
+
+		final ExecutionVertex vertexToRecover = taskExecution.getVertex(); 
+		final long globalModVersion = taskExecution.getGlobalModVersion();
+
+		terminationFuture.thenAcceptAsync(new AcceptFunction<ExecutionState>() {
+			@Override
+			public void accept(ExecutionState value) {
+				try {
+					long createTimestamp = System.currentTimeMillis();
+					Execution newExecution = vertexToRecover.resetForNewExecution(createTimestamp, globalModVersion);
+					newExecution.scheduleForExecution();
+				}
+				catch (GlobalModVersionMismatch e) {
+					// this happens if a concurrent global recovery happens. simply do nothing.
+				}
+				catch (Exception e) {
+					executionGraph.failGlobal(
+							new Exception("Error during fine grained recovery - triggering full recovery", e));
+				}
+			}
+		}, callbackExecutor);
+	}
+
+	@Override
+	public void notifyNewVertices(List<ExecutionJobVertex> newJobVerticesTopological) {
+		// we validate here that the vertices are in fact not connected to
+		// any other vertices
+		for (ExecutionJobVertex ejv : newJobVerticesTopological) {
+			List<IntermediateResult> inputs = ejv.getInputs();
+			IntermediateResult[] outputs = ejv.getProducedDataSets();
+
+			if ((inputs != null && inputs.size() > 0) || (outputs != null && outputs.length > 0)) {
+				throw new FlinkRuntimeException("Incompatible failover strategy - strategy '" + 
+						getStrategyName() + "' can only handle jobs with only disconnected tasks.");
+			}
+		}
+	}
+
+	@Override
+	public String getStrategyName() {
+		return "Individual Task Restart";
+	}
+
+	@Override
+	public void registerMetrics(MetricGroup metricGroup) {
+		metricGroup.counter("task_failures", numTaskFailures);
+	}
+
+	// ------------------------------------------------------------------------
+	//  factory
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Factory that instantiates the RestartIndividualStrategy.
+	 */
+	public static class Factory implements FailoverStrategy.Factory {
+
+		@Override
+		public RestartIndividualStrategy create(ExecutionGraph executionGraph) {
+			return new RestartIndividualStrategy(executionGraph);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/NumberOfFullRestartsGauge.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/NumberOfFullRestartsGauge.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/NumberOfFullRestartsGauge.java
new file mode 100644
index 0000000..05a6414
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/metrics/NumberOfFullRestartsGauge.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Gauge which returns the number of full restarts.
+ */
+public class NumberOfFullRestartsGauge implements Gauge<Long> {
+
+	public static final String METRIC_NAME = "fullRestarts";
+
+	// ------------------------------------------------------------------------
+
+	private final ExecutionGraph eg;
+
+	public NumberOfFullRestartsGauge(ExecutionGraph executionGraph) {
+		this.eg = checkNotNull(executionGraph);
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public Long getValue() {
+		return eg.getNumberOfFullRestarts();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8ed85fe4/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 99dbc86..e60ff77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -376,7 +376,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 					executionGraph.scheduleForExecution();
 				}
 				catch (Throwable t) {
-					executionGraph.fail(t);
+					executionGraph.failGlobal(t);
 				}
 			}
 		});


Mime
View raw message