flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [3/4] flink git commit: [FLINK-7153] Introduce LocationPreferenceConstraint for scheduling
Date Thu, 02 Nov 2017 17:01:14 GMT
[FLINK-7153] Introduce LocationPreferenceConstraint for scheduling

The LocationPreferenceConstraint defines whether all or any preferred locations
have to be taken into consideration when scheduling tasks. Especially for batch
jobs where we do lazy scheduling not all input locations might be known for a
consumer task. Therefore, we set the location preference constraint to any which
means that only those location are taken into consideration which are known at
scheduling time.

[FLINK-7153] Add test cases

Replace AtomicReference with AtomicReferenceUpdater

Fix

Use static imports Preconditions.checkNotNull

Initialize ANY array with number of returned futures

Revert formatting changes in Scheduler

Set flink-runtime log level in log4j-test.properties to OFF

Revert changes to ExecutionVertex#getPreferredLocationsBasedOnInputs

Fix failing FailoverRegionTest

This closes #4916.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b0fb26b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b0fb26b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b0fb26b

Branch: refs/heads/master
Commit: 3b0fb26bfb779a98f8dcadbb4a7ba206e11f9c2c
Parents: c73b2fe
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Fri Oct 27 09:47:03 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Nov 2 17:04:45 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 152 +++++++---
 .../runtime/executiongraph/ExecutionGraph.java  |  13 +-
 .../executiongraph/ExecutionJobVertex.java      |  29 +-
 .../runtime/executiongraph/ExecutionVertex.java |  41 ++-
 .../executiongraph/failover/FailoverRegion.java |   6 +-
 .../apache/flink/runtime/instance/SlotPool.java |  12 +-
 .../flink/runtime/instance/SlotProvider.java    |   8 +-
 .../scheduler/LocationPreferenceConstraint.java |  32 +++
 .../runtime/jobmanager/scheduler/Scheduler.java |  84 +++---
 .../ExecutionGraphDeploymentTest.java           | 107 +++++++
 .../ExecutionGraphMetricsTest.java              |   3 +-
 .../ExecutionGraphSchedulingTest.java           |   4 +-
 .../executiongraph/ExecutionGraphTestUtils.java |  14 +-
 .../runtime/executiongraph/ExecutionTest.java   | 286 +++++++++++++++++++
 .../ExecutionVertexCancelTest.java              |   5 +-
 .../ExecutionVertexDeploymentTest.java          |   4 +-
 .../ExecutionVertexSchedulingTest.java          |  15 +-
 .../executiongraph/FailoverRegionTest.java      |   9 +-
 .../executiongraph/ProgrammedSlotProvider.java  |  29 +-
 .../utils/SimpleSlotProvider.java               |   6 +-
 .../ScheduleWithCoLocationHintTest.java         | 121 ++++----
 .../scheduler/SchedulerIsolatedTasksTest.java   |  46 +--
 .../scheduler/SchedulerSlotSharingTest.java     | 223 ++++++++-------
 .../scheduler/SchedulerTestUtils.java           |  15 +-
 .../taskmanager/LocalTaskManagerLocation.java   |  35 +++
 25 files changed, 972 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 9d3e128..38c3821 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.executiongraph;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.time.Time;
@@ -37,6 +38,7 @@ import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -45,11 +47,12 @@ import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -58,7 +61,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
@@ -96,6 +98,11 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	private static final AtomicReferenceFieldUpdater<Execution, ExecutionState> STATE_UPDATER =
 			AtomicReferenceFieldUpdater.newUpdater(Execution.class, ExecutionState.class, "state");
 
+	private static final AtomicReferenceFieldUpdater<Execution, SimpleSlot> ASSIGNED_SLOT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
+		Execution.class,
+		SimpleSlot.class,
+		"assignedResource");
+
 	private static final Logger LOG = ExecutionGraph.LOG;
 
 	private static final int NUM_CANCEL_CALL_TRIES = 3;
@@ -134,7 +141,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 	private volatile ExecutionState state = CREATED;
 
-	private final AtomicReference<SimpleSlot> assignedResource;
+	private volatile SimpleSlot assignedResource;
 
 	private volatile Throwable failureCause;          // once assigned, never changes
 
@@ -193,7 +200,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		this.terminationFuture = new CompletableFuture<>();
 		this.taskManagerLocationFuture = new CompletableFuture<>();
 
-		this.assignedResource = new AtomicReference<>();
+		this.assignedResource = null;
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -234,7 +241,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	public SimpleSlot getAssignedResource() {
-		return assignedResource.get();
+		return assignedResource;
 	}
 
 	/**
@@ -244,22 +251,23 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param slot to assign to this execution
 	 * @return true if the slot could be assigned to the execution, otherwise false
 	 */
+	@VisibleForTesting
 	boolean tryAssignResource(final SimpleSlot slot) {
-		Preconditions.checkNotNull(slot);
+		checkNotNull(slot);
 
 		// only allow to set the assigned resource in state SCHEDULED or CREATED
 		// note: we also accept resource assignment when being in state CREATED for testing purposes
 		if (state == SCHEDULED || state == CREATED) {
-			if (assignedResource.compareAndSet(null, slot)) {
+			if (ASSIGNED_SLOT_UPDATER.compareAndSet(this, null, slot)) {
 				// check for concurrent modification (e.g. cancelling call)
 				if (state == SCHEDULED || state == CREATED) {
-					Preconditions.checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet.");
+					checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet.");
 					taskManagerLocationFuture.complete(slot.getTaskManagerLocation());
 
 					return true;
 				} else {
 					// free assigned resource and return false
-					assignedResource.set(null);
+					ASSIGNED_SLOT_UPDATER.set(this, null);
 					return false;
 				}
 			} else {
@@ -275,7 +283,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	@Override
 	public TaskManagerLocation getAssignedResourceLocation() {
 		// returns non-null only when a location is already assigned
-		return assignedResource.get() != null ? assignedResource.get().getTaskManagerLocation() : null;
+		final SimpleSlot currentAssignedResource = assignedResource;
+		return currentAssignedResource != null ? currentAssignedResource.getTaskManagerLocation() : null;
 	}
 
 	public Throwable getFailureCause() {
@@ -333,7 +342,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	public boolean scheduleForExecution() {
 		SlotProvider resourceProvider = getVertex().getExecutionGraph().getSlotProvider();
 		boolean allowQueued = getVertex().getExecutionGraph().isQueuedSchedulingAllowed();
-		return scheduleForExecution(resourceProvider, allowQueued);
+		return scheduleForExecution(resourceProvider, allowQueued, LocationPreferenceConstraint.ANY);
 	}
 
 	/**
@@ -344,12 +353,19 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param slotProvider The slot provider to use to allocate slot for this execution attempt.
 	 * @param queued Flag to indicate whether the scheduler may queue this task if it cannot
 	 *               immediately deploy it.
+	 * @param locationPreferenceConstraint constraint for the location preferences
 	 * 
 	 * @throws IllegalStateException Thrown, if the vertex is not in CREATED state, which is the only state that permits scheduling.
 	 */
-	public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) {
+	public boolean scheduleForExecution(
+		SlotProvider slotProvider,
+		boolean queued,
+		LocationPreferenceConstraint locationPreferenceConstraint) {
 		try {
-			final CompletableFuture<Execution> allocationFuture = allocateAndAssignSlotForExecution(slotProvider, queued);
+			final CompletableFuture<Execution> allocationFuture = allocateAndAssignSlotForExecution(
+				slotProvider,
+				queued,
+				locationPreferenceConstraint);
 
 			// IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
 			// that we directly deploy the tasks if the slot allocation future is completed. This is
@@ -387,11 +403,15 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 *
 	 * @param slotProvider to obtain a new slot from
 	 * @param queued if the allocation can be queued
+	 * @param locationPreferenceConstraint constraint for the location preferences
 	 * @return Future which is completed with this execution once the slot has been assigned
 	 * 			or with an exception if an error occurred.
 	 * @throws IllegalExecutionStateException if this method has been called while not being in the CREATED state
 	 */
-	public CompletableFuture<Execution> allocateAndAssignSlotForExecution(SlotProvider slotProvider, boolean queued) throws IllegalExecutionStateException {
+	public CompletableFuture<Execution> allocateAndAssignSlotForExecution(
+			SlotProvider slotProvider,
+			boolean queued,
+			LocationPreferenceConstraint locationPreferenceConstraint) throws IllegalExecutionStateException {
 
 		checkNotNull(slotProvider);
 
@@ -411,18 +431,27 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					new ScheduledUnit(this, sharingGroup) :
 					new ScheduledUnit(this, sharingGroup, locationConstraint);
 
-			CompletableFuture<SimpleSlot> slotFuture = slotProvider.allocateSlot(toSchedule, queued);
-
-			return slotFuture.thenApply(slot -> {
-				if (tryAssignResource(slot)) {
-					return this;
-				} else {
-					// release the slot
-					slot.releaseSlot();
+			// calculate the preferred locations
+			final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = calculatePreferredLocations(locationPreferenceConstraint);
+
+			return preferredLocationsFuture
+				.thenCompose(
+					(Collection<TaskManagerLocation> preferredLocations) ->
+						slotProvider.allocateSlot(
+							toSchedule,
+							queued,
+							preferredLocations))
+				.thenApply(
+					(SimpleSlot slot) -> {
+						if (tryAssignResource(slot)) {
+							return this;
+						} else {
+							// release the slot
+							slot.releaseSlot();
 
-					throw new CompletionException(new FlinkException("Could not assign slot " + slot + " to execution " + this + " because it has already been assigned "));
-				}
-			});
+							throw new CompletionException(new FlinkException("Could not assign slot " + slot + " to execution " + this + " because it has already been assigned "));
+						}
+					});
 		}
 		else {
 			// call race, already deployed, or already done
@@ -436,7 +465,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @throws JobException if the execution cannot be deployed to the assigned resource
 	 */
 	public void deploy() throws JobException {
-		final SimpleSlot slot  = assignedResource.get();
+		final SimpleSlot slot  = assignedResource;
 
 		checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");
 
@@ -516,7 +545,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * Sends stop RPC call.
 	 */
 	public void stop() {
-		final SimpleSlot slot = assignedResource.get();
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -579,7 +608,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					try {
 						vertex.getExecutionGraph().deregisterExecution(this);
 
-						final SimpleSlot slot = assignedResource.get();
+						final SimpleSlot slot = assignedResource;
 
 						if (slot != null) {
 							slot.releaseSlot();
@@ -640,8 +669,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					() -> {
 						try {
 							consumerVertex.scheduleForExecution(
-									consumerVertex.getExecutionGraph().getSlotProvider(),
-									consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed());
+								consumerVertex.getExecutionGraph().getSlotProvider(),
+								consumerVertex.getExecutionGraph().isQueuedSchedulingAllowed(),
+								LocationPreferenceConstraint.ANY); // there must be at least one known location
 						} catch (Throwable t) {
 							consumerVertex.fail(new IllegalStateException("Could not schedule consumer " +
 									"vertex " + consumerVertex, t));
@@ -748,7 +778,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			int maxStrackTraceDepth,
 			Time timeout) {
 
-		final SimpleSlot slot = assignedResource.get();
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -772,7 +802,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param timestamp of the completed checkpoint
 	 */
 	public void notifyCheckpointComplete(long checkpointId, long timestamp) {
-		final SimpleSlot slot = assignedResource.get();
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -792,7 +822,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param checkpointOptions of the checkpoint to trigger
 	 */
 	public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
-		final SimpleSlot slot = assignedResource.get();
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -850,7 +880,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 						updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
-						final SimpleSlot slot = assignedResource.get();
+						final SimpleSlot slot = assignedResource;
 
 						if (slot != null) {
 							slot.releaseSlot();
@@ -908,7 +938,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 				if (transitionState(current, CANCELED)) {
 					try {
-						final SimpleSlot slot = assignedResource.get();
+						final SimpleSlot slot = assignedResource;
 
 						if (slot != null) {
 							slot.releaseSlot();
@@ -1005,7 +1035,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
 				try {
-					final SimpleSlot slot = assignedResource.get();
+					final SimpleSlot slot = assignedResource;
 					if (slot != null) {
 						slot.releaseSlot();
 					}
@@ -1022,7 +1052,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					}
 
 					try {
-						if (assignedResource.get() != null) {
+						if (assignedResource != null) {
 							sendCancelRpcCall();
 						}
 					} catch (Throwable tt) {
@@ -1089,7 +1119,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * The sending is tried up to NUM_CANCEL_CALL_TRIES times.
 	 */
 	private void sendCancelRpcCall() {
-		final SimpleSlot slot = assignedResource.get();
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1110,7 +1140,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	private void sendFailIntermediateResultPartitionsRpcCall() {
-		final SimpleSlot slot = assignedResource.get();
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1128,7 +1158,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	private void sendUpdatePartitionInfoRpcCall(
 			final Iterable<PartitionInfo> partitionInfos) {
 
-		final SimpleSlot slot = assignedResource.get();
+		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1151,6 +1181,46 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	//  Miscellaneous
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Calculates the preferred locations based on the location preference constraint.
+	 *
+	 * @param locationPreferenceConstraint constraint for the location preference
+	 * @return Future containing the collection of preferred locations. This might not be completed if not all inputs
+	 * 		have been a resource assigned.
+	 */
+	@VisibleForTesting
+	public CompletableFuture<Collection<TaskManagerLocation>> calculatePreferredLocations(LocationPreferenceConstraint locationPreferenceConstraint) {
+		final Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = getVertex().getPreferredLocationsBasedOnInputs();
+		final CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture;
+
+		switch(locationPreferenceConstraint) {
+			case ALL:
+				preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
+				break;
+			case ANY:
+				final ArrayList<TaskManagerLocation> completedTaskManagerLocations = new ArrayList<>(preferredLocationFutures.size());
+
+				for (CompletableFuture<TaskManagerLocation> preferredLocationFuture : preferredLocationFutures) {
+					if (preferredLocationFuture.isDone() && !preferredLocationFuture.isCompletedExceptionally()) {
+						final TaskManagerLocation taskManagerLocation = preferredLocationFuture.getNow(null);
+
+						if (taskManagerLocation == null) {
+							throw new FlinkRuntimeException("TaskManagerLocationFuture was completed with null. This indicates a programming bug.");
+						}
+
+						completedTaskManagerLocations.add(taskManagerLocation);
+					}
+				}
+
+				preferredLocationsFuture = CompletableFuture.completedFuture(completedTaskManagerLocations);
+				break;
+			default:
+				throw new RuntimeException("Unknown LocationPreferenceConstraint " + locationPreferenceConstraint + '.');
+		}
+
+		return preferredLocationsFuture;
+	}
+
 	private boolean transitionState(ExecutionState currentState, ExecutionState targetState) {
 		return transitionState(currentState, targetState, null);
 	}
@@ -1248,7 +1318,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	
 	@Override
 	public String toString() {
-		final SimpleSlot slot = assignedResource.get();
+		final SimpleSlot slot = assignedResource;
 
 		return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getTaskNameWithSubtaskIndex(),
 				(slot == null ? "(unassigned)" : slot), state);

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 62c6e99..8a74001 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -60,6 +60,7 @@ import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistry;
@@ -853,11 +854,14 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		}
 	}
 
-	private void scheduleLazy(SlotProvider slotProvider) throws NoResourceAvailableException {
+	private void scheduleLazy(SlotProvider slotProvider) {
 		// simply take the vertices without inputs.
 		for (ExecutionJobVertex ejv : verticesInCreationOrder) {
 			if (ejv.getJobVertex().isInputVertex()) {
-				ejv.scheduleAll(slotProvider, allowQueuedScheduling);
+				ejv.scheduleAll(
+					slotProvider,
+					allowQueuedScheduling,
+					LocationPreferenceConstraint.ALL); // since it is an input vertex, the input based location preferences should be empty
 			}
 		}
 	}
@@ -884,7 +888,10 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		// allocate the slots (obtain all their futures
 		for (ExecutionJobVertex ejv : getVerticesTopologically()) {
 			// these calls are not blocking, they only return futures
-			Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(slotProvider, queued);
+			Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(
+				slotProvider,
+				queued,
+				LocationPreferenceConstraint.ALL);
 
 			allAllocationFutures.addAll(allocationFutures);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 98191d0..fff7ce1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.types.Either;
@@ -455,14 +456,24 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	//---------------------------------------------------------------------------------------------
 	//  Actions
 	//---------------------------------------------------------------------------------------------
-	
-	public void scheduleAll(SlotProvider slotProvider, boolean queued) {
+
+	/**
+	 * Schedules all execution vertices of this ExecutionJobVertex.
+	 *
+	 * @param slotProvider to allocate the slots from
+	 * @param queued if the allocations can be queued
+	 * @param locationPreferenceConstraint constraint for the location preferences
+	 */
+	public void scheduleAll(
+			SlotProvider slotProvider,
+			boolean queued,
+			LocationPreferenceConstraint locationPreferenceConstraint) {
 		
 		final ExecutionVertex[] vertices = this.taskVertices;
 
 		// kick off the tasks
 		for (ExecutionVertex ev : vertices) {
-			ev.scheduleForExecution(slotProvider, queued);
+			ev.scheduleForExecution(slotProvider, queued, locationPreferenceConstraint);
 		}
 	}
 
@@ -474,8 +485,13 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	 * <p>If this method throws an exception, it makes sure to release all so far requested slots.
 	 * 
 	 * @param resourceProvider The resource provider from whom the slots are requested.
+	 * @param queued if the allocation can be queued
+	 * @param locationPreferenceConstraint constraint for the location preferences
 	 */
-	public Collection<CompletableFuture<Execution>> allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) {
+	public Collection<CompletableFuture<Execution>> allocateResourcesForAll(
+			SlotProvider resourceProvider,
+			boolean queued,
+			LocationPreferenceConstraint locationPreferenceConstraint) {
 		final ExecutionVertex[] vertices = this.taskVertices;
 		final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length];
 
@@ -484,7 +500,10 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 		for (int i = 0; i < vertices.length; i++) {
 			// allocate the next slot (future)
 			final Execution exec = vertices[i].getCurrentExecutionAttempt();
-			final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(resourceProvider, queued);
+			final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(
+				resourceProvider,
+				queued,
+				locationPreferenceConstraint);
 			slots[i] = allocationFuture;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e87a5a0..6d45d06 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EvictingBoundedList;
@@ -487,7 +488,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 			return Collections.emptySet();
 		}
 		else {
-			Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(4);
+			Set<CompletableFuture<TaskManagerLocation>> locations = new HashSet<>(getTotalNumberOfParallelSubtasks());
+			Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(getTotalNumberOfParallelSubtasks());
 
 			// go over all inputs
 			for (int i = 0; i < inputEdges.length; i++) {
@@ -497,17 +499,26 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 					// go over all input sources
 					for (int k = 0; k < sources.length; k++) {
 						// look-up assigned slot of input source
-						CompletableFuture<TaskManagerLocation> taskManagerLocationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
-						inputLocations.add(taskManagerLocationFuture);
-
+						CompletableFuture<TaskManagerLocation> locationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
+						// add input location
+						inputLocations.add(locationFuture);
+						// inputs which have too many distinct sources are not considered
 						if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
-							return Collections.emptyList();
+							inputLocations.clear();
+							break;
 						}
 					}
 				}
+				// keep the locations of the input with the least preferred locations
+				if (locations.isEmpty() || // nothing assigned yet
+						(!inputLocations.isEmpty() && inputLocations.size() < locations.size())) {
+					// current input has fewer preferred locations
+					locations.clear();
+					locations.addAll(inputLocations);
+				}
 			}
 
-			return inputLocations.isEmpty() ? Collections.emptyList() : inputLocations;
+			return locations.isEmpty() ? Collections.emptyList() : locations;
 		}
 	}
 
@@ -587,8 +598,22 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		}
 	}
 
-	public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) {
-		return this.currentExecution.scheduleForExecution(slotProvider, queued);
+	/**
+	 * Schedules the current execution of this ExecutionVertex.
+	 *
+	 * @param slotProvider to allocate the slots from
+	 * @param queued if the allocation can be queued
+	 * @param locationPreferenceConstraint constraint for the location preferences
+	 * @return
+	 */
+	public boolean scheduleForExecution(
+			SlotProvider slotProvider,
+			boolean queued,
+			LocationPreferenceConstraint locationPreferenceConstraint) {
+		return this.currentExecution.scheduleForExecution(
+			slotProvider,
+			queued,
+			locationPreferenceConstraint);
 	}
 
 	@VisibleForTesting

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
index 1919c61..0b00c0e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/FailoverRegion.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.FlinkException;
 
@@ -216,8 +217,9 @@ public class FailoverRegion {
 				for (ExecutionVertex ev : connectedExecutionVertexes) {
 					try {
 						ev.scheduleForExecution(
-								executionGraph.getSlotProvider(),
-								executionGraph.isQueuedSchedulingAllowed());
+							executionGraph.getSlotProvider(),
+							executionGraph.isQueuedSchedulingAllowed(),
+							LocationPreferenceConstraint.ANY); // some inputs not belonging to the failover region might have failed concurrently
 					}
 					catch (Throwable e) {
 						failover(globalModVersionOfFailover);

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index fcf7d40..1944b38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -1002,14 +1002,12 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		}
 
 		@Override
-		public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
-			Collection<CompletableFuture<TaskManagerLocation>> locationPreferenceFutures =
-				task.getTaskToExecute().getVertex().getPreferredLocations();
+		public CompletableFuture<SimpleSlot> allocateSlot(
+				ScheduledUnit task,
+				boolean allowQueued,
+				Collection<TaskManagerLocation> preferredLocations) {
 
-			CompletableFuture<Collection<TaskManagerLocation>> locationPreferencesFuture = FutureUtils.combineAll(locationPreferenceFutures);
-
-			return locationPreferencesFuture.thenCompose(
-				locationPreferences -> gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout));
+			return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, preferredLocations, timeout);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
index 23e6749..ef988b4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotProvider.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.instance;
 
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -40,7 +42,11 @@ public interface SlotProvider {
 	 *
 	 * @param task         The task to allocate the slot for
 	 * @param allowQueued  Whether allow the task be queued if we do not have enough resource
+	 * @param preferredLocations preferred locations for the slot allocation
 	 * @return The future of the allocation
 	 */
-	CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued);
+	CompletableFuture<SimpleSlot> allocateSlot(
+		ScheduledUnit task,
+		boolean allowQueued,
+		Collection<TaskManagerLocation> preferredLocations);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LocationPreferenceConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LocationPreferenceConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LocationPreferenceConstraint.java
new file mode 100644
index 0000000..e890512
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/LocationPreferenceConstraint.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.scheduler;
+
+/**
+ * Defines the location preference constraint.
+ *
+ * <p> Currently, we support that all input locations have to be taken into consideration
+ * and only those which are known at scheduling time. Note that if all input locations
+ * are considered, then the scheduling operation can potentially take a while until all
+ * inputs have locations assigned.
+ */
+public enum LocationPreferenceConstraint {
+	ALL, // wait for all inputs to have a location assigned
+	ANY // only consider those inputs who already have a location assigned
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 9b1ffbe..1995c12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -18,6 +18,26 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceDiedException;
+import org.apache.flink.runtime.instance.InstanceListener;
+import org.apache.flink.runtime.instance.SharedSlot;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotProvider;
+import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -32,31 +52,9 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.SlotProvider;
-import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.instance.SharedSlot;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceDiedException;
-import org.apache.flink.runtime.instance.InstanceListener;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.ExceptionUtils;
-
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * The scheduler is responsible for distributing the ready-to-run tasks among instances and slots.
  * 
@@ -135,31 +133,29 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 
 
 	@Override
-	public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
-		Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = task.getTaskToExecute().getVertex().getPreferredLocationsBasedOnInputs();
+	public CompletableFuture<SimpleSlot> allocateSlot(
+			ScheduledUnit task,
+			boolean allowQueued,
+			Collection<TaskManagerLocation> preferredLocations) {
 
-		CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
+		try {
+			final Object ret = scheduleTask(task, allowQueued, preferredLocations);
 
-		return preferredLocationsFuture.thenCompose(
-			preferredLocations -> {
-				try {
-					final Object ret = scheduleTask(task, allowQueued, preferredLocations);
-
-					if (ret instanceof SimpleSlot) {
-						return CompletableFuture.completedFuture((SimpleSlot) ret);
-					} else if (ret instanceof CompletableFuture) {
-						@SuppressWarnings("unchecked")
-						CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret;
-						return typed;
-					} else {
-						// this should never happen, simply guard this case with an exception
-						throw new RuntimeException();
-					}
-				} catch (NoResourceAvailableException e) {
-					throw new CompletionException(e);
-				}
+			if (ret instanceof SimpleSlot) {
+				return CompletableFuture.completedFuture((SimpleSlot) ret);
+			}
+			else if (ret instanceof CompletableFuture) {
+				@SuppressWarnings("unchecked")
+				CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret;
+				return typed;
 			}
-		);
+			else {
+				// this should never happen, simply guard this case with an exception
+				throw new RuntimeException();
+			}
+		} catch (NoResourceAvailableException e) {
+			return FutureUtils.completedExceptionally(e);
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 5c80405..de91d78 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -46,14 +47,19 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
 import org.apache.flink.util.TestLogger;
 
@@ -67,14 +73,18 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests for {@link ExecutionGraph} deployment.
@@ -555,6 +565,103 @@ public class ExecutionGraphDeploymentTest extends TestLogger {
 			eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
 	}
 
+	/**
+	 * Tests that eager scheduling will wait until all input locations have been set before
+	 * scheduling a task.
+	 */
+	@Test
+	public void testEagerSchedulingWaitsOnAllInputPreferredLocations() throws Exception {
+		final int parallelism = 2;
+		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(parallelism);
+
+		final Time timeout = Time.hours(1L);
+		final JobVertexID sourceVertexId = new JobVertexID();
+		final JobVertex sourceVertex = new JobVertex("Test source", sourceVertexId);
+		sourceVertex.setInvokableClass(NoOpInvokable.class);
+		sourceVertex.setParallelism(parallelism);
+
+		final JobVertexID sinkVertexId = new JobVertexID();
+		final JobVertex sinkVertex = new JobVertex("Test sink", sinkVertexId);
+		sinkVertex.setInvokableClass(NoOpInvokable.class);
+		sinkVertex.setParallelism(parallelism);
+
+		sinkVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
+
+		final Map<JobVertexID, CompletableFuture<SimpleSlot>[]> slotFutures = new HashMap<>(2);
+
+		for (JobVertexID jobVertexID : Arrays.asList(sourceVertexId, sinkVertexId)) {
+			CompletableFuture<SimpleSlot>[] slotFutureArray = new CompletableFuture[parallelism];
+
+			for (int i = 0; i < parallelism; i++) {
+				slotFutureArray[i] = new CompletableFuture<>();
+			}
+
+			slotFutures.put(jobVertexID, slotFutureArray);
+			slotProvider.addSlots(jobVertexID, slotFutureArray);
+		}
+
+		final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(3);
+
+		final ExecutionGraph executionGraph = ExecutionGraphTestUtils.createExecutionGraph(
+			new JobID(),
+			slotProvider,
+			new NoRestartStrategy(),
+			scheduledExecutorService,
+			timeout,
+			sourceVertex,
+			sinkVertex);
+
+		executionGraph.setScheduleMode(ScheduleMode.EAGER);
+		executionGraph.scheduleForExecution();
+
+		// all tasks should be in state SCHEDULED
+		for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
+			assertEquals(ExecutionState.SCHEDULED, executionVertex.getCurrentExecutionAttempt().getState());
+		}
+
+		// wait until the source vertex slots have been requested
+		assertTrue(slotProvider.getSlotRequestedFuture(sourceVertexId, 0).get());
+		assertTrue(slotProvider.getSlotRequestedFuture(sourceVertexId, 1).get());
+
+		// check that the sinks have not requested their slots because they need the location
+		// information of the sources
+		assertFalse(slotProvider.getSlotRequestedFuture(sinkVertexId, 0).isDone());
+		assertFalse(slotProvider.getSlotRequestedFuture(sinkVertexId, 1).isDone());
+
+		final TaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
+
+		final SimpleSlot sourceSlot1 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 0);
+
+		final SimpleSlot sourceSlot2 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 1);
+
+		final SimpleSlot sinkSlot1 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 0);
+
+		final SimpleSlot sinkSlot2 = createSlot(executionGraph.getJobID(), localTaskManagerLocation, 1);
+
+		slotFutures.get(sourceVertexId)[0].complete(sourceSlot1);
+		slotFutures.get(sourceVertexId)[1].complete(sourceSlot2);
+
+		// wait until the sink vertex slots have been requested after we completed the source slots
+		assertTrue(slotProvider.getSlotRequestedFuture(sinkVertexId, 0).get());
+		assertTrue(slotProvider.getSlotRequestedFuture(sinkVertexId, 1).get());
+
+		slotFutures.get(sinkVertexId)[0].complete(sinkSlot1);
+		slotFutures.get(sinkVertexId)[1].complete(sinkSlot2);
+
+		for (ExecutionVertex executionVertex : executionGraph.getAllExecutionVertices()) {
+			ExecutionGraphTestUtils.waitUntilExecutionState(executionVertex.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 5000L);
+		}
+	}
+
+	private SimpleSlot createSlot(JobID jobId, TaskManagerLocation taskManagerLocation, int index) {
+		return new SimpleSlot(
+			jobId,
+			mock(SlotOwner.class),
+			taskManagerLocation,
+			index,
+			new SimpleAckingTaskManagerGateway());
+	}
+
 	@SuppressWarnings("serial")
 	public static class FailingFinalizeJobVertex extends JobVertex {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index f4e8b30..d3cec30 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -53,6 +53,7 @@ import org.mockito.Matchers;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
@@ -115,7 +116,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
 			CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
 			future.complete(simpleSlot);
-			when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
+			when(scheduler.allocateSlot(any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(future);
 
 			when(rootSlot.getSlotNumber()).thenReturn(0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 69a679a..90136a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -306,7 +306,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 
 		for (int i = 0; i < parallelism; i += 2) {
 			sourceFutures[i].complete(sourceSlots[i]);
-			targetFutures[i + 1].complete(targetSlots[i + 1]);
+			targetFutures[i].complete(targetSlots[i]);
 		}
 
 		//
@@ -331,7 +331,7 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		// all completed futures must have been returns
 		for (int i = 0; i < parallelism; i += 2) {
 			assertTrue(sourceSlots[i].isCanceled());
-			assertTrue(targetSlots[i + 1].isCanceled());
+			assertTrue(targetSlots[i].isCanceled());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index 5feeabc..42a63ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -320,9 +320,21 @@ public class ExecutionGraphTestUtils {
 			ScheduledExecutorService executor,
 			JobVertex... vertices) throws Exception {
 
+			return createExecutionGraph(jid, slotProvider, restartStrategy, executor, Time.seconds(10L), vertices);
+	}
+
+	public static ExecutionGraph createExecutionGraph(
+			JobID jid,
+			SlotProvider slotProvider,
+			RestartStrategy restartStrategy,
+			ScheduledExecutorService executor,
+			Time timeout,
+			JobVertex... vertices) throws Exception {
+
 		checkNotNull(jid);
 		checkNotNull(restartStrategy);
 		checkNotNull(vertices);
+		checkNotNull(timeout);
 
 		return ExecutionGraphBuilder.buildGraph(
 			null,
@@ -333,7 +345,7 @@ public class ExecutionGraphTestUtils {
 			slotProvider,
 			ExecutionGraphTestUtils.class.getClassLoader(),
 			new StandaloneCheckpointRecoveryFactory(),
-			Time.seconds(10),
+			timeout,
 			restartStrategy,
 			new UnregisteredMetricsGroup(),
 			1,

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
new file mode 100644
index 0000000..fa845cf
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.Slot;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the {@link Execution}.
+ */
+public class ExecutionTest extends TestLogger {
+
+	/**
+	 * Tests that slots are released if we cannot assign the allocated resource to the
+	 * Execution. In this case, a concurrent cancellation precedes the assignment.
+	 */
+	@Test
+	public void testSlotReleaseOnFailedResourceAssignment() throws Exception {
+		final JobVertexID jobVertexId = new JobVertexID();
+		final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		final CompletableFuture<SimpleSlot> slotFuture = new CompletableFuture<>();
+		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
+		slotProvider.addSlot(jobVertexId, 0, slotFuture);
+
+		ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+			new JobID(),
+			slotProvider,
+			new NoRestartStrategy(),
+			jobVertex);
+
+		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+		final Execution execution = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
+
+		final TestingSlotOwner slotOwner = new TestingSlotOwner();
+
+		final SimpleSlot slot = new SimpleSlot(
+			new JobID(),
+			slotOwner,
+			new LocalTaskManagerLocation(),
+			0,
+			new SimpleAckingTaskManagerGateway());
+
+		CompletableFuture<Execution> allocationFuture = execution.allocateAndAssignSlotForExecution(
+			slotProvider,
+			false,
+			LocationPreferenceConstraint.ALL);
+
+		assertFalse(allocationFuture.isDone());
+
+		assertEquals(ExecutionState.SCHEDULED, execution.getState());
+
+		// cancelling the execution should move it into state CANCELED; this happens before
+		// the slot future has been completed
+		execution.cancel();
+
+		assertEquals(ExecutionState.CANCELED, execution.getState());
+
+		// completing now the future should cause the slot to be released
+		slotFuture.complete(slot);
+
+		assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
+	}
+
+	/**
+	 * Tests that the slot is released in case of a execution cancellation when having
+	 * a slot assigned and being in state SCHEDULED.
+	 */
+	@Test
+	public void testSlotReleaseOnExecutionCancellationInScheduled() throws Exception {
+		final JobVertexID jobVertexId = new JobVertexID();
+		final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		final TestingSlotOwner slotOwner = new TestingSlotOwner();
+
+		final SimpleSlot slot = new SimpleSlot(
+			new JobID(),
+			slotOwner,
+			new LocalTaskManagerLocation(),
+			0,
+			new SimpleAckingTaskManagerGateway());
+
+		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
+		slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
+
+		ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+			new JobID(),
+			slotProvider,
+			new NoRestartStrategy(),
+			jobVertex);
+
+		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+		final Execution execution = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
+
+		CompletableFuture<Execution> allocationFuture = execution.allocateAndAssignSlotForExecution(
+			slotProvider,
+			false,
+			LocationPreferenceConstraint.ALL);
+
+		assertTrue(allocationFuture.isDone());
+
+		assertEquals(ExecutionState.SCHEDULED, execution.getState());
+
+		assertEquals(slot, execution.getAssignedResource());
+
+		// cancelling the execution should move it into state CANCELED
+		execution.cancel();
+		assertEquals(ExecutionState.CANCELED, execution.getState());
+
+		assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
+	}
+
+	/**
+	 * Tests that the slot is released in case of a execution cancellation when being in state
+	 * RUNNING.
+	 */
+	@Test
+	public void testSlotReleaseOnExecutionCancellationInRunning() throws Exception {
+		final JobVertexID jobVertexId = new JobVertexID();
+		final JobVertex jobVertex = new JobVertex("Test vertex", jobVertexId);
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+
+		final TestingSlotOwner slotOwner = new TestingSlotOwner();
+
+		final SimpleSlot slot = new SimpleSlot(
+			new JobID(),
+			slotOwner,
+			new LocalTaskManagerLocation(),
+			0,
+			new SimpleAckingTaskManagerGateway());
+
+		final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1);
+		slotProvider.addSlot(jobVertexId, 0, CompletableFuture.completedFuture(slot));
+
+		ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph(
+			new JobID(),
+			slotProvider,
+			new NoRestartStrategy(),
+			jobVertex);
+
+		ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(jobVertexId);
+
+		final Execution execution = executionJobVertex.getTaskVertices()[0].getCurrentExecutionAttempt();
+
+		CompletableFuture<Execution> allocationFuture = execution.allocateAndAssignSlotForExecution(
+			slotProvider,
+			false,
+			LocationPreferenceConstraint.ALL);
+
+		assertTrue(allocationFuture.isDone());
+
+		assertEquals(ExecutionState.SCHEDULED, execution.getState());
+
+		assertEquals(slot, execution.getAssignedResource());
+
+		execution.deploy();
+
+		execution.switchToRunning();
+
+		// cancelling the execution should move it into state CANCELING
+		execution.cancel();
+		assertEquals(ExecutionState.CANCELING, execution.getState());
+
+		execution.cancelingComplete();
+
+		assertEquals(slot, slotOwner.getReturnedSlotFuture().get());
+	}
+
+	/**
+	 * Tests that all preferred locations are calculated.
+	 */
+	@Test
+	public void testAllPreferredLocationCalculation() throws ExecutionException, InterruptedException {
+		final TaskManagerLocation taskManagerLocation1 = new LocalTaskManagerLocation();
+		final TaskManagerLocation taskManagerLocation2 = new LocalTaskManagerLocation();
+		final TaskManagerLocation taskManagerLocation3 = new LocalTaskManagerLocation();
+
+		final CompletableFuture<TaskManagerLocation> locationFuture1 = CompletableFuture.completedFuture(taskManagerLocation1);
+		final CompletableFuture<TaskManagerLocation> locationFuture2 = new CompletableFuture<>();
+		final CompletableFuture<TaskManagerLocation> locationFuture3 = new CompletableFuture<>();
+
+		final Execution execution = SchedulerTestUtils.getTestVertex(Arrays.asList(locationFuture1, locationFuture2, locationFuture3));
+
+		CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = execution.calculatePreferredLocations(LocationPreferenceConstraint.ALL);
+
+		assertFalse(preferredLocationsFuture.isDone());
+
+		locationFuture3.complete(taskManagerLocation3);
+
+		assertFalse(preferredLocationsFuture.isDone());
+
+		locationFuture2.complete(taskManagerLocation2);
+
+		assertTrue(preferredLocationsFuture.isDone());
+
+		final Collection<TaskManagerLocation> preferredLocations = preferredLocationsFuture.get();
+
+		assertThat(preferredLocations, containsInAnyOrder(taskManagerLocation1, taskManagerLocation2, taskManagerLocation3));
+	}
+
+	/**
+	 * Tests that any preferred locations are calculated.
+	 */
+	@Test
+	public void testAnyPreferredLocationCalculation() throws ExecutionException, InterruptedException {
+		final TaskManagerLocation taskManagerLocation1 = new LocalTaskManagerLocation();
+		final TaskManagerLocation taskManagerLocation3 = new LocalTaskManagerLocation();
+
+		final CompletableFuture<TaskManagerLocation> locationFuture1 = CompletableFuture.completedFuture(taskManagerLocation1);
+		final CompletableFuture<TaskManagerLocation> locationFuture2 = new CompletableFuture<>();
+		final CompletableFuture<TaskManagerLocation> locationFuture3 = CompletableFuture.completedFuture(taskManagerLocation3);
+
+		final Execution execution = SchedulerTestUtils.getTestVertex(Arrays.asList(locationFuture1, locationFuture2, locationFuture3));
+
+		CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = execution.calculatePreferredLocations(LocationPreferenceConstraint.ANY);
+
+		assertTrue(preferredLocationsFuture.isDone());
+
+		final Collection<TaskManagerLocation> preferredLocations = preferredLocationsFuture.get();
+
+		assertThat(preferredLocations, containsInAnyOrder(taskManagerLocation1, taskManagerLocation3));
+	}
+
+	/**
+	 * Slot owner which records the first returned slot.
+	 */
+	public static final class TestingSlotOwner implements SlotOwner {
+
+		final CompletableFuture<Slot> returnedSlot = new CompletableFuture<>();
+
+		public CompletableFuture<Slot> getReturnedSlotFuture() {
+			return returnedSlot;
+		}
+
+		@Override
+		public boolean returnAllocatedSlot(Slot slot) {
+			return returnedSlot.complete(slot);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 9908dae..cb31d15 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -447,7 +448,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			// it can occur as the result of races
 			{
 				Scheduler scheduler = mock(Scheduler.class);
-				vertex.scheduleForExecution(scheduler, false);
+				vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
 
 				assertEquals(ExecutionState.CANCELED, vertex.getExecutionState());
 			}
@@ -486,7 +487,7 @@ public class ExecutionVertexCancelTest extends TestLogger {
 				setVertexState(vertex, ExecutionState.CANCELING);
 
 				Scheduler scheduler = mock(Scheduler.class);
-				vertex.scheduleForExecution(scheduler, false);
+				vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
 			}
 			catch (Exception e) {
 				fail("should not throw an exception");

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 26cb3f1..cf08687 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -35,6 +35,8 @@ import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import java.util.Collection;
@@ -52,7 +54,7 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class ExecutionVertexDeploymentTest {
+public class ExecutionVertexDeploymentTest extends TestLogger {
 
 	@Test
 	public void testDeployCall() {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
index 4eac4aa..27f7f51 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexSchedulingTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.instance.DummyActorGateway;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
@@ -32,6 +33,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 import org.mockito.Matchers;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
@@ -39,6 +41,7 @@ import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.ge
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.anyBoolean;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -62,11 +65,11 @@ public class ExecutionVertexSchedulingTest {
 			Scheduler scheduler = mock(Scheduler.class);
 			CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
 			future.complete(slot);
-			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
+			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(future);
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			// try to deploy to the slot
-			vertex.scheduleForExecution(scheduler, false);
+			vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
 
 			// will have failed
 			assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
@@ -94,11 +97,11 @@ public class ExecutionVertexSchedulingTest {
 			final CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
 
 			Scheduler scheduler = mock(Scheduler.class);
-			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
+			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(future);
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 			// try to deploy to the slot
-			vertex.scheduleForExecution(scheduler, true);
+			vertex.scheduleForExecution(scheduler, true, LocationPreferenceConstraint.ALL);
 
 			// future has not yet a slot
 			assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());
@@ -128,12 +131,12 @@ public class ExecutionVertexSchedulingTest {
 			Scheduler scheduler = mock(Scheduler.class);
 			CompletableFuture<SimpleSlot> future = new CompletableFuture<>();
 			future.complete(slot);
-			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean())).thenReturn(future);
+			when(scheduler.allocateSlot(Matchers.any(ScheduledUnit.class), anyBoolean(), any(Collection.class))).thenReturn(future);
 
 			assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
 
 			// try to deploy to the slot
-			vertex.scheduleForExecution(scheduler, false);
+			vertex.scheduleForExecution(scheduler, false, LocationPreferenceConstraint.ALL);
 			assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
 		}
 		catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
index 4e89d43..f1e0f7c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -156,7 +157,7 @@ public class FailoverRegionTest extends TestLogger {
 
 		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
 
-		ev21.scheduleForExecution(slotProvider, true);
+		ev21.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL);
 		ev21.getCurrentExecutionAttempt().fail(new Exception("New fail"));
 		assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev11).getState());
 		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
@@ -169,7 +170,7 @@ public class FailoverRegionTest extends TestLogger {
 
 		ev11.getCurrentExecutionAttempt().markFinished();
 		ev21.getCurrentExecutionAttempt().markFinished();
-		ev22.scheduleForExecution(slotProvider, true);
+		ev22.scheduleForExecution(slotProvider, true, LocationPreferenceConstraint.ALL);
 		ev22.getCurrentExecutionAttempt().markFinished();
 		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev11).getState());
 		assertEquals(JobStatus.RUNNING, strategy.getFailoverRegion(ev22).getState());
@@ -209,11 +210,11 @@ public class FailoverRegionTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that two failover regions failover at the same time, they will not influence each orther
+	 * Tests that two failover regions failover at the same time, they will not influence each other
 	 * @throws Exception
 	 */
 	@Test
-	public void testMutilRegionFailoverAtSameTime() throws Exception {
+	public void testMultiRegionFailoverAtSameTime() throws Exception {
 		Instance instance = ExecutionGraphTestUtils.getInstance(
 				new ActorTaskManagerGateway(
 						new SimpleActorGateway(TestingUtils.directExecutionContext())),

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
index fef6aaa..5d7fa1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ProgrammedSlotProvider.java
@@ -22,7 +22,9 @@ import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -38,6 +40,8 @@ class ProgrammedSlotProvider implements SlotProvider {
 
 	private final Map<JobVertexID, CompletableFuture<SimpleSlot>[]> slotFutures = new HashMap<>();
 
+	private final Map<JobVertexID, CompletableFuture<Boolean>[]> slotFutureRequested = new HashMap<>();
+
 	private final int parallelism;
 
 	public ProgrammedSlotProvider(int parallelism) {
@@ -51,14 +55,20 @@ class ProgrammedSlotProvider implements SlotProvider {
 		checkArgument(subtaskIndex >= 0 && subtaskIndex < parallelism);
 
 		CompletableFuture<SimpleSlot>[] futures = slotFutures.get(vertex);
+		CompletableFuture<Boolean>[] requestedFutures = slotFutureRequested.get(vertex);
+
 		if (futures == null) {
 			@SuppressWarnings("unchecked")
 			CompletableFuture<SimpleSlot>[] newArray = (CompletableFuture<SimpleSlot>[]) new CompletableFuture<?>[parallelism];
 			futures = newArray;
 			slotFutures.put(vertex, futures);
+
+			requestedFutures = new CompletableFuture[parallelism];
+			slotFutureRequested.put(vertex, requestedFutures);
 		}
 
 		futures[subtaskIndex] = future;
+		requestedFutures[subtaskIndex] = new CompletableFuture<>();
 	}
 
 	public void addSlots(JobVertexID vertex, CompletableFuture<SimpleSlot>[] futures) {
@@ -67,10 +77,25 @@ class ProgrammedSlotProvider implements SlotProvider {
 		checkArgument(futures.length == parallelism);
 
 		slotFutures.put(vertex, futures);
+
+		CompletableFuture<Boolean>[] requestedFutures = new CompletableFuture[futures.length];
+
+		for (int i = 0; i < futures.length; i++) {
+			requestedFutures[i] = new CompletableFuture<>();
+		}
+
+		slotFutureRequested.put(vertex, requestedFutures);
+	}
+
+	public CompletableFuture<Boolean> getSlotRequestedFuture(JobVertexID jobVertexId, int subtaskIndex) {
+		return slotFutureRequested.get(jobVertexId)[subtaskIndex];
 	}
 
 	@Override
-	public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+	public CompletableFuture<SimpleSlot> allocateSlot(
+			ScheduledUnit task,
+			boolean allowQueued,
+			Collection<TaskManagerLocation> preferredLocations) {
 		JobVertexID vertexId = task.getTaskToExecute().getVertex().getJobvertexId();
 		int subtask = task.getTaskToExecute().getParallelSubtaskIndex();
 
@@ -78,6 +103,8 @@ class ProgrammedSlotProvider implements SlotProvider {
 		if (forTask != null) {
 			CompletableFuture<SimpleSlot> future = forTask[subtask];
 			if (future != null) {
+				slotFutureRequested.get(vertexId)[subtask].complete(true);
+
 				return future;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3b0fb26b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
index be5282a..a2323bf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/utils/SimpleSlotProvider.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.net.InetAddress;
 import java.util.ArrayDeque;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -70,7 +71,10 @@ public class SimpleSlotProvider implements SlotProvider, SlotOwner {
 	}
 
 	@Override
-	public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+	public CompletableFuture<SimpleSlot> allocateSlot(
+			ScheduledUnit task,
+			boolean allowQueued,
+			Collection<TaskManagerLocation> preferredLocations) {
 		final AllocatedSlot slot;
 
 		synchronized (slots) {


Mime
View raw message