flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/4] flink git commit: [FLINK-7153] Re-introduce preferred locations for scheduling
Date Thu, 02 Nov 2017 17:01:12 GMT
Repository: flink
Updated Branches:
  refs/heads/master 8afadd459 -> 3ff91be1d


[FLINK-7153] Re-introduce preferred locations for scheduling


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c73b2fe1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c73b2fe1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c73b2fe1

Branch: refs/heads/master
Commit: c73b2fe1f93f9ff2f05eb9130051729320634448
Parents: 8afadd4
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Mon Oct 16 14:04:13 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Nov 2 17:04:44 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java | 162 ++++++++++++++-----
 .../ExecutionAndAllocationFuture.java           |  45 ++++++
 .../executiongraph/ExecutionAndSlot.java        |  47 ------
 .../runtime/executiongraph/ExecutionGraph.java  | 146 ++++++-----------
 .../executiongraph/ExecutionGraphUtils.java     | 106 ------------
 .../executiongraph/ExecutionJobVertex.java      |  32 +---
 .../runtime/executiongraph/ExecutionVertex.java |  51 +++---
 .../apache/flink/runtime/instance/SlotPool.java |   9 +-
 .../instance/SlotSharingGroupAssignment.java    |  20 +--
 .../runtime/jobmanager/scheduler/Scheduler.java |  50 +++---
 .../ExecutionGraphSchedulingTest.java           | 151 +----------------
 .../executiongraph/ExecutionGraphStopTest.java  |  15 +-
 .../executiongraph/ExecutionGraphTestUtils.java |  23 +--
 .../executiongraph/ExecutionGraphUtilsTest.java | 124 --------------
 .../ExecutionVertexCancelTest.java              |   8 +-
 .../ExecutionVertexLocalityTest.java            |  19 +--
 .../ScheduleWithCoLocationHintTest.java         |   3 +-
 .../scheduler/SchedulerSlotSharingTest.java     |   3 +-
 .../scheduler/SchedulerTestUtils.java           |  21 ++-
 19 files changed, 352 insertions(+), 683 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c1f423b..9d3e128 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -44,6 +44,8 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 
@@ -52,9 +54,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
@@ -126,9 +130,11 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	/** A future that completes once the Execution reaches a terminal ExecutionState */
 	private final CompletableFuture<ExecutionState> terminationFuture;
 
+	private final CompletableFuture<TaskManagerLocation> taskManagerLocationFuture;
+
 	private volatile ExecutionState state = CREATED;
 
-	private volatile SimpleSlot assignedResource;     // once assigned, never changes until the execution is archived
+	private final AtomicReference<SimpleSlot> assignedResource;
 
 	private volatile Throwable failureCause;          // once assigned, never changes
 
@@ -185,6 +191,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 		this.partialInputChannelDeploymentDescriptors = new ConcurrentLinkedQueue<>();
 		this.terminationFuture = new CompletableFuture<>();
+		this.taskManagerLocationFuture = new CompletableFuture<>();
+
+		this.assignedResource = new AtomicReference<>();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -220,14 +229,53 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		return globalModVersion;
 	}
 
+	public CompletableFuture<TaskManagerLocation> getTaskManagerLocationFuture() {
+		return taskManagerLocationFuture;
+	}
+
 	public SimpleSlot getAssignedResource() {
-		return assignedResource;
+		return assignedResource.get();
+	}
+
+	/**
+	 * Tries to assign the given slot to the execution. The assignment works only if the
+	 * Execution is in state SCHEDULED or CREATED. Returns true if the resource could be assigned.
+	 *
+	 * @param slot to assign to this execution
+	 * @return true if the slot could be assigned to the execution, otherwise false
+	 */
+	boolean tryAssignResource(final SimpleSlot slot) {
+		Preconditions.checkNotNull(slot);
+
+		// only allow to set the assigned resource in state SCHEDULED or CREATED
+		// note: we also accept resource assignment when being in state CREATED for testing purposes
+		if (state == SCHEDULED || state == CREATED) {
+			if (assignedResource.compareAndSet(null, slot)) {
+				// check for concurrent modification (e.g. cancelling call)
+				if (state == SCHEDULED || state == CREATED) {
+					Preconditions.checkState(!taskManagerLocationFuture.isDone(), "The TaskManagerLocationFuture should not be set if we haven't assigned a resource yet.");
+					taskManagerLocationFuture.complete(slot.getTaskManagerLocation());
+
+					return true;
+				} else {
+					// free assigned resource and return false
+					assignedResource.set(null);
+					return false;
+				}
+			} else {
+				// the execution already has another slot assigned
+				return false;
+			}
+		} else {
+			// do not allow resource assignment if we are not in state SCHEDULED or CREATED
+			return false;
+		}
 	}
 
 	@Override
 	public TaskManagerLocation getAssignedResourceLocation() {
 		// returns non-null only when a location is already assigned
-		return assignedResource != null ? assignedResource.getTaskManagerLocation() : null;
+		return assignedResource.get() != null ? assignedResource.get().getTaskManagerLocation() : null;
 	}
 
 	public Throwable getFailureCause() {
@@ -301,27 +349,23 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 */
 	public boolean scheduleForExecution(SlotProvider slotProvider, boolean queued) {
 		try {
-			final CompletableFuture<SimpleSlot> slotAllocationFuture = allocateSlotForExecution(slotProvider, queued);
+			final CompletableFuture<Execution> allocationFuture = allocateAndAssignSlotForExecution(slotProvider, queued);
 
 			// IMPORTANT: We have to use the synchronous handle operation (direct executor) here so
 			// that we directly deploy the tasks if the slot allocation future is completed. This is
 			// necessary for immediate deployment.
-			final CompletableFuture<Void> deploymentFuture = slotAllocationFuture.handle(
-				(simpleSlot, throwable) ->  {
-					if (simpleSlot != null) {
+			final CompletableFuture<Void> deploymentFuture = allocationFuture.handle(
+				(Execution ignored, Throwable throwable) ->  {
+					if (throwable != null) {
+						markFailed(ExceptionUtils.stripCompletionException(throwable));
+					}
+					else {
 						try {
-							deployToSlot(simpleSlot);
+							deploy();
 						} catch (Throwable t) {
-							try {
-								simpleSlot.releaseSlot();
-							} finally {
-								markFailed(t);
-							}
+							markFailed(ExceptionUtils.stripCompletionException(t));
 						}
 					}
-					else {
-						markFailed(throwable);
-					}
 					return null;
 				}
 			);
@@ -338,8 +382,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 	}
 
-	public CompletableFuture<SimpleSlot> allocateSlotForExecution(SlotProvider slotProvider, boolean queued)
-			throws IllegalExecutionStateException {
+	/**
+	 * Allocates and assigns a slot obtained from the slot provider to the execution.
+	 *
+	 * @param slotProvider to obtain a new slot from
+	 * @param queued if the allocation can be queued
+	 * @return Future which is completed with this execution once the slot has been assigned
+	 * 			or with an exception if an error occurred.
+	 * @throws IllegalExecutionStateException if this method has been called while not being in the CREATED state
+	 */
+	public CompletableFuture<Execution> allocateAndAssignSlotForExecution(SlotProvider slotProvider, boolean queued) throws IllegalExecutionStateException {
 
 		checkNotNull(slotProvider);
 
@@ -359,7 +411,18 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					new ScheduledUnit(this, sharingGroup) :
 					new ScheduledUnit(this, sharingGroup, locationConstraint);
 
-			return slotProvider.allocateSlot(toSchedule, queued);
+			CompletableFuture<SimpleSlot> slotFuture = slotProvider.allocateSlot(toSchedule, queued);
+
+			return slotFuture.thenApply(slot -> {
+				if (tryAssignResource(slot)) {
+					return this;
+				} else {
+					// release the slot
+					slot.releaseSlot();
+
+					throw new CompletionException(new FlinkException("Could not assign slot " + slot + " to execution " + this + " because it has already been assigned "));
+				}
+			});
 		}
 		else {
 			// call race, already deployed, or already done
@@ -367,8 +430,15 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 		}
 	}
 
-	public void deployToSlot(final SimpleSlot slot) throws JobException {
-		checkNotNull(slot);
+	/**
+	 * Deploys the execution to the previously assigned resource.
+	 *
+	 * @throws JobException if the execution cannot be deployed to the assigned resource
+	 */
+	public void deploy() throws JobException {
+		final SimpleSlot slot  = assignedResource.get();
+
+		checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");
 
 		// Check if the TaskManager died in the meantime
 		// This only speeds up the response to TaskManagers failing concurrently to deployments.
@@ -397,7 +467,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			if (!slot.setExecutedVertex(this)) {
 				throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
 			}
-			this.assignedResource = slot;
 
 			// race double check, did we fail/cancel and do we need to release the slot?
 			if (this.state != DEPLOYING) {
@@ -447,7 +516,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * Sends stop RPC call.
 	 */
 	public void stop() {
-		final SimpleSlot slot = assignedResource;
+		final SimpleSlot slot = assignedResource.get();
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -504,10 +573,16 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					// we skip the canceling state. set the timestamp, for a consistent appearance
 					markTimestamp(CANCELING, getStateTimestamp(CANCELED));
 
+					// cancel the future in order to fail depending scheduling operations
+					taskManagerLocationFuture.cancel(false);
+
 					try {
 						vertex.getExecutionGraph().deregisterExecution(this);
-						if (assignedResource != null) {
-							assignedResource.releaseSlot();
+
+						final SimpleSlot slot = assignedResource.get();
+
+						if (slot != null) {
+							slot.releaseSlot();
 						}
 					}
 					finally {
@@ -673,7 +748,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 			int maxStrackTraceDepth,
 			Time timeout) {
 
-		final SimpleSlot slot = assignedResource;
+		final SimpleSlot slot = assignedResource.get();
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -697,7 +772,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param timestamp of the completed checkpoint
 	 */
 	public void notifyCheckpointComplete(long checkpointId, long timestamp) {
-		final SimpleSlot slot = assignedResource;
+		final SimpleSlot slot = assignedResource.get();
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -717,7 +792,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * @param checkpointOptions of the checkpoint to trigger
 	 */
 	public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
-		final SimpleSlot slot = assignedResource;
+		final SimpleSlot slot = assignedResource.get();
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -775,7 +850,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 						updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
-						assignedResource.releaseSlot();
+						final SimpleSlot slot = assignedResource.get();
+
+						if (slot != null) {
+							slot.releaseSlot();
+						}
+
 						vertex.getExecutionGraph().deregisterExecution(this);
 					}
 					finally {
@@ -828,7 +908,12 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 
 				if (transitionState(current, CANCELED)) {
 					try {
-						assignedResource.releaseSlot();
+						final SimpleSlot slot = assignedResource.get();
+
+						if (slot != null) {
+							slot.releaseSlot();
+						}
+
 						vertex.getExecutionGraph().deregisterExecution(this);
 					}
 					finally {
@@ -920,8 +1005,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 				updateAccumulatorsAndMetrics(userAccumulators, metrics);
 
 				try {
-					if (assignedResource != null) {
-						assignedResource.releaseSlot();
+					final SimpleSlot slot = assignedResource.get();
+					if (slot != null) {
+						slot.releaseSlot();
 					}
 					vertex.getExecutionGraph().deregisterExecution(this);
 				}
@@ -936,7 +1022,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 					}
 
 					try {
-						if (assignedResource != null) {
+						if (assignedResource.get() != null) {
 							sendCancelRpcCall();
 						}
 					} catch (Throwable tt) {
@@ -1003,7 +1089,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 * The sending is tried up to NUM_CANCEL_CALL_TRIES times.
 	 */
 	private void sendCancelRpcCall() {
-		final SimpleSlot slot = assignedResource;
+		final SimpleSlot slot = assignedResource.get();
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1024,7 +1110,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	}
 
 	private void sendFailIntermediateResultPartitionsRpcCall() {
-		final SimpleSlot slot = assignedResource;
+		final SimpleSlot slot = assignedResource.get();
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1042,7 +1128,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	private void sendUpdatePartitionInfoRpcCall(
 			final Iterable<PartitionInfo> partitionInfos) {
 
-		final SimpleSlot slot = assignedResource;
+		final SimpleSlot slot = assignedResource.get();
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
@@ -1162,8 +1248,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	
 	@Override
 	public String toString() {
+		final SimpleSlot slot = assignedResource.get();
+
 		return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getTaskNameWithSubtaskIndex(),
-				(assignedResource == null ? "(unassigned)" : assignedResource.toString()), state);
+				(slot == null ? "(unassigned)" : slot), state);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndAllocationFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndAllocationFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndAllocationFuture.java
new file mode 100644
index 0000000..1022dbc
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndAllocationFuture.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A pair of an {@link Execution} together with an allocation future.
+ */
+public class ExecutionAndAllocationFuture {
+
+	public final Execution executionAttempt;
+
+	public final CompletableFuture<Void> allocationFuture;
+
+	public ExecutionAndAllocationFuture(Execution executionAttempt, CompletableFuture<Void> allocationFuture) {
+		this.executionAttempt = checkNotNull(executionAttempt);
+		this.allocationFuture = checkNotNull(allocationFuture);
+	}
+
+	// -----------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return super.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
deleted file mode 100644
index 123ff0c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionAndSlot.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.instance.SimpleSlot;
-
-import java.util.concurrent.CompletableFuture;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A pair of an {@link Execution} together with a slot future.
- */
-public class ExecutionAndSlot {
-
-	public final Execution executionAttempt;
-
-	public final CompletableFuture<SimpleSlot> slotFuture;
-
-	public ExecutionAndSlot(Execution executionAttempt, CompletableFuture<SimpleSlot> slotFuture) {
-		this.executionAttempt = checkNotNull(executionAttempt);
-		this.slotFuture = checkNotNull(slotFuture);
-	}
-
-	// -----------------------------------------------------------------------
-
-	@Override
-	public String toString() {
-		return super.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index dca6c44..62c6e99 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -50,7 +50,6 @@ import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -90,7 +89,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
@@ -878,113 +876,67 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
 		// that way we do not have any operation that can fail between allocating the slots
 		// and adding them to the list. If we had a failure in between there, that would
 		// cause the slots to get lost
-		final ArrayList<ExecutionAndSlot[]> resources = new ArrayList<>(getNumberOfExecutionJobVertices());
 		final boolean queued = allowQueuedScheduling;
 
-		// we use this flag to handle failures in a 'finally' clause
-		// that allows us to not go through clumsy cast-and-rethrow logic
-		boolean successful = false;
+		// collecting all the slots may resize and fail in that operation without slots getting lost
+		final ArrayList<CompletableFuture<Execution>> allAllocationFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
 
-		try {
-			// collecting all the slots may resize and fail in that operation without slots getting lost
-			final ArrayList<CompletableFuture<SimpleSlot>> slotFutures = new ArrayList<>(getNumberOfExecutionJobVertices());
+		// allocate the slots (obtain all their futures)
+		for (ExecutionJobVertex ejv : getVerticesTopologically()) {
+			// these calls are not blocking, they only return futures
+			Collection<CompletableFuture<Execution>> allocationFutures = ejv.allocateResourcesForAll(slotProvider, queued);
 
-			// allocate the slots (obtain all their futures
-			for (ExecutionJobVertex ejv : getVerticesTopologically()) {
-				// these calls are not blocking, they only return futures
-				ExecutionAndSlot[] slots = ejv.allocateResourcesForAll(slotProvider, queued);
+			allAllocationFutures.addAll(allocationFutures);
+		}
 
-				// we need to first add the slots to this list, to be safe on release
-				resources.add(slots);
+		// this future is complete once all slot futures are complete.
+		// the future fails once one slot future fails.
+		final ConjunctFuture<Collection<Execution>> allAllocationsComplete = FutureUtils.combineAll(allAllocationFutures);
 
-				for (ExecutionAndSlot ens : slots) {
-					slotFutures.add(ens.slotFuture);
-				}
+		// make sure that we fail if the allocation timeout was exceeded
+		final ScheduledFuture<?> timeoutCancelHandle = futureExecutor.schedule(new Runnable() {
+			@Override
+			public void run() {
+				// When the timeout triggers, we try to complete the conjunct future with an exception.
+				// Note that this is a no-op if the future is already completed
+				int numTotal = allAllocationsComplete.getNumFuturesTotal();
+				int numComplete = allAllocationsComplete.getNumFuturesCompleted();
+				String message = "Could not allocate all requires slots within timeout of " +
+						timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;
+
+				allAllocationsComplete.completeExceptionally(new NoResourceAvailableException(message));
 			}
+		}, timeout.getSize(), timeout.getUnit());
 
-			// this future is complete once all slot futures are complete.
-			// the future fails once one slot future fails.
-			final ConjunctFuture<Void> allAllocationsComplete = FutureUtils.waitForAll(slotFutures);
-
-			// make sure that we fail if the allocation timeout was exceeded
-			final ScheduledFuture<?> timeoutCancelHandle = futureExecutor.schedule(new Runnable() {
-				@Override
-				public void run() {
-					// When the timeout triggers, we try to complete the conjunct future with an exception.
-					// Note that this is a no-op if the future is already completed
-					int numTotal = allAllocationsComplete.getNumFuturesTotal();
-					int numComplete = allAllocationsComplete.getNumFuturesCompleted();
-					String message = "Could not allocate all requires slots within timeout of " +
-							timeout + ". Slots required: " + numTotal + ", slots allocated: " + numComplete;
-
-					allAllocationsComplete.completeExceptionally(new NoResourceAvailableException(message));
-				}
-			}, timeout.getSize(), timeout.getUnit());
-
-
-			allAllocationsComplete.handleAsync(
-				(Void slots, Throwable throwable) -> {
-					try {
-						// we do not need the cancellation timeout any more
-						timeoutCancelHandle.cancel(false);
-
-						if (throwable == null) {
-							// successfully obtained all slots, now deploy
-
-							for (ExecutionAndSlot[] jobVertexTasks : resources) {
-								for (ExecutionAndSlot execAndSlot : jobVertexTasks) {
-
-									// the futures must all be ready - this is simply a sanity check
-									final SimpleSlot slot;
-									try {
-										slot = execAndSlot.slotFuture.getNow(null);
-										checkNotNull(slot);
-									}
-									catch (CompletionException | NullPointerException e) {
-										throw new IllegalStateException("SlotFuture is incomplete " +
-												"or erroneous even though all futures completed", e);
-									}
-
-									// actual deployment
-									execAndSlot.executionAttempt.deployToSlot(slot);
-								}
-							}
-						}
-						else {
-							// let the exception handler deal with this
-							throw throwable;
-						}
-					}
-					catch (Throwable t) {
-						// we catch everything here to make sure cleanup happens and the
-						// ExecutionGraph notices the error
 
-						// we need to to release all slots before going into recovery!
-						try {
-							ExecutionGraphUtils.releaseAllSlotsSilently(resources);
-						}
-						finally {
-							failGlobal(t);
+		allAllocationsComplete.handleAsync(
+			(Collection<Execution> executions, Throwable throwable) -> {
+				try {
+					// we do not need the cancellation timeout any more
+					timeoutCancelHandle.cancel(false);
+
+					if (throwable == null) {
+						// successfully obtained all slots, now deploy
+						for (Execution execution : executions) {
+							execution.deploy();
 						}
 					}
+					else {
+						// let the exception handler deal with this
+						throw throwable;
+					}
+				}
+				catch (Throwable t) {
+					// we catch everything here to make sure cleanup happens and the
+					// ExecutionGraph notices the error
+					failGlobal(ExceptionUtils.stripCompletionException(t));
+				}
 
-					// Wouldn't it be nice if we could return an actual Void object?
-					// return (Void) Unsafe.getUnsafe().allocateInstance(Void.class);
-					return null;
-				},
-				futureExecutor);
-
-			// from now on, slots will be rescued by the futures and their completion, or by the timeout
-			successful = true;
-		}
-		finally {
-			if (!successful) {
-				// we come here only if the 'try' block finished with an exception
-				// we release the slots (possibly failing some executions on the way) and
-				// let the exception bubble up
-				ExecutionGraphUtils.releaseAllSlotsSilently(resources);
-			}
-		}
+				// Wouldn't it be nice if we could return an actual Void object?
+				// return (Void) Unsafe.getUnsafe().allocateInstance(Void.class);
+				return null;
+			},
+			futureExecutor);
 	}
 
 	public void cancel() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
deleted file mode 100644
index f1d793d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtils.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.util.ExceptionUtils;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.BiFunction;
-
-/**
- * Utilities for dealing with the execution graphs and scheduling.
- */
-public class ExecutionGraphUtils {
-
-	/**
-	 * Releases the slot represented by the given future. If the future is complete, the
-	 * slot is immediately released. Otherwise, the slot is released as soon as the future
-	 * is completed.
-	 * 
-	 * <p>Note that releasing the slot means cancelling any task execution currently
-	 * associated with that slot.
-	 * 
-	 * @param slotFuture The future for the slot to release.
-	 */
-	public static void releaseSlotFuture(CompletableFuture<SimpleSlot> slotFuture) {
-		slotFuture.handle(ReleaseSlotFunction.INSTANCE::apply);
-	}
-
-	/**
-	 * Releases the all the slots in the list of arrays of {@code ExecutionAndSlot}.
-	 * For each future in that collection holds: If the future is complete, its slot is
-	 * immediately released. Otherwise, the slot is released as soon as the future
-	 * is completed.
-	 * 
-	 * <p>This methods never throws any exceptions (except for fatal exceptions) and continues
-	 * to release the remaining slots if one slot release failed.
-	 *
-	 * <p>Note that releasing the slot means cancelling any task execution currently
-	 * associated with that slot.
-	 * 
-	 * @param resources The collection of ExecutionAndSlot whose slots should be released.
-	 */
-	public static void releaseAllSlotsSilently(List<ExecutionAndSlot[]> resources) {
-		try {
-			for (ExecutionAndSlot[] jobVertexResources : resources) {
-				if (jobVertexResources != null) {
-					for (ExecutionAndSlot execAndSlot : jobVertexResources) {
-						if (execAndSlot != null) {
-							try {
-								releaseSlotFuture(execAndSlot.slotFuture);
-							}
-							catch (Throwable t) {
-								ExceptionUtils.rethrowIfFatalError(t);
-							}
-						}
-					}
-				}
-			}
-		}
-		catch (Throwable t) {
-			ExceptionUtils.rethrowIfFatalError(t);
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * A function to be applied into a future, releasing the slot immediately upon completion.
-	 * Completion here refers to both the successful and exceptional completion.
-	 */
-	private static final class ReleaseSlotFunction implements BiFunction<SimpleSlot, Throwable, Void> {
-
-		static final ReleaseSlotFunction INSTANCE = new ReleaseSlotFunction();
-
-		@Override
-		public Void apply(SimpleSlot simpleSlot, Throwable throwable) {
-			if (simpleSlot != null) {
-				simpleSlot.releaseSlot();
-			}
-			return null;
-		}
-	}
-
-	// ------------------------------------------------------------------------
-
-	/** Utility class is not meant to be instantiated */
-	private ExecutionGraphUtils() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 90224b0..98191d0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.blob.PermanentBlobKey;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -57,6 +56,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -475,37 +475,21 @@ public class ExecutionJobVertex implements AccessExecutionJobVertex, Archiveable
 	 * 
 	 * @param resourceProvider The resource provider from whom the slots are requested.
 	 */
-	public ExecutionAndSlot[] allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) {
+	public Collection<CompletableFuture<Execution>> allocateResourcesForAll(SlotProvider resourceProvider, boolean queued) {
 		final ExecutionVertex[] vertices = this.taskVertices;
-		final ExecutionAndSlot[] slots = new ExecutionAndSlot[vertices.length];
+		final CompletableFuture<Execution>[] slots = new CompletableFuture[vertices.length];
 
 		// try to acquire a slot future for each execution.
 		// we store the execution with the future just to be on the safe side
 		for (int i = 0; i < vertices.length; i++) {
-
-			// we use this flag to handle failures in a 'finally' clause
-			// that allows us to not go through clumsy cast-and-rethrow logic
-			boolean successful = false;
-
-			try {
-				// allocate the next slot (future)
-				final Execution exec = vertices[i].getCurrentExecutionAttempt();
-				final CompletableFuture<SimpleSlot> future = exec.allocateSlotForExecution(resourceProvider, queued);
-				slots[i] = new ExecutionAndSlot(exec, future);
-				successful = true;
-			}
-			finally {
-				if (!successful) {
-					// this is the case if an exception was thrown
-					for (int k = 0; k < i; k++) {
-						ExecutionGraphUtils.releaseSlotFuture(slots[k].slotFuture);
-					}
-				}
-			}
+			// allocate the next slot (future)
+			final Execution exec = vertices[i].getCurrentExecutionAttempt();
+			final CompletableFuture<Execution> allocationFuture = exec.allocateAndAssignSlotForExecution(resourceProvider, queued);
+			slots[i] = allocationFuture;
 		}
 
 		// all good, we acquired all slots
-		return slots;
+		return Arrays.asList(slots);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 6b9d481..e87a5a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -55,6 +55,7 @@ import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -266,6 +267,10 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		return currentExecution.getFailureCause();
 	}
 
+	public CompletableFuture<TaskManagerLocation> getCurrentTaskManagerLocationFuture() {
+		return currentExecution.getTaskManagerLocationFuture();
+	}
+
 	public SimpleSlot getCurrentAssignedResource() {
 		return currentExecution.getAssignedResource();
 	}
@@ -445,8 +450,8 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	 * @see #getPreferredLocationsBasedOnState()
 	 * @see #getPreferredLocationsBasedOnInputs() 
 	 */
-	public Iterable<TaskManagerLocation> getPreferredLocations() {
-		Iterable<TaskManagerLocation> basedOnState = getPreferredLocationsBasedOnState();
+	public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocations() {
+		Collection<CompletableFuture<TaskManagerLocation>> basedOnState = getPreferredLocationsBasedOnState();
 		return basedOnState != null ? basedOnState : getPreferredLocationsBasedOnInputs();
 	}
 	
@@ -454,13 +459,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	 * Gets the preferred location to execute the current task execution attempt, based on the state
 	 * that the execution attempt will resume.
 	 * 
-	 * @return A size-one iterable with the location preference, or null, if there is no
+	 * @return A size-one collection with the location preference, or null, if there is no
 	 *         location preference based on the state.
 	 */
-	public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnState() {
+	public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnState() {
 		TaskManagerLocation priorLocation;
 		if (currentExecution.getTaskStateSnapshot() != null && (priorLocation = getLatestPriorLocation()) != null) {
-			return Collections.singleton(priorLocation);
+			return Collections.singleton(CompletableFuture.completedFuture(priorLocation));
 		}
 		else {
 			return null;
@@ -476,14 +481,13 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 	 * @return The preferred locations based in input streams, or an empty iterable,
 	 *         if there is no input-based preference.
 	 */
-	public Iterable<TaskManagerLocation> getPreferredLocationsBasedOnInputs() {
+	public Collection<CompletableFuture<TaskManagerLocation>> getPreferredLocationsBasedOnInputs() {
 		// otherwise, base the preferred locations on the input connections
 		if (inputEdges == null) {
 			return Collections.emptySet();
 		}
 		else {
-			Set<TaskManagerLocation> locations = new HashSet<>();
-			Set<TaskManagerLocation> inputLocations = new HashSet<>();
+			Set<CompletableFuture<TaskManagerLocation>> inputLocations = new HashSet<>(4);
 
 			// go over all inputs
 			for (int i = 0; i < inputEdges.length; i++) {
@@ -493,28 +497,17 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 					// go over all input sources
 					for (int k = 0; k < sources.length; k++) {
 						// look-up assigned slot of input source
-						SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();
-						if (sourceSlot != null) {
-							// add input location
-							inputLocations.add(sourceSlot.getTaskManagerLocation());
-							// inputs which have too many distinct sources are not considered
-							if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
-								inputLocations.clear();
-								break;
-							}
+						CompletableFuture<TaskManagerLocation> taskManagerLocationFuture = sources[k].getSource().getProducer().getCurrentTaskManagerLocationFuture();
+						inputLocations.add(taskManagerLocationFuture);
+
+						if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
+							return Collections.emptyList();
 						}
 					}
 				}
-				// keep the locations of the input with the least preferred locations
-				if (locations.isEmpty() || // nothing assigned yet
-						(!inputLocations.isEmpty() && inputLocations.size() < locations.size())) {
-					// current input has fewer preferred locations
-					locations.clear();
-					locations.addAll(inputLocations);
-				}
 			}
 
-			return locations.isEmpty() ? Collections.<TaskManagerLocation>emptyList() : locations;
+			return inputLocations.isEmpty() ? Collections.emptyList() : inputLocations;
 		}
 	}
 
@@ -598,8 +591,14 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi
 		return this.currentExecution.scheduleForExecution(slotProvider, queued);
 	}
 
+	@VisibleForTesting
 	public void deployToSlot(SimpleSlot slot) throws JobException {
-		this.currentExecution.deployToSlot(slot);
+		if (this.currentExecution.tryAssignResource(slot)) {
+			this.currentExecution.deploy();
+		} else {
+			throw new IllegalStateException("Could not assign resource " + slot + " to current execution " +
+				currentExecution + '.');
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 6397043..fcf7d40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -1003,10 +1003,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
 		@Override
 		public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
-			Iterable<TaskManagerLocation> locationPreferences = 
-					task.getTaskToExecute().getVertex().getPreferredLocations();
+			Collection<CompletableFuture<TaskManagerLocation>> locationPreferenceFutures =
+				task.getTaskToExecute().getVertex().getPreferredLocations();
 
-			return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout);
+			CompletableFuture<Collection<TaskManagerLocation>> locationPreferencesFuture = FutureUtils.combineAll(locationPreferenceFutures);
+
+			return locationPreferencesFuture.thenCompose(
+				locationPreferences -> gateway.allocateSlot(task, ResourceProfile.UNKNOWN, locationPreferences, timeout));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 88fbc10..a071e50 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -266,18 +266,12 @@ public class SlotSharingGroupAssignment {
 	 * slots if no local slot is available. The method returns null, when this sharing group has
 	 * no slot is available for the given JobVertexID. 
 	 *
-	 * @param vertex The vertex to allocate a slot for.
+	 * @param vertexID the vertex id
+	 * @param locationPreferences location preferences
 	 *
 	 * @return A slot to execute the given ExecutionVertex in, or null, if none is available.
 	 */
-	public SimpleSlot getSlotForTask(ExecutionVertex vertex) {
-		return getSlotForTask(vertex.getJobvertexId(), vertex.getPreferredLocationsBasedOnInputs());
-	}
-
-	/**
-	 * 
-	 */
-	SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) {
+	public SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) {
 		synchronized (lock) {
 			Tuple2<SharedSlot, Locality> p = getSlotForTaskInternal(vertexID, locationPreferences, false);
 
@@ -306,17 +300,13 @@ public class SlotSharingGroupAssignment {
 	 * shared slot and returns it. If no suitable shared slot could be found, this method
 	 * returns null.</p>
 	 * 
-	 * @param vertex The execution vertex to find a slot for.
 	 * @param constraint The co-location constraint for the placement of the execution vertex.
+	 * @param locationPreferences location preferences
 	 * 
 	 * @return A simple slot allocate within a suitable shared slot, or {@code null}, if no suitable
 	 *         shared slot is available.
 	 */
-	public SimpleSlot getSlotForTask(ExecutionVertex vertex, CoLocationConstraint constraint) {
-		return getSlotForTask(constraint, vertex.getPreferredLocationsBasedOnInputs());
-	}
-	
-	SimpleSlot getSlotForTask(CoLocationConstraint constraint, Iterable<TaskManagerLocation> locationPreferences) {
+	public SimpleSlot getSlotForTask(CoLocationConstraint constraint, Iterable<TaskManagerLocation> locationPreferences) {
 		synchronized (lock) {
 			if (constraint.isAssignedAndAlive()) {
 				// the shared slot of the co-location group is initialized and set we allocate a sub-slot

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 5a7e819..9b1ffbe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager.scheduler;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -31,6 +32,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -134,31 +136,36 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 
 	@Override
 	public CompletableFuture<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
-		try {
-			final Object ret = scheduleTask(task, allowQueued);
+		Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = task.getTaskToExecute().getVertex().getPreferredLocationsBasedOnInputs();
 
-			if (ret instanceof SimpleSlot) {
-				return CompletableFuture.completedFuture((SimpleSlot) ret);
-			}
-			else if (ret instanceof CompletableFuture) {
-				@SuppressWarnings("unchecked")
-				CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret;
-				return typed;
-			}
-			else {
-				// this should never happen, simply guard this case with an exception
-				throw new RuntimeException();
+		CompletableFuture<Collection<TaskManagerLocation>> preferredLocationsFuture = FutureUtils.combineAll(preferredLocationFutures);
+
+		return preferredLocationsFuture.thenCompose(
+			preferredLocations -> {
+				try {
+					final Object ret = scheduleTask(task, allowQueued, preferredLocations);
+
+					if (ret instanceof SimpleSlot) {
+						return CompletableFuture.completedFuture((SimpleSlot) ret);
+					} else if (ret instanceof CompletableFuture) {
+						@SuppressWarnings("unchecked")
+						CompletableFuture<SimpleSlot> typed = (CompletableFuture<SimpleSlot>) ret;
+						return typed;
+					} else {
+						// this should never happen, simply guard this case with an exception
+						throw new RuntimeException();
+					}
+				} catch (NoResourceAvailableException e) {
+					throw new CompletionException(e);
+				}
 			}
-		}
-		catch (NoResourceAvailableException e) {
-			return FutureUtils.completedExceptionally(e);
-		}
+		);
 	}
 
 	/**
 	 * Returns either a {@link SimpleSlot}, or a {@link CompletableFuture}.
 	 */
-	private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource) throws NoResourceAvailableException {
+	private Object scheduleTask(ScheduledUnit task, boolean queueIfNoResource, Iterable<TaskManagerLocation> preferredLocations) throws NoResourceAvailableException {
 		if (task == null) {
 			throw new NullPointerException();
 		}
@@ -168,7 +175,6 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 
 		final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 		
-		final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocationsBasedOnInputs();
 		final boolean forceExternalLocation = false &&
 									preferredLocations != null && preferredLocations.iterator().hasNext();
 	
@@ -197,10 +203,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 				// get a slot from the group, if the group has one for us (and can fulfill the constraint)
 				final SimpleSlot slotFromGroup;
 				if (constraint == null) {
-					slotFromGroup = assignment.getSlotForTask(vertex);
+					slotFromGroup = assignment.getSlotForTask(vertex.getJobvertexId(), preferredLocations);
 				}
 				else {
-					slotFromGroup = assignment.getSlotForTask(vertex, constraint);
+					slotFromGroup = assignment.getSlotForTask(constraint, preferredLocations);
 				}
 
 				SimpleSlot newSlot = null;
@@ -234,7 +240,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl
 						localOnly = true;
 					}
 					else {
-						locations = vertex.getPreferredLocationsBasedOnInputs();
+						locations = preferredLocations;
 						localOnly = forceExternalLocation;
 					}
 					

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index b90c306..69a679a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -49,21 +48,22 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.After;
 import org.junit.Test;
-
-import org.mockito.invocation.InvocationOnMock;
 import org.mockito.verification.Timeout;
 
 import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the scheduling of the execution graph. This tests that
@@ -399,141 +399,6 @@ public class ExecutionGraphSchedulingTest extends TestLogger {
 		}
 	}
 
-	/**
-	 * Tests that the {@link ExecutionJobVertex#allocateResourcesForAll(SlotProvider, boolean)} method
-	 * releases partially acquired resources upon exception.
-	 */
-	@Test
-	public void testExecutionJobVertexAllocateResourcesReleasesOnException() throws Exception {
-		final int parallelism = 8;
-
-		final JobVertex vertex = new JobVertex("vertex");
-		vertex.setParallelism(parallelism);
-		vertex.setInvokableClass(NoOpInvokable.class);
-
-		final JobID jobId = new JobID();
-		final JobGraph jobGraph = new JobGraph(jobId, "test", vertex);
-
-		// set up some available slots and some slot owner that accepts released slots back
-		final List<SimpleSlot> returnedSlots = new ArrayList<>();
-		final SlotOwner recycler = new SlotOwner() {
-			@Override
-			public boolean returnAllocatedSlot(Slot slot) {
-				returnedSlots.add((SimpleSlot) slot);
-				return true;
-			}
-		};
-
-		// slot provider that hand out parallelism / 3 slots, then throws an exception
-		final SlotProvider slotProvider = mock(SlotProvider.class);
-
-		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
-		final List<SimpleSlot> availableSlots = new ArrayList<>(Arrays.asList(
-			createSlot(taskManager, jobId, recycler),
-			createSlot(taskManager, jobId, recycler),
-			createSlot(taskManager, jobId, recycler)));
-
-		when(slotProvider.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
-			(InvocationOnMock invocation) -> {
-					if (availableSlots.isEmpty()) {
-						throw new TestRuntimeException();
-					} else {
-						return CompletableFuture.completedFuture(availableSlots.remove(0));
-					}
-				});
-
-		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
-		final ExecutionJobVertex ejv = eg.getJobVertex(vertex.getID());
-
-		// acquire resources and check that all are back after the failure
-
-		final int numSlotsToExpectBack = availableSlots.size();
-
-		try {
-			ejv.allocateResourcesForAll(slotProvider, false);
-			fail("should have failed with an exception");
-		}
-		catch (TestRuntimeException e) {
-			// expected
-		}
-
-		assertEquals(numSlotsToExpectBack, returnedSlots.size());
-	}
-
-	/**
-	 * Tests that the {@link ExecutionGraph#scheduleForExecution()} method
-	 * releases partially acquired resources upon exception.
-	 */
-	@Test
-	public void testExecutionGraphScheduleReleasesResourcesOnException() throws Exception {
-
-		//                                            [pipelined]
-		//  we construct a simple graph    (source) ----------------> (target)
-
-		final int parallelism = 3;
-
-		final JobVertex sourceVertex = new JobVertex("source");
-		sourceVertex.setParallelism(parallelism);
-		sourceVertex.setInvokableClass(NoOpInvokable.class);
-
-		final JobVertex targetVertex = new JobVertex("target");
-		targetVertex.setParallelism(parallelism);
-		targetVertex.setInvokableClass(NoOpInvokable.class);
-
-		targetVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
-
-		final JobID jobId = new JobID();
-		final JobGraph jobGraph = new JobGraph(jobId, "test", sourceVertex, targetVertex);
-
-		// set up some available slots and some slot owner that accepts released slots back
-		final List<SimpleSlot> returnedSlots = new ArrayList<>();
-		final SlotOwner recycler = new SlotOwner() {
-			@Override
-			public boolean returnAllocatedSlot(Slot slot) {
-				returnedSlots.add((SimpleSlot) slot);
-				return true;
-			}
-		};
-
-		final TaskManagerGateway taskManager = mock(TaskManagerGateway.class);
-		final List<SimpleSlot> availableSlots = new ArrayList<>(Arrays.asList(
-			createSlot(taskManager, jobId, recycler),
-			createSlot(taskManager, jobId, recycler),
-			createSlot(taskManager, jobId, recycler),
-			createSlot(taskManager, jobId, recycler),
-			createSlot(taskManager, jobId, recycler)));
-
-
-		// slot provider that hand out parallelism / 3 slots, then throws an exception
-		final SlotProvider slotProvider = mock(SlotProvider.class);
-
-		when(slotProvider.allocateSlot(any(ScheduledUnit.class), anyBoolean())).then(
-			(InvocationOnMock invocation) -> {
-					if (availableSlots.isEmpty()) {
-						throw new TestRuntimeException();
-					} else {
-						return CompletableFuture.completedFuture(availableSlots.remove(0));
-					}
-				});
-
-		final ExecutionGraph eg = createExecutionGraph(jobGraph, slotProvider);
-
-		// acquire resources and check that all are back after the failure
-
-		final int numSlotsToExpectBack = availableSlots.size();
-
-		try {
-			eg.setScheduleMode(ScheduleMode.EAGER);
-			eg.scheduleForExecution();
-			fail("should have failed with an exception");
-		}
-		catch (TestRuntimeException e) {
-			// expected
-		}
-
-		assertEquals(numSlotsToExpectBack, returnedSlots.size());
-	}
-
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
index de9081b..4ce3f9d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.java
@@ -112,25 +112,29 @@ public class ExecutionGraphStopTest extends TestLogger {
 		// deploy source 1
 		for (ExecutionVertex ev : eg.getJobVertex(source1.getID()).getTaskVertices()) {
 			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
-			ev.getCurrentExecutionAttempt().deployToSlot(slot);
+			ev.getCurrentExecutionAttempt().tryAssignResource(slot);
+			ev.getCurrentExecutionAttempt().deploy();
 		}
 
 		// deploy source 2
 		for (ExecutionVertex ev : eg.getJobVertex(source2.getID()).getTaskVertices()) {
 			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, sourceGateway);
-			ev.getCurrentExecutionAttempt().deployToSlot(slot);
+			ev.getCurrentExecutionAttempt().tryAssignResource(slot);
+			ev.getCurrentExecutionAttempt().deploy();
 		}
 
 		// deploy non-source 1
 		for (ExecutionVertex ev : eg.getJobVertex(nonSource1.getID()).getTaskVertices()) {
 			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
-			ev.getCurrentExecutionAttempt().deployToSlot(slot);
+			ev.getCurrentExecutionAttempt().tryAssignResource(slot);
+			ev.getCurrentExecutionAttempt().deploy();
 		}
 
 		// deploy non-source 2
 		for (ExecutionVertex ev : eg.getJobVertex(nonSource2.getID()).getTaskVertices()) {
 			SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, nonSourceGateway);
-			ev.getCurrentExecutionAttempt().deployToSlot(slot);
+			ev.getCurrentExecutionAttempt().tryAssignResource(slot);
+			ev.getCurrentExecutionAttempt().deploy();
 		}
 
 		eg.stop();
@@ -162,7 +166,8 @@ public class ExecutionGraphStopTest extends TestLogger {
 
 		final SimpleSlot slot = ExecutionGraphTestUtils.createMockSimpleSlot(jid, gateway);
 
-		exec.deployToSlot(slot);
+		exec.tryAssignResource(slot);
+		exec.deploy();
 		exec.switchToRunning();
 		assertEquals(ExecutionState.RUNNING, exec.getState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index b534ade..5feeabc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.executiongraph;
 
-import akka.actor.Status;
-
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
@@ -63,20 +61,20 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.SerializedValue;
 
+import akka.actor.Status;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
-
 import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.ExecutionContext$;
+
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
-
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
@@ -231,15 +229,10 @@ public class ExecutionGraphTestUtils {
 	}
 	
 	public static void setVertexResource(ExecutionVertex vertex, SimpleSlot slot) {
-		try {
-			Execution exec = vertex.getCurrentExecutionAttempt();
-			
-			Field f = Execution.class.getDeclaredField("assignedResource");
-			f.setAccessible(true);
-			f.set(exec, slot);
-		}
-		catch (Exception e) {
-			throw new RuntimeException("Modifying the slot failed", e);
+		Execution exec = vertex.getCurrentExecutionAttempt();
+
+		if(!exec.tryAssignResource(slot)) {
+			throw new RuntimeException("Could not assign resource.");
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
deleted file mode 100644
index c616501..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphUtilsTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.executiongraph;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
-import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import org.junit.Test;
-
-import java.net.InetAddress;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-import static org.mockito.Mockito.*;
-
-/**
- * Tests for the utility methods in the class {@link ExecutionGraphUtils}.
- */
-public class ExecutionGraphUtilsTest {
-
-	@Test
-	public void testReleaseSlots() {
-		final JobID jid = new JobID();
-		final SlotOwner owner = mock(SlotOwner.class);
-
-		final SimpleSlot slot1 = new SimpleSlot(createAllocatedSlot(jid, 0), owner, 0);
-		final SimpleSlot slot2 = new SimpleSlot(createAllocatedSlot(jid, 1), owner, 1);
-		final SimpleSlot slot3 = new SimpleSlot(createAllocatedSlot(jid, 2), owner, 2);
-
-		final CompletableFuture<SimpleSlot> incompleteFuture = new CompletableFuture<>();
-
-		final CompletableFuture<SimpleSlot> completeFuture = new CompletableFuture<>();
-		completeFuture.complete(slot2);
-
-		final CompletableFuture<SimpleSlot> disposedSlotFuture = new CompletableFuture<>();
-		slot3.releaseSlot();
-		disposedSlotFuture.complete(slot3);
-
-		// release all futures
-		ExecutionGraphUtils.releaseSlotFuture(incompleteFuture);
-		ExecutionGraphUtils.releaseSlotFuture(completeFuture);
-		ExecutionGraphUtils.releaseSlotFuture(disposedSlotFuture);
-
-		// only now complete the incomplete future
-		incompleteFuture.complete(slot1);
-
-		// verify that each slot was returned once to the owner
-		verify(owner, times(1)).returnAllocatedSlot(eq(slot1));
-		verify(owner, times(1)).returnAllocatedSlot(eq(slot2));
-		verify(owner, times(1)).returnAllocatedSlot(eq(slot3));
-	}
-
-	@Test
-	public void testReleaseSlotsWithNulls() {
-		final JobID jid = new JobID();
-		final SlotOwner owner = mock(SlotOwner.class);
-
-		final Execution mockExecution = mock(Execution.class);
-
-		final SimpleSlot slot1 = new SimpleSlot(createAllocatedSlot(jid, 0), owner, 0);
-		final SimpleSlot slot2 = new SimpleSlot(createAllocatedSlot(jid, 1), owner, 1);
-		final SimpleSlot slot3 = new SimpleSlot(createAllocatedSlot(jid, 2), owner, 2);
-		final SimpleSlot slot4 = new SimpleSlot(createAllocatedSlot(jid, 3), owner, 3);
-		final SimpleSlot slot5 = new SimpleSlot(createAllocatedSlot(jid, 4), owner, 4);
-
-		ExecutionAndSlot[] slots1 = new ExecutionAndSlot[] {
-				null,
-				new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot1)),
-				null,
-				new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot2)),
-				null
-		};
-
-		ExecutionAndSlot[] slots2 = new ExecutionAndSlot[] {
-				new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot3)),
-				new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot4)),
-				new ExecutionAndSlot(mockExecution, CompletableFuture.completedFuture(slot5))
-		};
-
-		List<ExecutionAndSlot[]> resources = Arrays.asList(null, slots1, new ExecutionAndSlot[0], null, slots2);
-
-		ExecutionGraphUtils.releaseAllSlotsSilently(resources);
-
-		verify(owner, times(1)).returnAllocatedSlot(eq(slot1));
-		verify(owner, times(1)).returnAllocatedSlot(eq(slot2));
-		verify(owner, times(1)).returnAllocatedSlot(eq(slot3));
-		verify(owner, times(1)).returnAllocatedSlot(eq(slot4));
-		verify(owner, times(1)).returnAllocatedSlot(eq(slot5));
-	}
-
-	// ------------------------------------------------------------------------
-
-	private static AllocatedSlot createAllocatedSlot(JobID jid, int num) {
-		TaskManagerLocation loc = new TaskManagerLocation(
-				ResourceID.generate(), InetAddress.getLoopbackAddress(), 10000 + num);
-	
-		return new AllocatedSlot(new AllocationID(), jid, loc, num,
-				ResourceProfile.UNKNOWN, mock(TaskManagerGateway.class));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
index 44e1794..9908dae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.java
@@ -263,8 +263,8 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
 			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
+			setVertexState(vertex, ExecutionState.RUNNING);
 
 			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 
@@ -303,8 +303,8 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
 			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
+			setVertexState(vertex, ExecutionState.RUNNING);
 
 			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 
@@ -351,8 +351,8 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			Instance instance = getInstance(new ActorTaskManagerGateway(actorGateway));
 			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
+			setVertexState(vertex, ExecutionState.RUNNING);
 
 			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 
@@ -384,8 +384,8 @@ public class ExecutionVertexCancelTest extends TestLogger {
 			Instance instance = getInstance(new ActorTaskManagerGateway(gateway));
 			SimpleSlot slot = instance.allocateSimpleSlot(new JobID());
 
-			setVertexState(vertex, ExecutionState.RUNNING);
 			setVertexResource(vertex, slot);
+			setVertexState(vertex, ExecutionState.RUNNING);
 
 			assertEquals(ExecutionState.RUNNING, vertex.getExecutionState());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 5f12646..15d021a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
@@ -50,6 +51,7 @@ import org.junit.Test;
 import java.lang.reflect.Field;
 import java.net.InetAddress;
 import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -92,10 +94,10 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 		// validate that the target vertices have no location preference
 		for (int i = 0; i < parallelism; i++) {
 			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
-			Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+			Iterator<CompletableFuture<TaskManagerLocation>> preference = target.getPreferredLocations().iterator();
 
 			assertTrue(preference.hasNext());
-			assertEquals(locations[i], preference.next());
+			assertEquals(locations[i], preference.next().get());
 			assertFalse(preference.hasNext());
 		}
 	}
@@ -122,7 +124,7 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 		for (int i = 0; i < parallelism; i++) {
 			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
 
-			Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+			Iterator<CompletableFuture<TaskManagerLocation>> preference = target.getPreferredLocations().iterator();
 			assertFalse(preference.hasNext());
 		}
 	}
@@ -178,10 +180,10 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 		// validate that the target vertices have the state's location as the location preference
 		for (int i = 0; i < parallelism; i++) {
 			ExecutionVertex target = graph.getAllVertices().get(targetVertexId).getTaskVertices()[i];
-			Iterator<TaskManagerLocation> preference = target.getPreferredLocations().iterator();
+			Iterator<CompletableFuture<TaskManagerLocation>> preference = target.getPreferredLocations().iterator();
 
 			assertTrue(preference.hasNext());
-			assertEquals(locations[i], preference.next());
+			assertEquals(locations[i], preference.next().get());
 			assertFalse(preference.hasNext());
 		}
 	}
@@ -236,10 +238,9 @@ public class ExecutionVertexLocalityTest extends TestLogger {
 
 		SimpleSlot simpleSlot = new SimpleSlot(slot, mock(SlotOwner.class), 0);
 
-		final Field locationField = Execution.class.getDeclaredField("assignedResource");
-		locationField.setAccessible(true);
-
-		locationField.set(vertex.getCurrentExecutionAttempt(), simpleSlot);
+		if (!vertex.getCurrentExecutionAttempt().tryAssignResource(simpleSlot)) {
+			throw new FlinkException("Could not assign resource.");
+		}
 	}
 
 	private void setState(Execution execution, ExecutionState state) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
index afb9dac..9f4a675 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/ScheduleWithCoLocationHintTest.java
@@ -33,12 +33,13 @@ import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import java.util.concurrent.ExecutionException;
 
-public class ScheduleWithCoLocationHintTest {
+public class ScheduleWithCoLocationHintTest extends TestLogger {
 
 	@Test
 	public void scheduleAllSharedAndCoLocated() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
index c049593..1f88dd8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerSlotSharingTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
@@ -47,7 +48,7 @@ import static org.junit.Assert.fail;
 /**
  * Tests for the scheduler when scheduling tasks in slot sharing groups.
  */
-public class SchedulerSlotSharingTest {
+public class SchedulerSlotSharingTest extends TestLogger {
 
 	@Test
 	public void scheduleSingleVertexType() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c73b2fe1/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
index 4312b0f..c7d0f09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java
@@ -25,9 +25,11 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -105,8 +107,13 @@ public class SchedulerTestUtils {
 	
 	public static Execution getTestVertex(Iterable<TaskManagerLocation> preferredLocations) {
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
-		
-		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocations);
+
+		Collection<CompletableFuture<TaskManagerLocation>> preferredLocationFutures = new ArrayList<>(4);
+
+		for (TaskManagerLocation preferredLocation : preferredLocations) {
+			preferredLocationFutures.add(CompletableFuture.completedFuture(preferredLocation));
+		}
+		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferredLocationFutures);
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.toString()).thenReturn("TEST-VERTEX");
 		
@@ -119,7 +126,7 @@ public class SchedulerTestUtils {
 	public static Execution getTestVertex(JobVertexID jid, int taskIndex, int numTasks) {
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		
-		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(null);
+		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(Collections.emptyList());
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.getJobvertexId()).thenReturn(jid);
 		when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);
@@ -139,7 +146,13 @@ public class SchedulerTestUtils {
 
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 
-		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(Arrays.asList(locations));
+		Collection<CompletableFuture<TaskManagerLocation>> preferrecLocationFutures = new ArrayList<>(locations.length);
+
+		for (TaskManagerLocation location : locations) {
+			preferrecLocationFutures.add(CompletableFuture.completedFuture(location));
+		}
+
+		when(vertex.getPreferredLocationsBasedOnInputs()).thenReturn(preferrecLocationFutures);
 		when(vertex.getJobId()).thenReturn(new JobID());
 		when(vertex.getJobvertexId()).thenReturn(jid);
 		when(vertex.getParallelSubtaskIndex()).thenReturn(taskIndex);


Mime
View raw message