flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/8] flink git commit: [FLINK-4490] [distributed coordination] (part 2) Make slots independent of 'Instance'.
Date Fri, 02 Sep 2016 15:36:07 GMT
[FLINK-4490] [distributed coordination] (part 2) Make slots independent of 'Instance'.

To allow for a future dynamic slot allocation and release model, the slots should not depend on 'Instance'.
In this change, the Slots hold most of the necessary information directly (location, gateway) and
the interact with the Instance only via a 'SlotOwner' interface.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aaa474ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aaa474ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aaa474ad

Branch: refs/heads/master
Commit: aaa474ad8f1d638c3988697dd57446802142119b
Parents: 34cda87
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Aug 30 20:34:20 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Fri Sep 2 17:32:57 2016 +0200

----------------------------------------------------------------------
 .../InputChannelDeploymentDescriptor.java       |  14 +-
 .../flink/runtime/executiongraph/Execution.java |  51 +++----
 .../runtime/executiongraph/ExecutionVertex.java |  17 ++-
 .../runtime/instance/HardwareDescription.java   |  28 ++--
 .../apache/flink/runtime/instance/Instance.java |  22 +--
 .../flink/runtime/instance/SharedSlot.java      |  45 ++++--
 .../flink/runtime/instance/SimpleSlot.java      |  35 +++--
 .../org/apache/flink/runtime/instance/Slot.java | 103 ++++++++++---
 .../instance/SlotSharingGroupAssignment.java    | 132 ++++++++---------
 .../scheduler/CoLocationConstraint.java         |  48 ++++---
 .../runtime/jobmanager/scheduler/Scheduler.java |  74 ++++++----
 .../scheduler/SlotAllocationFuture.java         | 116 ++++++++++-----
 .../runtime/jobmanager/slots/SlotOwner.java     |  29 ++++
 .../taskmanager/TaskManagerLocation.java        |   2 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../testingUtils/TestingJobManagerLike.scala    |   2 +-
 .../ExecutionGraphMetricsTest.java              |   4 +-
 .../VertexLocationConstraintTest.java           |  52 +++----
 .../flink/runtime/instance/SharedSlotsTest.java |  67 ++++-----
 .../ScheduleWithCoLocationHintTest.java         | 144 ++++++++++---------
 .../scheduler/SchedulerIsolatedTasksTest.java   |  52 +++----
 .../scheduler/SchedulerSlotSharingTest.java     | 102 +++++++------
 .../scheduler/SchedulerTestUtils.java           |  29 +++-
 .../scheduler/SlotAllocationFutureTest.java     |  51 +++++--
 .../resourcemanager/ResourceManagerITCase.java  |  17 ++-
 25 files changed, 741 insertions(+), 497 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index f31febb..0912055 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -18,16 +18,18 @@
 
 package org.apache.flink.runtime.deployment;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionEdge;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -88,6 +90,7 @@ public class InputChannelDeploymentDescriptor implements Serializable {
 	public static InputChannelDeploymentDescriptor[] fromEdges(
 			ExecutionEdge[] edges, SimpleSlot consumerSlot) {
 
+		final ResourceID consumerTaskManager = consumerSlot.getTaskManagerID();
 		final InputChannelDeploymentDescriptor[] icdd = new InputChannelDeploymentDescriptor[edges.length];
 
 		// Each edge is connected to a different result partition
@@ -105,16 +108,17 @@ public class InputChannelDeploymentDescriptor implements Serializable {
 					(producerState == ExecutionState.RUNNING
 							|| producerState == ExecutionState.FINISHED)) {
 
-				final Instance partitionInstance = producerSlot.getInstance();
+				final TaskManagerLocation partitionTaskManagerLocation = producerSlot.getTaskManagerLocation();
+				final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
 
-				if (partitionInstance.equals(consumerSlot.getInstance())) {
-					// Consuming task is deployed to the same instance as the partition => local
+				if (partitionTaskManager.equals(consumerTaskManager)) {
+					// Consuming task is deployed to the same TaskManager as the partition => local
 					partitionLocation = ResultPartitionLocation.createLocal();
 				}
 				else {
 					// Different instances => remote
 					final ConnectionID connectionId = new ConnectionID(
-							partitionInstance.getInstanceConnectionInfo(),
+							partitionTaskManagerLocation,
 							consumedPartition.getIntermediateResult().getConnectionIndex());
 
 					partitionLocation = ResultPartitionLocation.createRemote(connectionId);

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 197999c..846df49 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -25,12 +25,12 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -49,7 +49,6 @@ import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.util.SerializableObject;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -371,7 +370,7 @@ public class Execution {
 				throw new JobException("Could not assign the ExecutionVertex to the slot " + slot);
 			}
 			this.assignedResource = slot;
-			this.assignedResourceLocation = slot.getInstance().getInstanceConnectionInfo();
+			this.assignedResourceLocation = slot.getTaskManagerLocation();
 
 			// race double check, did we fail/cancel and do we need to release the slot?
 			if (this.state != DEPLOYING) {
@@ -381,7 +380,7 @@ public class Execution {
 
 			if (LOG.isInfoEnabled()) {
 				LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getSimpleName(),
-						attemptNumber, slot.getInstance().getInstanceConnectionInfo().getHostname()));
+						attemptNumber, assignedResourceLocation.getHostname()));
 			}
 
 			final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
@@ -393,9 +392,8 @@ public class Execution {
 
 			// register this execution at the execution graph, to receive call backs
 			vertex.getExecutionGraph().registerExecution(this);
-
-			final Instance instance = slot.getInstance();
-			final ActorGateway gateway = instance.getActorGateway();
+			
+			final ActorGateway gateway = slot.getTaskManagerActorGateway();
 
 			final Future<Object> deployAction = gateway.ask(new SubmitTask(deployment), timeout);
 
@@ -408,7 +406,7 @@ public class Execution {
 							String taskname = deployment.getTaskInfo().getTaskNameWithSubtasks() + " (" + attemptId + ')';
 
 							markFailed(new Exception(
-									"Cannot deploy task " + taskname + " - TaskManager (" + instance
+									"Cannot deploy task " + taskname + " - TaskManager (" + assignedResourceLocation
 									+ ") not responding after a timeout of " + timeout, failure));
 						}
 						else {
@@ -437,7 +435,7 @@ public class Execution {
 		final SimpleSlot slot = this.assignedResource;
 
 		if (slot != null) {
-			final ActorGateway gateway = slot.getInstance().getActorGateway();
+			final ActorGateway gateway = slot.getTaskManagerActorGateway();
 
 			Future<Object> stopResult = gateway.retry(
 				new StopTask(attemptId),
@@ -590,24 +588,25 @@ public class Execution {
 						continue;
 					}
 
-					final Instance consumerInstance = consumerSlot.getInstance();
-
-					final ResultPartitionID partitionId = new ResultPartitionID(
-							partition.getPartitionId(), attemptId);
+					final TaskManagerLocation partitionTaskManagerLocation = partition.getProducer()
+							.getCurrentAssignedResource().getTaskManagerLocation();
+					final ResourceID partitionTaskManager = partitionTaskManagerLocation.getResourceID();
+					
+					final ResourceID consumerTaskManager = consumerSlot.getTaskManagerID();
 
-					final Instance partitionInstance = partition.getProducer()
-							.getCurrentAssignedResource().getInstance();
+					final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId);
+					
 
 					final ResultPartitionLocation partitionLocation;
 
-					if (consumerInstance.equals(partitionInstance)) {
+					if (consumerTaskManager.equals(partitionTaskManager)) {
 						// Consuming task is deployed to the same instance as the partition => local
 						partitionLocation = ResultPartitionLocation.createLocal();
 					}
 					else {
 						// Different instances => remote
 						final ConnectionID connectionId = new ConnectionID(
-								partitionInstance.getInstanceConnectionInfo(),
+								partitionTaskManagerLocation,
 								partition.getIntermediateResult().getConnectionIndex());
 
 						partitionLocation = ResultPartitionLocation.createRemote(connectionId);
@@ -916,7 +915,7 @@ public class Execution {
 
 		if (slot != null) {
 
-			final ActorGateway gateway = slot.getInstance().getActorGateway();
+			final ActorGateway gateway = slot.getTaskManagerActorGateway();
 
 			Future<Object> cancelResult = gateway.retry(
 				new CancelTask(attemptId),
@@ -946,14 +945,10 @@ public class Execution {
 		final SimpleSlot slot = this.assignedResource;
 
 		if (slot != null) {
-			final Instance instance = slot.getInstance();
+			final ActorGateway gateway = slot.getTaskManagerActorGateway();
 
-			if (instance.isAlive()) {
-				final ActorGateway gateway = instance.getActorGateway();
-
-				// TODO For some tests this could be a problem when querying too early if all resources were released
-				gateway.tell(new FailIntermediateResultPartitions(attemptId));
-			}
+			// TODO For some tests this could be a problem when querying too early if all resources were released
+			gateway.tell(new FailIntermediateResultPartitions(attemptId));
 		}
 	}
 
@@ -968,15 +963,15 @@ public class Execution {
 			final UpdatePartitionInfo updatePartitionInfo) {
 
 		if (consumerSlot != null) {
-			final Instance instance = consumerSlot.getInstance();
-			final ActorGateway gateway = instance.getActorGateway();
+			final ActorGateway gateway = consumerSlot.getTaskManagerActorGateway();
+			final TaskManagerLocation taskManagerLocation = consumerSlot.getTaskManagerLocation();
 
 			Future<Object> futureUpdate = gateway.ask(updatePartitionInfo, timeout);
 
 			futureUpdate.onFailure(new OnFailure() {
 				@Override
 				public void onFailure(Throwable failure) throws Throwable {
-					fail(new IllegalStateException("Update task on instance " + instance +
+					fail(new IllegalStateException("Update task on TaskManager " + taskManagerLocation +
 							" failed due to:", failure));
 				}
 			}, executionContext);

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index e5a115a..f02647e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -27,7 +27,6 @@ import org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescript
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
@@ -98,7 +97,7 @@ public class ExecutionVertex {
 
 	private volatile Execution currentExecution;	// this field must never be null
 
-	private volatile List<Instance> locationConstraintInstances;
+	private volatile List<TaskManagerLocation> locationConstraintInstances;
 
 	private volatile boolean scheduleLocalOnly;
 
@@ -352,7 +351,7 @@ public class ExecutionVertex {
 		}
 	}
 
-	public void setLocationConstraintHosts(List<Instance> instances) {
+	public void setLocationConstraintHosts(List<TaskManagerLocation> instances) {
 		this.locationConstraintInstances = instances;
 	}
 
@@ -376,9 +375,9 @@ public class ExecutionVertex {
 	 *
 	 * @return The preferred locations for this vertex execution, or null, if there is no preference.
 	 */
-	public Iterable<Instance> getPreferredLocations() {
+	public Iterable<TaskManagerLocation> getPreferredLocations() {
 		// if we have hard location constraints, use those
-		List<Instance> constraintInstances = this.locationConstraintInstances;
+		List<TaskManagerLocation> constraintInstances = this.locationConstraintInstances;
 		if (constraintInstances != null && !constraintInstances.isEmpty()) {
 			return constraintInstances;
 		}
@@ -388,8 +387,8 @@ public class ExecutionVertex {
 			return Collections.emptySet();
 		}
 		else {
-			Set<Instance> locations = new HashSet<Instance>();
-			Set<Instance> inputLocations = new HashSet<Instance>();
+			Set<TaskManagerLocation> locations = new HashSet<>();
+			Set<TaskManagerLocation> inputLocations = new HashSet<>();
 
 			// go over all inputs
 			for (int i = 0; i < inputEdges.length; i++) {
@@ -402,7 +401,7 @@ public class ExecutionVertex {
 						SimpleSlot sourceSlot = sources[k].getSource().getProducer().getCurrentAssignedResource();
 						if (sourceSlot != null) {
 							// add input location
-							inputLocations.add(sourceSlot.getInstance());
+							inputLocations.add(sourceSlot.getTaskManagerLocation());
 							// inputs which have too many distinct sources are not considered
 							if (inputLocations.size() > MAX_DISTINCT_LOCATIONS_TO_CONSIDER) {
 								inputLocations.clear();
@@ -495,7 +494,7 @@ public class ExecutionVertex {
 
 			// send only if we actually have a target
 			if (slot != null) {
-				ActorGateway gateway = slot.getInstance().getActorGateway();
+				ActorGateway gateway = slot.getTaskManagerActorGateway();
 				if (gateway != null) {
 					if (sender == null) {
 						gateway.tell(message);

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
index bfcc1e5..9c1c5b7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescription.java
@@ -30,22 +30,16 @@ public final class HardwareDescription implements Serializable {
 	private static final long serialVersionUID = 3380016608300325361L;
 
 	/** The number of CPU cores available to the JVM on the compute node. */
-	private int numberOfCPUCores;
+	private final int numberOfCPUCores;
 
 	/** The size of physical memory in bytes available on the compute node. */
-	private long sizeOfPhysicalMemory;
+	private final long sizeOfPhysicalMemory;
 
 	/** The size of the JVM heap memory */
-	private long sizeOfJvmHeap;
-	
-	/** The size of the memory managed by the system for caching, hashing, sorting, ... */
-	private long sizeOfManagedMemory;
+	private final long sizeOfJvmHeap;
 
-	
-	/**
-	 * Public default constructor used for serialization process.
-	 */
-	public HardwareDescription() {}
+	/** The size of the memory managed by the system for caching, hashing, sorting, ... */
+	private final long sizeOfManagedMemory;
 
 	/**
 	 * Constructs a new hardware description object.
@@ -88,7 +82,7 @@ public final class HardwareDescription implements Serializable {
 	public long getSizeOfJvmHeap() {
 		return this.sizeOfJvmHeap;
 	}
-	
+
 	/**
 	 * Returns the size of the memory managed by the system for caching, hashing, sorting, ...
 	 * 
@@ -97,26 +91,26 @@ public final class HardwareDescription implements Serializable {
 	public long getSizeOfManagedMemory() {
 		return this.sizeOfManagedMemory;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// Utils
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return String.format("cores=%d, physMem=%d, heap=%d, managed=%d", 
 				numberOfCPUCores, sizeOfPhysicalMemory, sizeOfJvmHeap, sizeOfManagedMemory);
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	// Factory
 	// --------------------------------------------------------------------------------------------
-	
+
 	public static HardwareDescription extractFromSystem(long managedMemory) {
 		final int numberOfCPUCores = Hardware.getNumberCPUCores();
 		final long sizeOfJvmHeap = Runtime.getRuntime().maxMemory();
 		final long sizeOfPhysicalMemory = Hardware.getSizeOfPhysicalMemory();
-		
+
 		return new HardwareDescription(numberOfCPUCores, sizeOfPhysicalMemory, sizeOfJvmHeap, managedMemory);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
index 598b32b..fe46895 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Instance.java
@@ -28,15 +28,20 @@ import java.util.Set;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotAvailabilityListener;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * An instance represents a {@link org.apache.flink.runtime.taskmanager.TaskManager}
  * registered at a JobManager and ready to receive work.
  */
-public class Instance {
+public class Instance implements SlotOwner {
 
 	private final static Logger LOG = LoggerFactory.getLogger(Instance.class);
 
@@ -241,7 +246,7 @@ public class Instance {
 				return null;
 			}
 			else {
-				SimpleSlot slot = new SimpleSlot(jobID, this, nextSlot);
+				SimpleSlot slot = new SimpleSlot(jobID, this, connectionInfo, nextSlot, actorGateway);
 				allocatedSlots.add(slot);
 				return slot;
 			}
@@ -278,7 +283,8 @@ public class Instance {
 				return null;
 			}
 			else {
-				SharedSlot slot = new SharedSlot(jobID, this, nextSlot, sharingGroupAssignment);
+				SharedSlot slot = new SharedSlot(
+						jobID, this, connectionInfo, nextSlot, actorGateway, sharingGroupAssignment);
 				allocatedSlots.add(slot);
 				return slot;
 			}
@@ -295,13 +301,11 @@ public class Instance {
 	 * @param slot The slot to return.
 	 * @return True, if the slot was returned, false if not.
 	 */
+	@Override
 	public boolean returnAllocatedSlot(Slot slot) {
-		if (slot == null || slot.getInstance() != this) {
-			throw new IllegalArgumentException("Slot is null or belongs to the wrong TaskManager.");
-		}
-		if (slot.isAlive()) {
-			throw new IllegalArgumentException("Slot is still alive");
-		}
+		checkNotNull(slot);
+		checkArgument(!slot.isAlive(), "slot is still alive");
+		checkArgument(slot.getOwner() == this, "slot belongs to the wrong TaskManager.");
 
 		if (slot.markReleased()) {
 			LOG.debug("Return allocated slot {}.", slot);

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
index ef62910..7f05604 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SharedSlot.java
@@ -18,13 +18,18 @@
 
 package org.apache.flink.runtime.instance;
 
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.api.common.JobID;
 
+import javax.annotation.Nullable;
 import java.util.ConcurrentModificationException;
 import java.util.HashSet;
 import java.util.Set;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * This class represents a shared slot. A shared slot can have multiple
  * {@link SimpleSlot} instances within itself. This allows to
@@ -35,7 +40,7 @@ import java.util.Set;
  * <p><b>IMPORTANT:</b> This class contains no synchronization. Thus, the caller has to guarantee proper
  * synchronization. In the current implementation, all concurrently modifying operations are
  * passed through a {@link SlotSharingGroupAssignment} object which is responsible for
- * synchronization.</p>
+ * synchronization.
  */
 public class SharedSlot extends Slot {
 
@@ -51,12 +56,18 @@ public class SharedSlot extends Slot {
 	 * This constructor is used to create a slot directly from an instance. 
 	 * 
 	 * @param jobID The ID of the job that the slot is created for.
-	 * @param instance The instance that holds the slot.
+	 * @param owner The component from which this slot is allocated.
+	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of the slot.
+	 * @param taskManagerActorGateway The actor gateway to communicate with the TaskManager   
 	 * @param assignmentGroup The assignment group that this shared slot belongs to.
 	 */
-	public SharedSlot(JobID jobID, Instance instance, int slotNumber, SlotSharingGroupAssignment assignmentGroup) {
-		this(jobID, instance, slotNumber, assignmentGroup, null, null);
+	public SharedSlot(
+			JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
+			ActorGateway taskManagerActorGateway,
+			SlotSharingGroupAssignment assignmentGroup) {
+
+		this(jobID, owner, location, slotNumber, taskManagerActorGateway, assignmentGroup, null, null);
 	}
 
 	/**
@@ -64,15 +75,23 @@ public class SharedSlot extends Slot {
 	 * to the given task group.
 	 * 
 	 * @param jobID The ID of the job that the slot is created for.
-	 * @param instance The instance that holds the slot.
+	 * @param owner The component from which this slot is allocated.
+	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of the slot.
+	 * @param taskManagerActorGateway The actor gateway to communicate with the TaskManager   
 	 * @param assignmentGroup The assignment group that this shared slot belongs to.
+	 * @param parent The parent slot of this slot.
+	 * @param groupId The assignment group of this slot.
 	 */
-	public SharedSlot(JobID jobID, Instance instance, int slotNumber,
-						SlotSharingGroupAssignment assignmentGroup, SharedSlot parent, AbstractID groupId) {
-		super(jobID, instance, slotNumber, parent, groupId);
+	public SharedSlot(
+			JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
+			ActorGateway taskManagerActorGateway,
+			SlotSharingGroupAssignment assignmentGroup,
+			@Nullable SharedSlot parent, @Nullable AbstractID groupId) {
+
+		super(jobID, owner, location, slotNumber, taskManagerActorGateway, parent, groupId);
 
-		this.assignmentGroup = assignmentGroup;
+		this.assignmentGroup = checkNotNull(assignmentGroup);
 		this.subSlots = new HashSet<Slot>();
 	}
 
@@ -148,7 +167,9 @@ public class SharedSlot extends Slot {
 	 */
 	SimpleSlot allocateSubSlot(AbstractID groupId) {
 		if (isAlive()) {
-			SimpleSlot slot = new SimpleSlot(getJobID(), getInstance(), subSlots.size(), this, groupId);
+			SimpleSlot slot = new SimpleSlot(
+					getJobID(), getOwner(), getTaskManagerLocation(), subSlots.size(), 
+					getTaskManagerActorGateway(), this, groupId);
 			subSlots.add(slot);
 			return slot;
 		}
@@ -168,7 +189,9 @@ public class SharedSlot extends Slot {
 	 */
 	SharedSlot allocateSharedSlot(AbstractID groupId){
 		if (isAlive()) {
-			SharedSlot slot = new SharedSlot(getJobID(), getInstance(), subSlots.size(), assignmentGroup, this, groupId);
+			SharedSlot slot = new SharedSlot(
+					getJobID(), getOwner(), getTaskManagerLocation(), subSlots.size(), 
+					getTaskManagerActorGateway(), assignmentGroup, this, groupId);
 			subSlots.add(slot);
 			return slot;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
index dbe961a..615138f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlot.java
@@ -21,15 +21,18 @@ package org.apache.flink.runtime.instance;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
 
+import javax.annotation.Nullable;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 /**
  * A SimpleSlot represents a single slot on a TaskManager instance, or a slot within a shared slot.
  *
  * <p>If this slot is part of a {@link SharedSlot}, then the parent attribute will point to that shared slot.
- * If not, then the parent attribute is null.</p>
+ * If not, then the parent attribute is null.
  */
 public class SimpleSlot extends Slot {
 
@@ -43,18 +46,22 @@ public class SimpleSlot extends Slot {
 	private volatile Execution executedTask;
 
 	/** The locality attached to the slot, defining whether the slot was allocated at the desired location. */
-	private Locality locality = Locality.UNCONSTRAINED;
+	private volatile Locality locality = Locality.UNCONSTRAINED;
 
 
 	/**
 	 * Creates a new simple slot that stands alone and does not belong to shared slot.
 	 * 
 	 * @param jobID The ID of the job that the slot is allocated for.
-	 * @param instance The instance that the slot belongs to.
+	 * @param owner The component from which this slot is allocated.
+	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of the task slot on the instance.
+	 * @param taskManagerActorGateway The actor gateway to communicate with the TaskManager of this slot   
 	 */
-	public SimpleSlot(JobID jobID, Instance instance, int slotNumber) {
-		super(jobID, instance, slotNumber, null, null);
+	public SimpleSlot(
+			JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
+			ActorGateway taskManagerActorGateway) {
+		this(jobID, owner, location, slotNumber, taskManagerActorGateway, null, null);
 	}
 
 	/**
@@ -62,13 +69,18 @@ public class SimpleSlot extends Slot {
 	 * is identified by the given ID..
 	 *
 	 * @param jobID The ID of the job that the slot is allocated for.
-	 * @param instance The instance that the slot belongs to.
+	 * @param owner The component from which this slot is allocated.
+	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of the simple slot in its parent shared slot.
 	 * @param parent The parent shared slot.
 	 * @param groupID The ID that identifies the group that the slot belongs to.
 	 */
-	public SimpleSlot(JobID jobID, Instance instance, int slotNumber, SharedSlot parent, AbstractID groupID) {
-		super(jobID, instance, slotNumber, parent, groupID);
+	public SimpleSlot(
+			JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
+			ActorGateway taskManagerActorGateway,
+			@Nullable SharedSlot parent, @Nullable AbstractID groupID) {
+
+		super(jobID, owner, location, slotNumber, taskManagerActorGateway, parent, groupID);
 	}
 
 	// ------------------------------------------------------------------------
@@ -142,15 +154,12 @@ public class SimpleSlot extends Slot {
 
 	@Override
 	public void releaseSlot() {
-
 		if (!isCanceled()) {
 
 			// kill all tasks currently running in this slot
 			Execution exec = this.executedTask;
 			if (exec != null && !exec.isFinished()) {
-				exec.fail(new Exception(
-						"The slot in which the task was executed has been released. Probably loss of TaskManager "
-								+ getInstance()));
+				exec.fail(new Exception("TaskManager was lost/killed: " + getTaskManagerLocation()));
 			}
 
 			// release directly (if we are directly allocated),
@@ -158,7 +167,7 @@ public class SimpleSlot extends Slot {
 			if (getParent() == null) {
 				// we have to give back the slot to the owning instance
 				if (markCancelled()) {
-					getInstance().returnAllocatedSlot(this);
+					getOwner().returnAllocatedSlot(this);
 				}
 			} else {
 				// we have to ask our parent to dispose us

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index 341ef95..451a9ec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -18,11 +18,18 @@
 
 package org.apache.flink.runtime.instance;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.api.common.JobID;
 
+import javax.annotation.Nullable;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Base class for task slots. TaskManagers offer one or more task slots, which define a slice of 
  * their resources.
@@ -30,7 +37,7 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
  * <p>In the simplest case, a slot holds a single task ({@link SimpleSlot}). In the more complex
  * case, a slot is shared ({@link SharedSlot}) and contains a set of tasks. Shared slots may contain
  * other shared slots which in turn can hold simple slots. That way, a shared slot may define a tree
- * of slots that belong to it.</p>
+ * of slots that belong to it.
  */
 public abstract class Slot {
 
@@ -52,15 +59,23 @@ public abstract class Slot {
 	/** The ID of the job this slice belongs to. */
 	private final JobID jobID;
 
-	/** The id of the group that this slot is allocated to. May be null. */
-	private final AbstractID groupID;
+	/** The location information of the TaskManager to which this slot belongs */
+	private final TaskManagerLocation taskManagerLocation;
+
+	/** TEMP until the new RPC is in place: The actor gateway to communicate with the TaskManager */
+	private final ActorGateway taskManagerActorGateway;
 
-	/** The instance on which the slot is allocated */
-	private final Instance instance;
+	/** The owner of this slot - the slot was taken from that owner and must be disposed to it */
+	private final SlotOwner owner;
 
 	/** The parent of this slot in the hierarchy, or null, if this is the parent */
+	@Nullable
 	private final SharedSlot parent;
 
+	/** The id of the group that this slot is allocated to. May be null. */
+	@Nullable
+	private final AbstractID groupID;
+
 	/** The number of the slot on which the task is deployed */
 	private final int slotNumber;
 
@@ -71,23 +86,28 @@ public abstract class Slot {
 	 * Base constructor for slots.
 	 * 
 	 * @param jobID The ID of the job that this slot is allocated for.
-	 * @param instance The instance from which this slot is allocated.
+	 * @param owner The component from which this slot is allocated.
+	 * @param location The location info of the TaskManager where the slot was allocated from
 	 * @param slotNumber The number of this slot.
+	 * @param taskManagerActorGateway The actor gateway to communicate with the TaskManager
 	 * @param parent The parent slot that contains this slot. May be null, if this slot is the root.
 	 * @param groupID The ID that identifies the task group for which this slot is allocated. May be null
 	 *                if the slot does not belong to any task group.   
 	 */
-	protected Slot(JobID jobID, Instance instance, int slotNumber, SharedSlot parent, AbstractID groupID) {
-		if (jobID == null || instance == null || slotNumber < 0) {
-			throw new IllegalArgumentException();
-		}
-
-		this.jobID = jobID;
-		this.instance = instance;
+	protected Slot(
+			JobID jobID, SlotOwner owner, TaskManagerLocation location, int slotNumber,
+			ActorGateway taskManagerActorGateway,
+			@Nullable SharedSlot parent, @Nullable AbstractID groupID) {
+
+		checkArgument(slotNumber >= 0);
+
+		this.jobID = checkNotNull(jobID);
+		this.taskManagerLocation = checkNotNull(location);
+		this.owner = checkNotNull(owner);
+		this.taskManagerActorGateway = checkNotNull(taskManagerActorGateway);
+		this.parent = parent; // may be null
+		this.groupID = groupID; // may be null
 		this.slotNumber = slotNumber;
-		this.parent = parent;
-		this.groupID = groupID;
-
 	}
 	// --------------------------------------------------------------------------------------------
 
@@ -101,12 +121,42 @@ public abstract class Slot {
 	}
 
 	/**
-	 * Gets the instance from which the slot was allocated.
+	 * Gets the ID of the TaskManager that offers this slot.
+	 *
+	 * @return The ID of the TaskManager that offers this slot
+	 */
+	public ResourceID getTaskManagerID() {
+		return taskManagerLocation.getResourceID();
+	}
+
+	/**
+	 * Gets the location info of the TaskManager that offers this slot.
 	 *
-	 * @return The instance from which the slot was allocated.
+	 * @return The location info of the TaskManager that offers this slot
 	 */
-	public Instance getInstance() {
-		return instance;
+	public TaskManagerLocation getTaskManagerLocation() {
+		return taskManagerLocation;
+	}
+
+	/**
+	 * Gets the actor gateway that can be used to send messages to the TaskManager.
+	 *
+	 * <p>This method should be removed once the new interface-based RPC abstraction is in place
+	 *
+	 * @return The actor gateway that can be used to send messages to the TaskManager.
+	 */
+	public ActorGateway getTaskManagerActorGateway() {
+		return taskManagerActorGateway;
+	}
+
+	/**
+	 * Gets the owner of this slot. The owner is the component that the slot was created from
+	 * and to which it needs to be returned after the executed tasks are done.
+	 * 
+	 * @return The owner of this slot.
+	 */
+	public SlotOwner getOwner() {
+		return owner;
 	}
 
 	/**
@@ -149,6 +199,7 @@ public abstract class Slot {
 	 * 
 	 * @return The ID identifying the logical group of slots.
 	 */
+	@Nullable
 	public AbstractID getGroupID() {
 		return groupID;
 	}
@@ -158,10 +209,18 @@ public abstract class Slot {
 	 * 
 	 * @return The parent slot, or null, if no this slot has no parent.
 	 */
+	@Nullable
 	public SharedSlot getParent() {
 		return parent;
 	}
 
+	/**
+	 * Gets the root slot of the tree containing this slot. If this slot is the root,
+	 * the method returns this slot directly, otherwise it recursively goes to the parent until
+	 * it reaches the root.
+	 * 
+	 * @return The root slot of the tree containing this slot
+	 */
 	public Slot getRoot() {
 		if (parent == null) {
 			return this;
@@ -244,11 +303,11 @@ public abstract class Slot {
 
 	@Override
 	public String toString() {
-		return hierarchy() + " - " + instance + " - " + getStateName(status);
+		return hierarchy() + " - " + taskManagerLocation + " - " + getStateName(status);
 	}
 
 	protected String hierarchy() {
-		return (getParent() != null ? getParent().hierarchy() : "") + "(" + slotNumber + ")";
+		return (getParent() != null ? getParent().hierarchy() : "") + '(' + slotNumber + ')';
 	}
 
 	private static String getStateName(int state) {

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
index 7d666fe..346cc77 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotSharingGroupAssignment.java
@@ -28,13 +28,15 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,9 +95,8 @@ public class SlotSharingGroupAssignment {
 	/** All slots currently allocated to this sharing group */
 	private final Set<SharedSlot> allSlots = new LinkedHashSet<SharedSlot>();
 
-	/** The slots available per vertex type (jid), keyed by instance, to make them locatable */
-	private final Map<AbstractID, Map<Instance, List<SharedSlot>>> availableSlotsPerJid = 
-			new LinkedHashMap<AbstractID, Map<Instance, List<SharedSlot>>>();
+	/** The slots available per vertex type (JobVertexId), keyed by TaskManager, to make them locatable */
+	private final Map<AbstractID, Map<ResourceID, List<SharedSlot>>> availableSlotsPerJid = new LinkedHashMap<>();
 
 
 	// --------------------------------------------------------------------------------------------
@@ -122,7 +123,7 @@ public class SlotSharingGroupAssignment {
 	 */
 	public int getNumberOfAvailableSlotsForGroup(AbstractID groupId) {
 		synchronized (lock) {
-			Map<Instance, List<SharedSlot>> available = availableSlotsPerJid.get(groupId);
+			Map<ResourceID, List<SharedSlot>> available = availableSlotsPerJid.get(groupId);
 
 			if (available != null) {
 				Set<SharedSlot> set = new HashSet<SharedSlot>();
@@ -148,37 +149,25 @@ public class SlotSharingGroupAssignment {
 	//  Slot allocation
 	// ------------------------------------------------------------------------
 
-	/**
-	 * 
-	 * @param sharedSlot
-	 * @param locality
-	 * @param groupId
-	 * @return
-	 */
 	public SimpleSlot addSharedSlotAndAllocateSubSlot(SharedSlot sharedSlot, Locality locality, JobVertexID groupId) {
 		return addSharedSlotAndAllocateSubSlot(sharedSlot, locality, groupId, null);
 	}
 
-	/**
-	 * 
-	 * @param sharedSlot
-	 * @param locality
-	 * @param constraint
-	 * @return
-	 */
-	public SimpleSlot addSharedSlotAndAllocateSubSlot(SharedSlot sharedSlot, Locality locality,
-														CoLocationConstraint constraint) {
+	public SimpleSlot addSharedSlotAndAllocateSubSlot(
+			SharedSlot sharedSlot, Locality locality, CoLocationConstraint constraint)
+	{
 		return addSharedSlotAndAllocateSubSlot(sharedSlot, locality, null, constraint);
 	}
-	
-	private SimpleSlot addSharedSlotAndAllocateSubSlot(SharedSlot sharedSlot, Locality locality,
-													JobVertexID groupId, CoLocationConstraint constraint) {
+
+	private SimpleSlot addSharedSlotAndAllocateSubSlot(
+			SharedSlot sharedSlot, Locality locality, JobVertexID groupId, CoLocationConstraint constraint) {
+
 		// sanity checks
 		if (!sharedSlot.isRootAndEmpty()) {
 			throw new IllegalArgumentException("The given slot is not an empty root slot.");
 		}
-		
-		final Instance location = sharedSlot.getInstance();
+
+		final ResourceID location = sharedSlot.getTaskManagerID();
 
 		synchronized (lock) {
 			// early out in case that the slot died (instance disappeared)
@@ -244,20 +233,20 @@ public class SlotSharingGroupAssignment {
 				// can place a task into this slot.
 				boolean entryForNewJidExists = false;
 				
-				for (Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
+				for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> entry : availableSlotsPerJid.entrySet()) {
 					// there is already an entry for this groupID
 					if (entry.getKey().equals(groupIdForMap)) {
 						entryForNewJidExists = true;
 						continue;
 					}
 
-					Map<Instance, List<SharedSlot>> available = entry.getValue();
+					Map<ResourceID, List<SharedSlot>> available = entry.getValue();
 					putIntoMultiMap(available, location, sharedSlot);
 				}
 
 				// make sure an empty entry exists for this group, if no other entry exists
 				if (!entryForNewJidExists) {
-					availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<Instance, List<SharedSlot>>());
+					availableSlotsPerJid.put(groupIdForMap, new LinkedHashMap<ResourceID, List<SharedSlot>>());
 				}
 
 				return subSlot;
@@ -287,18 +276,15 @@ public class SlotSharingGroupAssignment {
 
 	/**
 	 * 
-	 * @param vertexID
-	 * @param locationPreferences
-	 * @return
 	 */
-	SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<Instance> locationPreferences) {
+	SimpleSlot getSlotForTask(JobVertexID vertexID, Iterable<TaskManagerLocation> locationPreferences) {
 		synchronized (lock) {
-			Pair<SharedSlot, Locality> p = getSlotForTaskInternal(vertexID, locationPreferences, false);
+			Tuple2<SharedSlot, Locality> p = getSlotForTaskInternal(vertexID, locationPreferences, false);
 
 			if (p != null) {
-				SharedSlot ss = p.getLeft();
+				SharedSlot ss = p.f0;
 				SimpleSlot slot = ss.allocateSubSlot(vertexID);
-				slot.setLocality(p.getRight());
+				slot.setLocality(p.f1);
 				return slot;
 			}
 			else {
@@ -330,7 +316,7 @@ public class SlotSharingGroupAssignment {
 		return getSlotForTask(constraint, vertex.getPreferredLocations());
 	}
 	
-	SimpleSlot getSlotForTask(CoLocationConstraint constraint, Iterable<Instance> locationPreferences) {
+	SimpleSlot getSlotForTask(CoLocationConstraint constraint, Iterable<TaskManagerLocation> locationPreferences) {
 		synchronized (lock) {
 			if (constraint.isAssignedAndAlive()) {
 				// the shared slot of the co-location group is initialized and set we allocate a sub-slot
@@ -346,15 +332,16 @@ public class SlotSharingGroupAssignment {
 				if (previous == null) {
 					throw new IllegalStateException("Bug: Found assigned co-location constraint without a slot.");
 				}
-				
-				Instance location = previous.getInstance();
-				Pair<SharedSlot, Locality> p = getSlotForTaskInternal(constraint.getGroupId(),
-																		Collections.singleton(location), true);
+
+				TaskManagerLocation location = previous.getTaskManagerLocation();
+				Tuple2<SharedSlot, Locality> p = getSlotForTaskInternal(
+						constraint.getGroupId(), Collections.singleton(location), true);
+
 				if (p == null) {
 					return null;
 				}
 				else {
-					SharedSlot newSharedSlot = p.getLeft();
+					SharedSlot newSharedSlot = p.f0;
 
 					// allocate the co-location group slot inside the shared slot
 					SharedSlot constraintGroupSlot = newSharedSlot.allocateSharedSlot(constraint.getGroupId());
@@ -377,15 +364,15 @@ public class SlotSharingGroupAssignment {
 				// the location constraint has not been associated with a shared slot, yet.
 				// grab a new slot and initialize the constraint with that one.
 				// preferred locations are defined by the vertex
-				Pair<SharedSlot, Locality> p =
+				Tuple2<SharedSlot, Locality> p =
 						getSlotForTaskInternal(constraint.getGroupId(), locationPreferences, false);
 				if (p == null) {
 					// could not get a shared slot for this co-location-group
 					return null;
 				}
 				else {
-					final SharedSlot availableShared = p.getLeft();
-					final Locality l = p.getRight();
+					final SharedSlot availableShared = p.f0;
+					final Locality l = p.f1;
 
 					// allocate the co-location group slot inside the shared slot
 					SharedSlot constraintGroupSlot = availableShared.allocateSharedSlot(constraint.getGroupId());
@@ -405,9 +392,8 @@ public class SlotSharingGroupAssignment {
 	}
 
 
-	private Pair<SharedSlot, Locality> getSlotForTaskInternal(AbstractID groupId,
-																Iterable<Instance> preferredLocations,
-																boolean localOnly)
+	private Tuple2<SharedSlot, Locality> getSlotForTaskInternal(
+			AbstractID groupId, Iterable<TaskManagerLocation> preferredLocations, boolean localOnly)
 	{
 		// check if there is anything at all in this group assignment
 		if (allSlots.isEmpty()) {
@@ -415,15 +401,15 @@ public class SlotSharingGroupAssignment {
 		}
 
 		// get the available slots for the group
-		Map<Instance, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
+		Map<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupId);
 		
 		if (slotsForGroup == null) {
 			// we have a new group, so all slots are available
-			slotsForGroup = new LinkedHashMap<Instance, List<SharedSlot>>();
+			slotsForGroup = new LinkedHashMap<>();
 			availableSlotsPerJid.put(groupId, slotsForGroup);
 
 			for (SharedSlot availableSlot : allSlots) {
-				putIntoMultiMap(slotsForGroup, availableSlot.getInstance(), availableSlot);
+				putIntoMultiMap(slotsForGroup, availableSlot.getTaskManagerID(), availableSlot);
 			}
 		}
 		else if (slotsForGroup.isEmpty()) {
@@ -435,15 +421,15 @@ public class SlotSharingGroupAssignment {
 		boolean didNotGetPreferred = false;
 
 		if (preferredLocations != null) {
-			for (Instance location : preferredLocations) {
+			for (TaskManagerLocation location : preferredLocations) {
 
 				// set the flag that we failed a preferred location. If one will be found,
 				// we return early anyways and skip the flag evaluation
 				didNotGetPreferred = true;
 
-				SharedSlot slot = removeFromMultiMap(slotsForGroup, location);
+				SharedSlot slot = removeFromMultiMap(slotsForGroup, location.getResourceID());
 				if (slot != null && slot.isAlive()) {
-					return new ImmutablePair<SharedSlot, Locality>(slot, Locality.LOCAL);
+					return new Tuple2<>(slot, Locality.LOCAL);
 				}
 			}
 		}
@@ -459,7 +445,7 @@ public class SlotSharingGroupAssignment {
 		SharedSlot slot;
 		while ((slot = pollFromMultiMap(slotsForGroup)) != null) {
 			if (slot.isAlive()) {
-				return new ImmutablePair<SharedSlot, Locality>(slot, locality);
+				return new Tuple2<>(slot, locality);
 			}
 		}
 		
@@ -510,7 +496,7 @@ public class SlotSharingGroupAssignment {
 							// for that group again. otherwise, the slot is part of a
 							// co-location group and nothing becomes immediately available
 
-							Map<Instance, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupID);
+							Map<ResourceID, List<SharedSlot>> slotsForJid = availableSlotsPerJid.get(groupID);
 
 							// sanity check
 							if (slotsForJid == null) {
@@ -518,7 +504,7 @@ public class SlotSharingGroupAssignment {
 										" when available slots indicated that all slots were available.");
 							}
 
-							putIntoMultiMap(slotsForJid, parent.getInstance(), parent);
+							putIntoMultiMap(slotsForJid, parent.getTaskManagerID(), parent);
 						}
 					} else {
 						// the parent shared slot is now empty and can be released
@@ -558,8 +544,6 @@ public class SlotSharingGroupAssignment {
 	/**
 	 * 
 	 * <p><b>NOTE: This method must be called from within a scope that holds the lock.</b></p>
-	 * 
-	 * @param sharedSlot
 	 */
 	private void internalDisposeEmptySharedSlot(SharedSlot sharedSlot) {
 		// sanity check
@@ -576,7 +560,7 @@ public class SlotSharingGroupAssignment {
 		
 		if (parent == null) {
 			// root slot, return to the instance.
-			sharedSlot.getInstance().returnAllocatedSlot(sharedSlot);
+			sharedSlot.getOwner().returnAllocatedSlot(sharedSlot);
 			
 			// also, make sure we remove this slot from everywhere
 			allSlots.remove(sharedSlot);
@@ -592,7 +576,7 @@ public class SlotSharingGroupAssignment {
 				
 				if (parentRemaining > 0) {
 					// the parent becomes available for the group again
-					Map<Instance, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupID);
+					Map<ResourceID, List<SharedSlot>> slotsForGroup = availableSlotsPerJid.get(groupID);
 
 					// sanity check
 					if (slotsForGroup == null) {
@@ -600,7 +584,7 @@ public class SlotSharingGroupAssignment {
 								" when available slots indicated that all slots were available.");
 					}
 
-					putIntoMultiMap(slotsForGroup, parent.getInstance(), parent);
+					putIntoMultiMap(slotsForGroup, parent.getTaskManagerID(), parent);
 					
 				}
 				else {
@@ -620,7 +604,7 @@ public class SlotSharingGroupAssignment {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private static void putIntoMultiMap(Map<Instance, List<SharedSlot>> map, Instance location, SharedSlot slot) {
+	private static void putIntoMultiMap(Map<ResourceID, List<SharedSlot>> map, ResourceID location, SharedSlot slot) {
 		List<SharedSlot> slotsForInstance = map.get(location);
 		if (slotsForInstance == null) {
 			slotsForInstance = new ArrayList<SharedSlot>();
@@ -629,7 +613,7 @@ public class SlotSharingGroupAssignment {
 		slotsForInstance.add(slot);
 	}
 	
-	private static SharedSlot removeFromMultiMap(Map<Instance, List<SharedSlot>> map, Instance location) {
+	private static SharedSlot removeFromMultiMap(Map<ResourceID, List<SharedSlot>> map, ResourceID location) {
 		List<SharedSlot> slotsForLocation = map.get(location);
 		
 		if (slotsForLocation == null) {
@@ -645,8 +629,8 @@ public class SlotSharingGroupAssignment {
 		}
 	}
 	
-	private static SharedSlot pollFromMultiMap(Map<Instance, List<SharedSlot>> map) {
-		Iterator<Map.Entry<Instance, List<SharedSlot>>> iter = map.entrySet().iterator();
+	private static SharedSlot pollFromMultiMap(Map<ResourceID, List<SharedSlot>> map) {
+		Iterator<Map.Entry<ResourceID, List<SharedSlot>>> iter = map.entrySet().iterator();
 		
 		while (iter.hasNext()) {
 			List<SharedSlot> slots = iter.next().getValue();
@@ -667,19 +651,19 @@ public class SlotSharingGroupAssignment {
 		return null;
 	}
 	
-	private static void removeSlotFromAllEntries(Map<AbstractID, Map<Instance, List<SharedSlot>>> availableSlots, 
-													SharedSlot slot)
+	private static void removeSlotFromAllEntries(
+			Map<AbstractID, Map<ResourceID, List<SharedSlot>>> availableSlots, SharedSlot slot)
 	{
-		final Instance instance = slot.getInstance();
+		final ResourceID taskManagerId = slot.getTaskManagerID();
 		
-		for (Map.Entry<AbstractID, Map<Instance, List<SharedSlot>>> entry : availableSlots.entrySet()) {
-			Map<Instance, List<SharedSlot>> map = entry.getValue();
+		for (Map.Entry<AbstractID, Map<ResourceID, List<SharedSlot>>> entry : availableSlots.entrySet()) {
+			Map<ResourceID, List<SharedSlot>> map = entry.getValue();
 
-			List<SharedSlot> list = map.get(instance);
+			List<SharedSlot> list = map.get(taskManagerId);
 			if (list != null) {
 				list.remove(slot);
 				if (list.isEmpty()) {
-					map.remove(instance);
+					map.remove(taskManagerId);
 				}
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
index fece894..c41f7bf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/CoLocationConstraint.java
@@ -18,34 +18,39 @@
 
 package org.apache.flink.runtime.jobmanager.scheduler;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.runtime.instance.Instance;
 
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.runtime.instance.SharedSlot;
 
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * A CoLocationConstraint manages the location of a set of tasks
  * (Execution Vertices). In co-location groups, the different subtasks of
  * different JobVertices need to be executed on the same {@link Instance}.
  * This is realized by creating a special shared slot that holds these tasks.
  * 
- * <p>This class tracks the location and the shared slot for this set of tasks.</p>
+ * <p>This class tracks the location and the shared slot for this set of tasks.
  */
 public class CoLocationConstraint {
-	
+
 	private final CoLocationGroup group;
-	
+
 	private volatile SharedSlot sharedSlot;
-	
-	private volatile boolean locationLocked;
-	
-	
+
+	private volatile ResourceID lockedLocation;
+
+
 	CoLocationConstraint(CoLocationGroup group) {
 		Preconditions.checkNotNull(group);
 		this.group = group;
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Status & Properties
 	// ------------------------------------------------------------------------
@@ -77,7 +82,7 @@ public class CoLocationConstraint {
 	 * @return True if the location has been assigned, false otherwise.
 	 */
 	public boolean isAssigned() {
-		return locationLocked;
+		return lockedLocation != null;
 	}
 
 	/**
@@ -89,7 +94,7 @@ public class CoLocationConstraint {
 	 *         false otherwise.
 	 */
 	public boolean isAssignedAndAlive() {
-		return locationLocked && sharedSlot.isAlive();
+		return lockedLocation != null && sharedSlot.isAlive();
 	}
 
 	/**
@@ -100,9 +105,9 @@ public class CoLocationConstraint {
 	 * @return The instance describing the location for the tasks of this constraint.
 	 * @throws IllegalStateException Thrown if the location has not been assigned, yet.
 	 */
-	public Instance getLocation() {
-		if (locationLocked) {
-			return sharedSlot.getInstance();
+	public TaskManagerLocation getLocation() {
+		if (lockedLocation != null) {
+			return sharedSlot.getTaskManagerLocation();
 		} else {
 			throw new IllegalStateException("Location not yet locked");
 		}
@@ -125,18 +130,20 @@ public class CoLocationConstraint {
 	 *                                  the new slot is from a different location.
 	 */
 	public void setSharedSlot(SharedSlot newSlot) {
+		checkNotNull(newSlot);
+
 		if (this.sharedSlot == null) {
 			this.sharedSlot = newSlot;
 		}
 		else if (newSlot != this.sharedSlot){
-			if (locationLocked && this.sharedSlot.getInstance() != newSlot.getInstance()) {
+			if (lockedLocation != null && lockedLocation != newSlot.getTaskManagerID()) {
 				throw new IllegalArgumentException(
 						"Cannot assign different location to a constraint whose location is locked.");
 			}
 			if (this.sharedSlot.isAlive()) {
 				this.sharedSlot.releaseSlot();
 			}
-			
+
 			this.sharedSlot = newSlot;
 		}
 	}
@@ -149,13 +156,10 @@ public class CoLocationConstraint {
 	 *                               or is no slot has been set, yet.
 	 */
 	public void lockLocation() throws IllegalStateException {
-		if (locationLocked) {
-			throw new IllegalStateException("Location is already locked");
-		}
-		if (sharedSlot == null) {
-			throw new IllegalStateException("Cannot lock location without a slot.");
-		}
-		locationLocked = true;
+		checkState(lockedLocation == null, "Location is already locked");
+		checkState(sharedSlot != null, "Cannot lock location without a slot.");
+
+		lockedLocation = sharedSlot.getTaskManagerID();
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
index 963fc4c..b481b55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -37,6 +38,7 @@ import akka.dispatch.Futures;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.SlotSharingGroupAssignment;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.instance.SharedSlot;
@@ -45,6 +47,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.instance.Instance;
 import org.apache.flink.runtime.instance.InstanceDiedException;
 import org.apache.flink.runtime.instance.InstanceListener;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -78,7 +81,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	private final HashMap<String, Set<Instance>> allInstancesByHost = new HashMap<String, Set<Instance>>();
 	
 	/** All instances that still have available resources */
-	private final Queue<Instance> instancesWithAvailableResources = new SetQueue<Instance>();
+	private final Map<ResourceID, Instance> instancesWithAvailableResources = new LinkedHashMap<>();
 	
 	/** All tasks pending to be scheduled */
 	private final Queue<QueuedTask> taskQueue = new ArrayDeque<QueuedTask>();
@@ -163,7 +166,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 
 		final ExecutionVertex vertex = task.getTaskToExecute().getVertex();
 		
-		final Iterable<Instance> preferredLocations = vertex.getPreferredLocations();
+		final Iterable<TaskManagerLocation> preferredLocations = vertex.getPreferredLocations();
 		final boolean forceExternalLocation = vertex.isScheduleLocalOnly() &&
 									preferredLocations != null && preferredLocations.iterator().hasNext();
 	
@@ -222,7 +225,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 					
 					// our location preference is either determined by the location constraint, or by the
 					// vertex's preferred locations
-					final Iterable<Instance> locations;
+					final Iterable<TaskManagerLocation> locations;
 					final boolean localOnly;
 					if (constraint != null && constraint.isAssigned()) {
 						locations = Collections.singleton(constraint.getLocation());
@@ -341,7 +344,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	 * @return The instance to run the vertex on, it {@code null}, if no instance is available.
 	 */
 	protected SimpleSlot getFreeSlotForTask(ExecutionVertex vertex,
-											Iterable<Instance> requestedLocations,
+											Iterable<TaskManagerLocation> requestedLocations,
 											boolean localOnly) {
 		// we need potentially to loop multiple times, because there may be false positives
 		// in the set-with-available-instances
@@ -360,7 +363,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				
 				// if the instance has further available slots, re-add it to the set of available resources.
 				if (instanceToUse.hasResourcesAvailable()) {
-					this.instancesWithAvailableResources.add(instanceToUse);
+					this.instancesWithAvailableResources.put(instanceToUse.getResourceId(), instanceToUse);
 				}
 				
 				if (slot != null) {
@@ -396,7 +399,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	 * @return A sub-slot for the given vertex, or {@code null}, if no slot is available.
 	 */
 	protected SimpleSlot getNewSlotForSharingGroup(ExecutionVertex vertex,
-													Iterable<Instance> requestedLocations,
+													Iterable<TaskManagerLocation> requestedLocations,
 													SlotSharingGroupAssignment groupAssignment,
 													CoLocationConstraint constraint,
 													boolean localOnly)
@@ -422,7 +425,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 
 				// if the instance has further available slots, re-add it to the set of available resources.
 				if (instanceToUse.hasResourcesAvailable()) {
-					this.instancesWithAvailableResources.add(instanceToUse);
+					this.instancesWithAvailableResources.put(instanceToUse.getResourceId(), instanceToUse);
 				}
 
 				if (sharedSlot != null) {
@@ -460,13 +463,13 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	 *                           no locality preference exists.   
 	 * @param localOnly Flag to indicate whether only one of the exact local instances can be chosen.  
 	 */
-	private Pair<Instance, Locality> findInstance(Iterable<Instance> requestedLocations, boolean localOnly){
+	private Pair<Instance, Locality> findInstance(Iterable<TaskManagerLocation> requestedLocations, boolean localOnly) {
 		
 		// drain the queue of newly available instances
 		while (this.newlyAvailableInstances.size() > 0) {
 			Instance queuedInstance = this.newlyAvailableInstances.poll();
 			if (queuedInstance != null) {
-				this.instancesWithAvailableResources.add(queuedInstance);
+				this.instancesWithAvailableResources.put(queuedInstance.getResourceId(), queuedInstance);
 			}
 		}
 		
@@ -475,15 +478,18 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 			return null;
 		}
 
-		Iterator<Instance> locations = requestedLocations == null ? null : requestedLocations.iterator();
+		Iterator<TaskManagerLocation> locations = requestedLocations == null ? null : requestedLocations.iterator();
 
 		if (locations != null && locations.hasNext()) {
 			// we have a locality preference
 
 			while (locations.hasNext()) {
-				Instance location = locations.next();
-				if (location != null && this.instancesWithAvailableResources.remove(location)) {
-					return new ImmutablePair<Instance, Locality>(location, Locality.LOCAL);
+				TaskManagerLocation location = locations.next();
+				if (location != null) {
+					Instance instance = instancesWithAvailableResources.remove(location.getResourceID());
+					if (instance != null) {
+						return new ImmutablePair<Instance, Locality>(instance, Locality.LOCAL);
+					}
 				}
 			}
 			
@@ -492,14 +498,21 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				return null;
 			}
 			else {
-				Instance instanceToUse = this.instancesWithAvailableResources.poll();
-				return new ImmutablePair<Instance, Locality>(instanceToUse, Locality.NON_LOCAL);
+				// take the first instance from the instances with resources
+				Iterator<Instance> instances = instancesWithAvailableResources.values().iterator();
+				Instance instanceToUse = instances.next();
+				instances.remove();
+
+				return new ImmutablePair<>(instanceToUse, Locality.NON_LOCAL);
 			}
 		}
 		else {
 			// no location preference, so use some instance
-			Instance instanceToUse = this.instancesWithAvailableResources.poll();
-			return new ImmutablePair<Instance, Locality>(instanceToUse, Locality.UNCONSTRAINED);
+			Iterator<Instance> instances = instancesWithAvailableResources.values().iterator();
+			Instance instanceToUse = instances.next();
+			instances.remove();
+
+			return new ImmutablePair<>(instanceToUse, Locality.UNCONSTRAINED);
 		}
 	}
 	
@@ -570,7 +583,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 				}
 			}
 			else {
-				this.instancesWithAvailableResources.add(instance);
+				this.instancesWithAvailableResources.put(instance.getResourceId(), instance);
 			}
 		}
 	}
@@ -643,11 +656,10 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 					allInstancesByHost.put(instanceHostName, instanceSet);
 				}
 				instanceSet.add(instance);
-				
-					
+
 				// add it to the available resources and let potential waiters know
-				this.instancesWithAvailableResources.add(instance);
-	
+				this.instancesWithAvailableResources.put(instance.getResourceId(), instance);
+
 				// add all slots as available
 				for (int i = 0; i < instance.getNumberOfAvailableSlots(); i++) {
 					newSlotAvailable(instance);
@@ -681,8 +693,8 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		}
 
 		allInstances.remove(instance);
-		instancesWithAvailableResources.remove(instance);
-		
+		instancesWithAvailableResources.remove(instance.getResourceId());
+
 		String instanceHostName = instance.getInstanceConnectionInfo().getHostname();
 		Set<Instance> instanceSet = allInstancesByHost.get(instanceHostName);
 		if (instanceSet != null) {
@@ -709,7 +721,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		synchronized (globalLock) {
 			processNewlyAvailableInstances();
 
-			for (Instance instance : instancesWithAvailableResources) {
+			for (Instance instance : instancesWithAvailableResources.values()) {
 				count += instance.getNumberOfAvailableSlots();
 			}
 		}
@@ -781,9 +793,9 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 		synchronized (globalLock) {
 			Instance instance;
 
-			while((instance = newlyAvailableInstances.poll()) != null){
-				if(instance.hasResourcesAvailable()){
-					instancesWithAvailableResources.add(instance);
+			while ((instance = newlyAvailableInstances.poll()) != null) {
+				if (instance.hasResourcesAvailable()) {
+					instancesWithAvailableResources.put(instance.getResourceId(), instance);
 				}
 			}
 		}
@@ -794,17 +806,17 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener {
 	//  Utilities
 	// ------------------------------------------------------------------------
 
-	private static String getHostnamesFromInstances(Iterable<Instance> instances) {
+	private static String getHostnamesFromInstances(Iterable<TaskManagerLocation> locations) {
 		StringBuilder bld = new StringBuilder();
 
 		boolean successive = false;
-		for (Instance i : instances) {
+		for (TaskManagerLocation loc : locations) {
 			if (successive) {
 				bld.append(", ");
 			} else {
 				successive = true;
 			}
-			bld.append(i.getInstanceConnectionInfo().getHostname());
+			bld.append(loc.getHostname());
 		}
 
 		return bld.toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
index 31bd341..36e4072 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotAllocationFuture.java
@@ -20,73 +20,125 @@ package org.apache.flink.runtime.jobmanager.scheduler;
 
 import org.apache.flink.runtime.instance.SimpleSlot;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * 
+ */
 public class SlotAllocationFuture {
-	
+
 	private final Object monitor = new Object();
-	
+
 	private volatile SimpleSlot slot;
-	
+
 	private volatile SlotAllocationFutureAction action;
-	
+
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Creates a future that is uncompleted.
+	 */
 	public SlotAllocationFuture() {}
-	
+
+	/**
+	 * Creates a future that is immediately completed.
+	 * 
+	 * @param slot The task slot that completes the future.
+	 */
 	public SlotAllocationFuture(SimpleSlot slot) {
 		this.slot = slot;
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
-	public SimpleSlot waitTillAllocated() throws InterruptedException {
-		return waitTillAllocated(0);
-	}
-	
-	public SimpleSlot waitTillAllocated(long timeout) throws InterruptedException {
+
+	public SimpleSlot waitTillCompleted() throws InterruptedException {
 		synchronized (monitor) {
 			while (slot == null) {
-				monitor.wait(timeout);
+				monitor.wait();
+			}
+			return slot;
+		}
+	}
+
+	public SimpleSlot waitTillCompleted(long timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
+		checkArgument(timeout >= 0, "timeout may not be negative");
+		checkNotNull(timeUnit, "timeUnit");
+
+		if (timeout == 0) {
+			return waitTillCompleted();
+		} else {
+			final long deadline = System.nanoTime() + timeUnit.toNanos(timeout);
+			long millisToWait;
+
+			synchronized (monitor) {
+				while (slot == null && (millisToWait = (deadline - System.nanoTime()) / 1_000_000) > 0) {
+					monitor.wait(millisToWait);
+				}
+
+				if (slot != null) {
+					return slot;
+				} else {
+					throw new TimeoutException();
+				}
 			}
-			
+		}
+	}
+
+	/**
+	 * Gets the slot from this future. This method throws an exception, if the future has not been completed.
+	 * This method never blocks.
+	 * 
+	 * @return The slot with which this future was completed.
+	 * @throws IllegalStateException Thrown, if this method is called before the future is completed.
+	 */
+	public SimpleSlot get() {
+		final SimpleSlot slot = this.slot;
+		if (slot != null) {
 			return slot;
+		} else {
+			throw new IllegalStateException("The future is not complete - not slot available");
 		}
 	}
-	
+
 	public void setFutureAction(SlotAllocationFutureAction action) {
+		checkNotNull(action);
+
 		synchronized (monitor) {
-			if (this.action != null) {
-				throw new IllegalStateException("Future already has an action registered.");
-			}
-			
+			checkState(this.action == null, "Future already has an action registered.");
+
 			this.action = action;
-			
+
 			if (this.slot != null) {
 				action.slotAllocated(this.slot);
 			}
 		}
 	}
-	
+
+	/**
+	 * Completes the future with a slot.
+	 */
 	public void setSlot(SimpleSlot slot) {
-		if (slot == null) {
-			throw new NullPointerException();
-		}
-		
+		checkNotNull(slot);
+
 		synchronized (monitor) {
-			if (this.slot != null) {
-				throw new IllegalStateException("The future has already been assigned a slot.");
-			}
-			
+			checkState(this.slot == null, "The future has already been assigned a slot.");
+
 			this.slot = slot;
 			monitor.notifyAll();
-			
+
 			if (action != null) {
 				action.slotAllocated(slot);
 			}
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return slot == null ? "PENDING" : "DONE";

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
new file mode 100644
index 0000000..ad9c784
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotOwner.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.instance.Slot;
+
+/**
+ * Interface for components that hold slots and to which slots get released / recycled.
+ */
+public interface SlotOwner {
+
+	boolean returnAllocatedSlot(Slot slot);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
index 5a0faa5..01d0654 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManagerLocation.java
@@ -108,7 +108,7 @@ public class TaskManagerLocation implements Comparable<TaskManagerLocation>, jav
 		}
 
 		this.stringRepresentation = String.format(
-				"TaskManager (%s) @ %s (dataPort=%d)", resourceID, fqdnHostName, dataPort);
+				"%s @ %s (dataPort=%d)", resourceID, fqdnHostName, dataPort);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 0c62c69..2a0ecc2 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -650,7 +650,7 @@ class JobManager(
             val taskId = execution.getVertex.getParallelSubtaskIndex
 
             val host = if (slot != null) {
-              slot.getInstance().getInstanceConnectionInfo.getHostname
+              slot.getTaskManagerLocation().getHostname()
             } else {
               null
             }

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index 6a9b490..3947b17 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -249,7 +249,7 @@ trait TestingJobManagerLike extends FlinkActor {
             } else {
               sender ! decorateMessage(
                 WorkingTaskManager(
-                  Some(resource.getInstance().getActorGateway)
+                  Some(resource.getTaskManagerActorGateway())
                 )
               )
             }

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index cf7cf58..d8bd6cb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -117,7 +117,8 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 		ActorGateway actorGateway = mock(ActorGateway.class);
 
 		when(simpleSlot.isAlive()).thenReturn(true);
-		when(simpleSlot.getInstance()).thenReturn(instance);
+		when(simpleSlot.getTaskManagerID()).thenReturn(instance.getResourceId());
+		when(simpleSlot.getTaskManagerLocation()).thenReturn(instance.getInstanceConnectionInfo());
 		when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
 		when(simpleSlot.getRoot()).thenReturn(rootSlot);
 
@@ -152,6 +153,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 		assertNotNull(metric);
 		assertTrue(metric instanceof Gauge);
 
+		@SuppressWarnings("unchecked")
 		Gauge<Long> restartingTime = (Gauge<Long>) metric;
 
 		// check that the restarting time is 0 since it's the initial start

http://git-wip-us.apache.org/repos/asf/flink/blob/aaa474ad/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
index 91472ae..a1f3345 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/VertexLocationConstraintTest.java
@@ -92,8 +92,8 @@ public class VertexLocationConstraintTest {
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
 			ExecutionVertex[] vertices = ejv.getTaskVertices();
 			
-			vertices[0].setLocationConstraintHosts(Arrays.asList(instance1, instance2));
-			vertices[1].setLocationConstraintHosts(Collections.singletonList(instance3));
+			vertices[0].setLocationConstraintHosts(Arrays.asList(instance1.getInstanceConnectionInfo(), instance2.getInstanceConnectionInfo()));
+			vertices[1].setLocationConstraintHosts(Collections.singletonList(instance3.getInstanceConnectionInfo()));
 			
 			vertices[0].setScheduleLocalOnly(true);
 			vertices[1].setScheduleLocalOnly(true);
@@ -106,14 +106,14 @@ public class VertexLocationConstraintTest {
 			assertNotNull(slot1);
 			assertNotNull(slot2);
 			
-			Instance target1 = slot1.getInstance();
-			Instance target2 = slot2.getInstance();
+			ResourceID target1 = slot1.getTaskManagerID();
+			ResourceID target2 = slot2.getTaskManagerID();
 			
 			assertNotNull(target1);
 			assertNotNull(target2);
 			
-			assertTrue(target1 == instance1 || target1 == instance2);
-			assertTrue(target2 == instance3);
+			assertTrue(target1 == instance1.getResourceId() || target1 == instance2.getResourceId());
+			assertEquals(target2, instance3.getResourceId());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -165,8 +165,8 @@ public class VertexLocationConstraintTest {
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
 			ExecutionVertex[] vertices = ejv.getTaskVertices();
 			
-			vertices[0].setLocationConstraintHosts(Collections.singletonList(instance3));
-			vertices[1].setLocationConstraintHosts(Arrays.asList(instance1, instance2));
+			vertices[0].setLocationConstraintHosts(Collections.singletonList(instance3.getInstanceConnectionInfo()));
+			vertices[1].setLocationConstraintHosts(Arrays.asList(instance1.getInstanceConnectionInfo(), instance2.getInstanceConnectionInfo()));
 			
 			vertices[0].setScheduleLocalOnly(true);
 			vertices[1].setScheduleLocalOnly(true);
@@ -179,14 +179,11 @@ public class VertexLocationConstraintTest {
 			assertNotNull(slot1);
 			assertNotNull(slot2);
 			
-			Instance target1 = slot1.getInstance();
-			Instance target2 = slot2.getInstance();
+			ResourceID target1 = slot1.getTaskManagerID();
+			ResourceID target2 = slot2.getTaskManagerID();
 			
-			assertNotNull(target1);
-			assertNotNull(target2);
-			
-			assertTrue(target1 == instance3);
-			assertTrue(target2 == instance1 || target2 == instance2);
+			assertTrue(target1 == instance3.getResourceId());
+			assertTrue(target2 == instance1.getResourceId() || target2 == instance2.getResourceId());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -242,8 +239,8 @@ public class VertexLocationConstraintTest {
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID());
 			ExecutionVertex[] vertices = ejv.getTaskVertices();
 			
-			vertices[0].setLocationConstraintHosts(Arrays.asList(instance1, instance2));
-			vertices[1].setLocationConstraintHosts(Collections.singletonList(instance3));
+			vertices[0].setLocationConstraintHosts(Arrays.asList(instance1.getInstanceConnectionInfo(), instance2.getInstanceConnectionInfo()));
+			vertices[1].setLocationConstraintHosts(Collections.singletonList(instance3.getInstanceConnectionInfo()));
 			
 			vertices[0].setScheduleLocalOnly(true);
 			vertices[1].setScheduleLocalOnly(true);
@@ -255,15 +252,12 @@ public class VertexLocationConstraintTest {
 			
 			assertNotNull(slot1);
 			assertNotNull(slot2);
-			
-			Instance target1 = slot1.getInstance();
-			Instance target2 = slot2.getInstance();
-			
-			assertNotNull(target1);
-			assertNotNull(target2);
-			
-			assertTrue(target1 == instance1 || target1 == instance2);
-			assertTrue(target2 == instance3);
+
+			ResourceID target1 = slot1.getTaskManagerID();
+			ResourceID target2 = slot2.getTaskManagerID();
+
+			assertTrue(target1 == instance1.getResourceId() || target1 == instance2.getResourceId());
+			assertTrue(target2 == instance3.getResourceId());
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -310,7 +304,7 @@ public class VertexLocationConstraintTest {
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex.getID());
 			ExecutionVertex[] vertices = ejv.getTaskVertices();
 			
-			vertices[0].setLocationConstraintHosts(Collections.singletonList(instance2));
+			vertices[0].setLocationConstraintHosts(Collections.singletonList(instance2.getInstanceConnectionInfo()));
 			vertices[0].setScheduleLocalOnly(true);
 			
 			try {
@@ -380,7 +374,7 @@ public class VertexLocationConstraintTest {
 			ExecutionJobVertex ejv = eg.getAllVertices().get(jobVertex1.getID());
 			ExecutionVertex[] vertices = ejv.getTaskVertices();
 			
-			vertices[0].setLocationConstraintHosts(Collections.singletonList(instance2));
+			vertices[0].setLocationConstraintHosts(Collections.singletonList(instance2.getInstanceConnectionInfo()));
 			vertices[0].setScheduleLocalOnly(true);
 			
 			try {
@@ -420,7 +414,7 @@ public class VertexLocationConstraintTest {
 			ExecutionVertex ev = eg.getAllVertices().get(vertex.getID()).getTaskVertices()[0];
 			
 			Instance instance = ExecutionGraphTestUtils.getInstance(DummyActorGateway.INSTANCE);
-			ev.setLocationConstraintHosts(Collections.singletonList(instance));
+			ev.setLocationConstraintHosts(Collections.singletonList(instance.getInstanceConnectionInfo()));
 			
 			assertNotNull(ev.getPreferredLocations());
 			assertEquals(instance, ev.getPreferredLocations().iterator().next());


Mime
View raw message