flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [5/6] flink git commit: [FLINK-4987] Harden SlotPool on JobMaster
Date Tue, 01 Nov 2016 08:42:09 GMT
[FLINK-4987] Harden SlotPool on JobMaster


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b2731a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b2731a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b2731a0

Branch: refs/heads/flip-6
Commit: 8b2731a03681b9b74f5a640b3be14d53d22328cf
Parents: e287113
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Oct 21 16:51:34 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Nov 1 09:41:33 2016 +0100

----------------------------------------------------------------------
 .../clusterframework/types/ResourceProfile.java |   18 +-
 .../org/apache/flink/runtime/instance/Slot.java |   16 +
 .../flink/runtime/instance/SlotDescriptor.java  |  162 ---
 .../apache/flink/runtime/instance/SlotPool.java | 1020 +++++++++++-------
 .../flink/runtime/instance/SlotPoolGateway.java |   95 ++
 .../runtime/jobmanager/scheduler/Locality.java  |   26 +-
 .../scheduler/NoResourceAvailableException.java |    6 +-
 .../runtime/jobmanager/slots/AllocatedSlot.java |   56 +-
 .../jobmanager/slots/PooledSlotProvider.java    |   73 --
 .../jobmanager/slots/SlotAndLocality.java       |   55 +
 .../flink/runtime/jobmaster/JobMaster.java      |   99 +-
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |   18 +-
 .../apache/flink/runtime/util/clock/Clock.java  |   40 +
 .../flink/runtime/util/clock/SystemClock.java   |   57 +
 .../types/ResourceProfileTest.java              |    5 +
 .../runtime/instance/AllocatedSlotsTest.java    |  270 ++---
 .../runtime/instance/AvailableSlotsTest.java    |  247 +++--
 .../flink/runtime/instance/SlotPoolTest.java    |  596 +++++-----
 .../runtime/minicluster/MiniClusterITCase.java  |   13 +-
 19 files changed, 1639 insertions(+), 1233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b2731a0/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 7a25de1..ddc7547 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -18,14 +18,21 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
+import javax.annotation.Nonnull;
 import java.io.Serializable;
 
 /**
  * Describe the resource profile of the slot, either when requiring or offering it. The profile can be
  * checked whether it can match another profile's requirement, and furthermore we may calculate a matching
  * score to decide which profile we should choose when we have lots of candidate slots.
+ * 
+ * <p>Resource Profiles have a total ordering, defined by comparing these fields in sequence:
+ * <ol>
+ *     <li>Memory Size</li>
+ *     <li>CPU cores</li>
+ * </ol>
  */
-public class ResourceProfile implements Serializable {
+public class ResourceProfile implements Serializable, Comparable<ResourceProfile> {
 
 	private static final long serialVersionUID = 1L;
 
@@ -90,11 +97,18 @@ public class ResourceProfile implements Serializable {
 		return cpuCores >= required.getCpuCores() && memoryInMB >= required.getMemoryInMB();
 	}
 
+	@Override
+	public int compareTo(@Nonnull ResourceProfile other) {
+		int cmp1 = Long.compare(this.memoryInMB, other.memoryInMB);
+		int cmp2 = Double.compare(this.cpuCores, other.cpuCores);
+		return (cmp1 != 0) ? cmp1 : cmp2; 
+	}
+
 	// ------------------------------------------------------------------------
 
 	@Override
 	public int hashCode() {
-		long cpuBits = Double.doubleToLongBits(cpuCores);
+		long cpuBits = Double.doubleToRawLongBits(cpuCores);
 		return (int) (cpuBits ^ (cpuBits >>> 32) ^ memoryInMB ^ (memoryInMB >> 32));
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2731a0/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index 8f8b897..d6d8f12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -350,6 +350,22 @@ public abstract class Slot {
 	//  Utilities
 	// --------------------------------------------------------------------------------------------
 
+	/**
+	 * Slots must always hash based on reference identity.
+	 */
+	@Override
+	public final int hashCode() {
+		return super.hashCode();
+	}
+
+	/**
+	 * Slots must always compare on referential equality.
+	 */
+	@Override
+	public final boolean equals(Object obj) {
+		return this == obj;
+	}
+
 	@Override
 	public String toString() {
 		return hierarchy() + " - " + getTaskManagerLocation() + " - " + getStateName(status);

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2731a0/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
deleted file mode 100644
index 47ce422..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The description of slots, TaskManagers offer one or more task slots, which define a slice of
- * their resources. This description will contain some static information about the slot, such
- * as the location and numeric id of the slot, rpc gateway to communicate with the TaskManager which
- * owns the slot.
- */
-public class SlotDescriptor {
-
-	/** The ID of the job this slice belongs to. */
-	private final JobID jobID;
-
-	/** The location information of the TaskManager to which this slot belongs */
-	private final TaskManagerLocation taskManagerLocation;
-
-	/** The number of the slot on which the task is deployed */
-	private final int slotNumber;
-
-	/** The resource profile of the slot provides */
-	private final ResourceProfile resourceProfile;
-
-	/** TEMP until the new RPC is in place: The actor gateway to communicate with the TaskManager */
-	private final TaskManagerGateway taskManagerGateway;
-
-	public SlotDescriptor(
-		final JobID jobID,
-		final TaskManagerLocation location,
-		final int slotNumber,
-		final ResourceProfile resourceProfile,
-		final TaskManagerGateway taskManagerGateway)
-	{
-		this.jobID = checkNotNull(jobID);
-		this.taskManagerLocation = checkNotNull(location);
-		this.slotNumber = slotNumber;
-		this.resourceProfile = checkNotNull(resourceProfile);
-		this.taskManagerGateway = checkNotNull(taskManagerGateway);
-	}
-
-	public SlotDescriptor(final SlotDescriptor other) {
-		this.jobID = other.jobID;
-		this.taskManagerLocation = other.taskManagerLocation;
-		this.slotNumber = other.slotNumber;
-		this.resourceProfile = other.resourceProfile;
-		this.taskManagerGateway = other.taskManagerGateway;
-	}
-	
-	// TODO - temporary workaround until we have the SlotDesriptor in the Slot
-	public SlotDescriptor(final Slot slot) {
-		this.jobID = slot.getJobID();
-		this.taskManagerLocation = slot.getTaskManagerLocation();
-		this.slotNumber = slot.getRootSlotNumber();
-		this.resourceProfile = new ResourceProfile(0, 0);
-		this.taskManagerGateway = slot.getTaskManagerGateway();
-	}
-
-	/**
-	 * Returns the ID of the job this allocated slot belongs to.
-	 *
-	 * @return the ID of the job this allocated slot belongs to
-	 */
-	public JobID getJobID() {
-		return jobID;
-	}
-
-	/**
-	 * Gets the number of the slot.
-	 *
-	 * @return The number of the slot on the TaskManager.
-	 */
-	public int getSlotNumber() {
-		return slotNumber;
-	}
-
-	/**
-	 * Gets the resource profile of the slot.
-	 *
-	 * @return The resource profile of the slot.
-	 */
-	public ResourceProfile getResourceProfile() {
-		return resourceProfile;
-	}
-
-	/**
-	 * Gets the location info of the TaskManager that offers this slot.
-	 *
-	 * @return The location info of the TaskManager that offers this slot
-	 */
-	public TaskManagerLocation getTaskManagerLocation() {
-		return taskManagerLocation;
-	}
-
-	/**
-	 * Gets the actor gateway that can be used to send messages to the TaskManager.
-	 * <p>
-	 * This method should be removed once the new interface-based RPC abstraction is in place
-	 *
-	 * @return The actor gateway that can be used to send messages to the TaskManager.
-	 */
-	public TaskManagerGateway getTaskManagerGateway() {
-		return taskManagerGateway;
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		SlotDescriptor that = (SlotDescriptor) o;
-
-		if (slotNumber != that.slotNumber) {
-			return false;
-		}
-		if (!jobID.equals(that.jobID)) {
-			return false;
-		}
-		return taskManagerLocation.equals(that.taskManagerLocation);
-
-	}
-
-	@Override
-	public int hashCode() {
-		int result = jobID.hashCode();
-		result = 31 * result + taskManagerLocation.hashCode();
-		result = 31 * result + slotNumber;
-		return result;
-	}
-
-	@Override
-	public String toString() {
-		return taskManagerLocation + " - " + slotNumber;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2731a0/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 44df29b..7c298ce 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -25,25 +25,41 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
 import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
 import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.clock.Clock;
+import org.apache.flink.runtime.util.clock.SystemClock;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -58,18 +74,33 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * <p>
  * All the allocation or the slot offering will be identified by self generated AllocationID, we will use it to
  * eliminate ambiguities.
+ * 
+ * TODO : Make pending requests location preference aware
+ * TODO : Pass location preferences to ResourceManager when sending a slot request
  */
-public class SlotPool implements SlotOwner {
+public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
+
+	/** The log for the pool - shared also with the internal classes */
+	static final Logger LOG = LoggerFactory.getLogger(SlotPool.class);
+
+	// ------------------------------------------------------------------------
 
-	private static final Logger LOG = LoggerFactory.getLogger(SlotPool.class);
+	private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = Time.minutes(5);
+
+	private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = Time.minutes(10);
+
+	private static final Time DEFAULT_RM_REQUEST_TIMEOUT = Time.seconds(10);
+
+	// ------------------------------------------------------------------------
 
 	private final Object lock = new Object();
 
-	/** The executor which is used to execute futures */
-	private final Executor executor;
+	private final JobID jobId;
 
-	/** All registered resources, slots will be accepted and used only if the resource is registered */
-	private final Set<ResourceID> registeredResources;
+	private final ProviderAndOwner providerAndOwner;
+
+	/** All registered TaskManagers, slots will be accepted and used only if the resource is registered */
+	private final HashSet<ResourceID> registeredTaskManagers;
 
 	/** The book-keeping of all allocated slots */
 	private final AllocatedSlots allocatedSlots;
@@ -78,10 +109,15 @@ public class SlotPool implements SlotOwner {
 	private final AvailableSlots availableSlots;
 
 	/** All pending requests waiting for slots */
-	private final Map<AllocationID, Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>>> pendingRequests;
+	private final HashMap<AllocationID, PendingRequest> pendingRequests;
+
+	/** Timeout for request calls to the ResourceManager */
+	private final Time resourceManagerRequestsTimeout;
 
-	/** Timeout of slot allocation */
-	private final Time timeout;
+	/** Timeout for allocation round trips (RM -> launch TM -> offer slot) */
+	private final Time resourceManagerAllocationTimeout;
+
+	private final Clock clock;
 
 	/** the leader id of job manager */
 	private UUID jobManagerLeaderId;
@@ -92,177 +128,232 @@ public class SlotPool implements SlotOwner {
 	/** The gateway to communicate with resource manager */
 	private ResourceManagerGateway resourceManagerGateway;
 
-	public SlotPool(final Executor executor) {
-		this.executor = executor;
-		this.registeredResources = new HashSet<>();
+	// ------------------------------------------------------------------------
+
+	public SlotPool(RpcService rpcService, JobID jobId) {
+		this(rpcService, jobId, SystemClock.getInstance(),
+				DEFAULT_SLOT_REQUEST_TIMEOUT, DEFAULT_RM_ALLOCATION_TIMEOUT, DEFAULT_RM_REQUEST_TIMEOUT);
+	}
+
+	public SlotPool(
+			RpcService rpcService,
+			JobID jobId,
+			Clock clock,
+			Time slotRequestTimeout,
+			Time resourceManagerAllocationTimeout,
+			Time resourceManagerRequestTimeout) {
+
+		super(rpcService);
+
+		this.jobId = checkNotNull(jobId);
+		this.clock = checkNotNull(clock);
+		this.resourceManagerRequestsTimeout = checkNotNull(resourceManagerRequestTimeout);
+		this.resourceManagerAllocationTimeout = checkNotNull(resourceManagerAllocationTimeout);
+
+		this.registeredTaskManagers = new HashSet<>();
 		this.allocatedSlots = new AllocatedSlots();
 		this.availableSlots = new AvailableSlots();
 		this.pendingRequests = new HashMap<>();
-		this.timeout = Time.of(5, TimeUnit.SECONDS);
-	}
 
-	public void setJobManagerLeaderId(final UUID jobManagerLeaderId) {
-		this.jobManagerLeaderId = jobManagerLeaderId;
+		this.providerAndOwner = new ProviderAndOwner(getSelf(), slotRequestTimeout);
 	}
 
 	// ------------------------------------------------------------------------
-	//  Slot Allocation
+	//  Starting and Stopping
 	// ------------------------------------------------------------------------
 
+	@Override
+	public void start() {
+		throw new UnsupportedOperationException("Should never call start() without leader ID");
+	}
+
 	/**
-	 * Try to allocate a simple slot with specified resource profile.
+	 * Start the slot pool to accept RPC calls.
 	 *
-	 * @param jobID           The job id which the slot allocated for
-	 * @param resourceProfile The needed resource profile
-	 * @return The future of allocated simple slot
+	 * @param jobManagerLeaderId The necessary leader id for running the job.
 	 */
-	public Future<SimpleSlot> allocateSimpleSlot(final JobID jobID, final ResourceProfile resourceProfile) {
-		return allocateSimpleSlot(jobID, resourceProfile, new AllocationID());
+	public void start(UUID jobManagerLeaderId) {
+		this.jobManagerLeaderId = jobManagerLeaderId;
+		super.start();
 	}
 
-
 	/**
-	 * Try to allocate a simple slot with specified resource profile and specified allocation id. It's mainly
-	 * for testing purpose since we need to specify whatever allocation id we want.
+	 * Suspends this pool, meaning it has lost its authority to accept and distribute slots.
 	 */
-	@VisibleForTesting
-	Future<SimpleSlot> allocateSimpleSlot(
-		final JobID jobID,
-		final ResourceProfile resourceProfile,
-		final AllocationID allocationID)
-	{
-		final FlinkCompletableFuture<SlotDescriptor> future = new FlinkCompletableFuture<>();
-
-		internalAllocateSlot(jobID, allocationID, resourceProfile, future);
-
-		return future.thenApplyAsync(
-			new ApplyFunction<SlotDescriptor, SimpleSlot>() {
-				@Override
-				public SimpleSlot apply(SlotDescriptor descriptor) {
-					SimpleSlot slot = new SimpleSlot(
-							descriptor.getJobID(), SlotPool.this,
-							descriptor.getTaskManagerLocation(), descriptor.getSlotNumber(),
-							descriptor.getTaskManagerGateway());
-					synchronized (lock) {
-						// double validation since we are out of the lock protection after the slot is granted
-						if (registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID())) {
-							LOG.info("Allocation[{}] Allocated simple slot: {} for job {}.", allocationID, slot, jobID);
-							allocatedSlots.add(allocationID, descriptor, slot);
-						}
-						else {
-							throw new RuntimeException("Resource was marked dead asynchronously.");
-						}
-					}
-					return slot;
-				}
-			},
-			executor
-		);
+	@RpcMethod
+	public void suspend() {
+		validateRunsInMainThread();
+
+		// suspend this RPC endpoint
+		((StartStoppable) getSelf()).stop();
+
+		// do not accept any requests
+		jobManagerLeaderId = null;
+		resourceManagerLeaderId = null;
+		resourceManagerGateway = null;
+
+		// Clear (but not release!) the available slots. The TaskManagers should re-register them
+		// at the new leader JobManager/SlotPool
+		availableSlots.clear();
+		allocatedSlots.clear();
+		pendingRequests.clear();
 	}
 
+	// ------------------------------------------------------------------------
+	//  Getting PoolOwner and PoolProvider
+	// ------------------------------------------------------------------------
 
 	/**
-	 * Try to allocate a shared slot with specified resource profile.
-	 *
-	 * @param jobID                  The job id which the slot allocated for
-	 * @param resourceProfile        The needed resource profile
-	 * @param sharingGroupAssignment The slot sharing group of the vertex
-	 * @return The future of allocated shared slot
+	 * Gets the slot owner implementation for this pool.
+	 * 
+	 * <p>This method does not mutate state and can be called directly (no RPC indirection)
+	 * 
+	 * @return The slot owner implementation for this pool.
 	 */
-	public Future<SharedSlot> allocateSharedSlot(
-		final JobID jobID,
-		final ResourceProfile resourceProfile,
-		final SlotSharingGroupAssignment sharingGroupAssignment)
-	{
-		return allocateSharedSlot(jobID, resourceProfile, sharingGroupAssignment, new AllocationID());
+	public SlotOwner getSlotOwner() {
+		return providerAndOwner;
 	}
 
 	/**
-	 * Try to allocate a shared slot with specified resource profile and specified allocation id. It's mainly
-	 * for testing purpose since we need to specify whatever allocation id we want.
+	 * Gets the slot provider implementation for this pool.
+	 *
+	 * <p>This method does not mutate state and can be called directly (no RPC indirection)
+	 *
+	 * @return The slot provider implementation for this pool.
 	 */
-	@VisibleForTesting
-	Future<SharedSlot> allocateSharedSlot(
-		final JobID jobID,
-		final ResourceProfile resourceProfile,
-		final SlotSharingGroupAssignment sharingGroupAssignment,
-		final AllocationID allocationID)
-	{
-		final FlinkCompletableFuture<SlotDescriptor> future = new FlinkCompletableFuture<>();
-
-		internalAllocateSlot(jobID, allocationID, resourceProfile, future);
-
-		return future.thenApplyAsync(
-			new ApplyFunction<SlotDescriptor, SharedSlot>() {
-				@Override
-				public SharedSlot apply(SlotDescriptor descriptor) {
-					SharedSlot slot = new SharedSlot(
-							descriptor.getJobID(), SlotPool.this, descriptor.getTaskManagerLocation(),
-							descriptor.getSlotNumber(), descriptor.getTaskManagerGateway(),
-							sharingGroupAssignment);
-
-					synchronized (lock) {
-						// double validation since we are out of the lock protection after the slot is granted
-						if (registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID())) {
-							LOG.info("Allocation[{}] Allocated shared slot: {} for job {}.", allocationID, slot, jobID);
-							allocatedSlots.add(allocationID, descriptor, slot);
-						}
-						else {
-							throw new RuntimeException("Resource was marked dead asynchronously.");
-						}
-					}
-					return slot;
-				}
-			},
-			executor
-		);
+	public SlotProvider getSlotProvider() {
+		return providerAndOwner;
 	}
 
-	/**
-	 * Internally allocate the slot with specified resource profile. We will first check whether we have some
-	 * free slot which can meet the requirement already and allocate it immediately. Otherwise, we will try to
-	 * allocation the slot from resource manager.
-	 */
-	private void internalAllocateSlot(
-		final JobID jobID,
-		final AllocationID allocationID,
-		final ResourceProfile resourceProfile,
-		final FlinkCompletableFuture<SlotDescriptor> future)
-	{
-		LOG.info("Allocation[{}] Allocating slot with {} for Job {}.", allocationID, resourceProfile, jobID);
-
-		synchronized (lock) {
-			// check whether we have any free slot which can match the required resource profile
-			SlotDescriptor freeSlot = availableSlots.poll(resourceProfile);
-			if (freeSlot != null) {
-				future.complete(freeSlot);
-			}
-			else {
-				if (resourceManagerGateway != null) {
-					LOG.info("Allocation[{}] No available slot exists, trying to allocate from resource manager.",
-						allocationID);
-					SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
-					pendingRequests.put(allocationID, new Tuple2<>(slotRequest, future));
-					resourceManagerGateway.requestSlot(jobManagerLeaderId, resourceManagerLeaderId, slotRequest, timeout)
-						.handleAsync(new BiFunction<RMSlotRequestReply, Throwable, Void>() {
-							@Override
-							public Void apply(RMSlotRequestReply slotRequestReply, Throwable throwable) {
-								if (throwable != null) {
-									future.completeExceptionally(
-										new Exception("Slot allocation from resource manager failed", throwable));
-								} else if (slotRequestReply instanceof RMSlotRequestRejected) {
-									future.completeExceptionally(
-										new Exception("Slot allocation rejected by resource manager"));
-								}
-								return null;
-							}
-						}, executor);
+	// ------------------------------------------------------------------------
+	//  Resource Manager Connection
+	// ------------------------------------------------------------------------
+
+	@RpcMethod
+	public void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManagerGateway resourceManagerGateway) {
+		this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId);
+		this.resourceManagerGateway = checkNotNull(resourceManagerGateway);
+	}
+
+	@RpcMethod
+	public void disconnectResourceManager() {
+		this.resourceManagerLeaderId = null;
+		this.resourceManagerGateway = null;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Slot Allocation
+	// ------------------------------------------------------------------------
+
+	@RpcMethod
+	public Future<SimpleSlot> allocateSlot(
+			ScheduledUnit task,
+			ResourceProfile resources,
+			Iterable<TaskManagerLocation> locationPreferences) {
+
+		return internalAllocateSlot(task, resources, locationPreferences);
+	}
+
+	@RpcMethod
+	public void returnAllocatedSlot(Slot slot) {
+		internalReturnAllocatedSlot(slot);
+	}
+
+
+	Future<SimpleSlot> internalAllocateSlot(
+			ScheduledUnit task,
+			ResourceProfile resources,
+			Iterable<TaskManagerLocation> locationPreferences) {
+
+		// (1) do we have a slot available already?
+		SlotAndLocality slotFromPool = availableSlots.poll(resources, locationPreferences);
+		if (slotFromPool != null) {
+			SimpleSlot slot = createSimpleSlot(slotFromPool.slot(), slotFromPool.locality());
+			allocatedSlots.add(slot);
+			return FlinkCompletableFuture.completed(slot);
+		}
+
+		// (2) no slot available, and no resource manager connection
+		if (resourceManagerGateway == null) {
+			return FlinkCompletableFuture.completedExceptionally(
+					new NoResourceAvailableException("not connected to ResourceManager and no slot available"));
+			
+		}
+
+		// (3) we have a resource manager connection, so let's ask it for more resources
+		final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>();
+		final AllocationID allocationID = new AllocationID();
+
+		LOG.info("Requesting slot with profile {} from resource manager (request = {}).", resources, allocationID);
+
+		pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources));
+
+		Future<RMSlotRequestReply> rmResponse = resourceManagerGateway.requestSlot(
+				resourceManagerLeaderId, jobManagerLeaderId,
+				new SlotRequest(jobId, allocationID, resources),
+				resourceManagerRequestsTimeout);
+
+		// on success, let the slot pool know
+		rmResponse.thenAcceptAsync(new AcceptFunction<RMSlotRequestReply>() {
+			@Override
+			public void accept(RMSlotRequestReply reply) {
+				if (reply.getAllocationID() != null && reply.getAllocationID().equals(allocationID)) {
+					if (reply instanceof RMSlotRequestRegistered) {
+						slotRequestToResourceManagerSuccess(allocationID);
+					}
+					else if (reply instanceof RMSlotRequestRejected) {
+						slotRequestToResourceManagerFailed(allocationID,
+								new Exception("ResourceManager rejected slot request"));
+					}
+					else {
+						slotRequestToResourceManagerFailed(allocationID, 
+								new Exception("Unknown ResourceManager response: " + reply));
+					}
 				}
 				else {
-					LOG.warn("Allocation[{}] Resource manager not available right now.", allocationID);
-					future.completeExceptionally(new Exception("Resource manager not available right now."));
+					future.completeExceptionally(new Exception(String.format(
+							"Bug: ResourceManager response had wrong AllocationID. Request: %s , Response: %s", 
+							allocationID, reply.getAllocationID())));
 				}
 			}
+		}, getMainThreadExecutor());
+
+		// on failure, fail the request future
+		rmResponse.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+
+			@Override
+			public Void apply(Throwable failure) {
+				slotRequestToResourceManagerFailed(allocationID, failure);
+				return null;
+			}
+		}, getMainThreadExecutor());
+
+		return future;
+	}
+
+	private void slotRequestToResourceManagerSuccess(final AllocationID allocationID) {
+		// a request is pending from the ResourceManager to a (future) TaskManager
+		// we only add the watcher here in case that request times out
+		scheduleRunAsync(new Runnable() {
+			@Override
+			public void run() {
+				checkTimeoutSlotAllocation(allocationID);
+			}
+		}, resourceManagerAllocationTimeout);
+	}
+
+	private void slotRequestToResourceManagerFailed(AllocationID allocationID, Throwable failure) {
+		PendingRequest request = pendingRequests.remove(allocationID);
+		if (request != null) {
+			request.future().completeExceptionally(new NoResourceAvailableException(
+					"No pooled slot available and request to ResourceManager for new slot failed", failure));
+		}
+	}
+
+	private void checkTimeoutSlotAllocation(AllocationID allocationID) {
+		PendingRequest request = pendingRequests.remove(allocationID);
+		if (request != null && !request.future().isDone()) {
+			request.future().completeExceptionally(new TimeoutException("Slot allocation request timed out"));
 		}
 	}
 
@@ -275,123 +366,123 @@ public class SlotPool implements SlotOwner {
 	 * slot can be reused by other pending requests if the resource profile matches.n
 	 *
 	 * @param slot The slot needs to be returned
-	 * @return True if the returning slot been accepted
 	 */
-	@Override
-	public boolean returnAllocatedSlot(Slot slot) {
+	private void internalReturnAllocatedSlot(Slot slot) {
 		checkNotNull(slot);
 		checkArgument(!slot.isAlive(), "slot is still alive");
-		checkArgument(slot.getOwner() == this, "slot belongs to the wrong pool.");
+		checkArgument(slot.getOwner() == providerAndOwner, "slot belongs to the wrong pool.");
 
+		// markReleased() is an atomic check-and-set operation, so that the slot is guaranteed
+		// to be returned only once
 		if (slot.markReleased()) {
-			synchronized (lock) {
-				final SlotDescriptor slotDescriptor = allocatedSlots.remove(slot);
-				if (slotDescriptor != null) {
-					// check if this TaskManager is valid
-					if (!registeredResources.contains(slot.getTaskManagerID())) {
-						return false;
-					}
-
-					final FlinkCompletableFuture<SlotDescriptor> pendingRequest = pollPendingRequest(slotDescriptor);
-					if (pendingRequest != null) {
-						pendingRequest.complete(slotDescriptor);
-					}
-					else {
-						availableSlots.add(slotDescriptor);
-					}
-
-					return true;
+			if (allocatedSlots.remove(slot)) {
+				// this slot allocation is still valid: hand the slot to a matching pending request
+				// (tracking it as allocated again), or make it available again
+				final AllocatedSlot taskManagerSlot = slot.getAllocatedSlot();
+				final PendingRequest pendingRequest = pollMatchingPendingRequest(taskManagerSlot);
+				if (pendingRequest != null) {
+					LOG.debug("Fulfilling pending request [{}] early with returned slot [{}]",
+							pendingRequest.allocationID(), taskManagerSlot.getSlotAllocationId());
+					final SimpleSlot reissuedSlot = createSimpleSlot(taskManagerSlot, Locality.UNKNOWN);
+					allocatedSlots.add(reissuedSlot);
+					pendingRequest.future().complete(reissuedSlot);
 				}
 				else {
-					throw new IllegalArgumentException("Slot was not allocated from this pool.");
+					LOG.debug("Adding returned slot [{}] to available slots", taskManagerSlot.getSlotAllocationId());
+					availableSlots.add(taskManagerSlot, clock.relativeTimeMillis());
 				}
 			}
-		}
-		else {
-			return false;
+			else {
+				LOG.debug("Returned slot's allocation has been failed. Dropping slot.");
+			}
 		}
 	}
 
-	private FlinkCompletableFuture<SlotDescriptor> pollPendingRequest(final SlotDescriptor slotDescriptor) {
-		for (Map.Entry<AllocationID, Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>>> entry : pendingRequests.entrySet()) {
-			final Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>> pendingRequest = entry.getValue();
-			if (slotDescriptor.getResourceProfile().isMatching(pendingRequest.f0.getResourceProfile())) {
-				pendingRequests.remove(entry.getKey());
-				return pendingRequest.f1;
+	private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) {
+		final ResourceProfile slotResources = slot.getResourceProfile();
+
+		for (PendingRequest request : pendingRequests.values()) {
+			if (slotResources.isMatching(request.resourceProfile())) {
+				pendingRequests.remove(request.allocationID());
+				return request;
 			}
 		}
+
+		// no request pending, or no request matches
 		return null;
 	}
 
-	/**
-	 * Release slot to TaskManager, called for finished tasks or canceled jobs.
-	 *
-	 * @param slot The slot needs to be released.
-	 */
-	public void releaseSlot(final Slot slot) {
-		synchronized (lock) {
-			allocatedSlots.remove(slot);
-			availableSlots.remove(new SlotDescriptor(slot));
-			// TODO: send release request to task manager
+	@RpcMethod
+	public Iterable<SlotOffer> offerSlots(Iterable<Tuple2<AllocatedSlot, SlotOffer>> offers) {
+		validateRunsInMainThread();
+
+		final ArrayList<SlotOffer> result = new ArrayList<>();
+		for (Tuple2<AllocatedSlot, SlotOffer> offer : offers) {
+			if (offerSlot(offer.f0)) {
+				result.add(offer.f1);
+			}
 		}
-	}
 
+		return result.isEmpty() ? Collections.<SlotOffer>emptyList() : result;
+	}
+	
 	/**
 	 * Slot offering by TaskManager with AllocationID. The AllocationID is originally generated by this pool and
 	 * transfer through the ResourceManager to TaskManager. We use it to distinguish the different allocation
 	 * we issued. Slot offering may be rejected if we find something mismatching or there is actually no pending
 	 * request waiting for this slot (maybe fulfilled by some other returned slot).
 	 *
-	 * @param allocationID   The allocation id of the lo
-	 * @param slotDescriptor The offered slot descriptor
+	 * @param slot The offered slot
 	 * @return True if we accept the offering
 	 */
-	public boolean offerSlot(final AllocationID allocationID, final SlotDescriptor slotDescriptor) {
-		synchronized (lock) {
-			// check if this TaskManager is valid
-			final ResourceID resourceID = slotDescriptor.getTaskManagerLocation().getResourceID();
-			if (!registeredResources.contains(resourceID)) {
-				LOG.warn("Allocation[{}] Slot offering from unregistered TaskManager: {}",
-					allocationID, slotDescriptor);
-				return false;
-			}
+	@RpcMethod
+	public boolean offerSlot(final AllocatedSlot slot) {
+		validateRunsInMainThread();
 
-			// check whether we have already using this slot
-			final Slot allocatedSlot = allocatedSlots.get(allocationID);
-			if (allocatedSlot != null) {
-				final SlotDescriptor allocatedSlotDescriptor = new SlotDescriptor(allocatedSlot);
+		// check if this TaskManager is valid
+		final ResourceID resourceID = slot.getTaskManagerId();
+		final AllocationID allocationID = slot.getSlotAllocationId();
 
-				if (allocatedSlotDescriptor.equals(slotDescriptor)) {
-					LOG.debug("Allocation[{}] Duplicated slot offering: {}",
-						allocationID, slotDescriptor);
-					return true;
-				}
-				else {
-					LOG.info("Allocation[{}] Allocation had been fulfilled by slot {}, rejecting offered slot {}",
-						allocationID, allocatedSlotDescriptor, slotDescriptor);
-					return false;
-				}
-			}
+		if (!registeredTaskManagers.contains(resourceID)) {
+			LOG.debug("Received outdated slot offering [{}] from unregistered TaskManager: {}",
+					slot.getSlotAllocationId(), slot);
+			return false;
+		}
 
-			// check whether we already have this slot in free pool
-			if (availableSlots.contains(slotDescriptor)) {
-				LOG.debug("Allocation[{}] Duplicated slot offering: {}",
-					allocationID, slotDescriptor);
-				return true;
-			}
+		// check whether we are already using this slot
+		if (allocatedSlots.contains(allocationID) || availableSlots.contains(allocationID)) {
+			LOG.debug("Received repeated offer for slot [{}]. Ignoring.", allocationID);
 
-			// check whether we have request waiting for this slot
-			if (pendingRequests.containsKey(allocationID)) {
-				FlinkCompletableFuture<SlotDescriptor> future = pendingRequests.remove(allocationID).f1;
-				future.complete(slotDescriptor);
-				return true;
-			}
+			// return true here so that the sender will get a positive acknowledgement to the retry
+			// and mark the offering as a success
+			return true;
+		}
 
-			// unwanted slot, rejecting this offer
-			return false;
+		// check whether we have request waiting for this slot
+		PendingRequest pendingRequest = pendingRequests.remove(allocationID);
+		if (pendingRequest != null) {
+			// we were waiting for this!
+			SimpleSlot resultSlot = createSimpleSlot(slot, Locality.UNKNOWN);
+			pendingRequest.future().complete(resultSlot);
+			allocatedSlots.add(resultSlot);
 		}
+		else {
+			// we were actually not waiting for this:
+			//   - could be that this request had been fulfilled
+			//   - we are receiving the slots from TaskManagers after becoming leaders
+			availableSlots.add(slot, clock.relativeTimeMillis());
+		}
+
+		// we accepted the request in any case. slot will be released after it idled for
+		// too long and timed out
+		return true;
 	}
 
+	
+	// TODO - periodic (every minute or so) catch slots that were lost (check all slots, if they have any task active)
+
+	// TODO - release slots that were not used to the resource manager
+
 	// ------------------------------------------------------------------------
 	//  Error Handling
 	// ------------------------------------------------------------------------
@@ -405,24 +496,29 @@ public class SlotPool implements SlotOwner {
 	 * @param allocationID Represents the allocation which should be failed
 	 * @param cause        The cause of the failure
 	 */
+	@RpcMethod
 	public void failAllocation(final AllocationID allocationID, final Exception cause) {
-		synchronized (lock) {
-			// 1. check whether the allocation still pending
-			Tuple2<SlotRequest, FlinkCompletableFuture<SlotDescriptor>> pendingRequest =
-					pendingRequests.get(allocationID);
-			if (pendingRequest != null) {
-				pendingRequest.f1.completeExceptionally(cause);
-				return;
+		final PendingRequest pendingRequest = pendingRequests.remove(allocationID);
+		if (pendingRequest != null) {
+			// request was still pending
+			LOG.debug("Failed pending request [{}] with ", allocationID, cause);
+			pendingRequest.future().completeExceptionally(cause);
+		}
+		else if (availableSlots.tryRemove(allocationID)) {
+			LOG.debug("Failed available slot [{}] with ", allocationID, cause);
+		}
+		else {
+			Slot slot = allocatedSlots.remove(allocationID);
+			if (slot != null) {
+				// release the slot.
+				// since it is not in 'allocatedSlots' any more, it will be dropped on return
+				slot.releaseSlot();
+			}
+			else {
+				LOG.debug("Outdated request to fail slot [{}] with ", allocationID, cause);
 			}
-
-			// 2. check whether we have a free slot corresponding to this allocation id
-			// TODO: add allocation id to slot descriptor, so we can remove it by allocation id
-
-			// 3. check whether we have a in-use slot corresponding to this allocation id
-			// TODO: needs mechanism to release the in-use Slot but don't return it back to this pool
-
-			// TODO: add some unit tests when the previous two are ready, the allocation may failed at any phase
 		}
+		// TODO: add unit tests for failing an allocation in each phase (pending, available, allocated)
 	}
 
 	// ------------------------------------------------------------------------
@@ -435,10 +531,9 @@ public class SlotPool implements SlotOwner {
 	 *
 	 * @param resourceID The id of the TaskManager
 	 */
-	public void registerResource(final ResourceID resourceID) {
-		synchronized (lock) {
-			registeredResources.add(resourceID);
-		}
+	@RpcMethod
+	public void registerTaskManager(final ResourceID resourceID) {
+		registeredTaskManagers.add(resourceID);
 	}
 
 	/**
@@ -447,12 +542,12 @@ public class SlotPool implements SlotOwner {
 	 *
 	 * @param resourceID The id of the TaskManager
 	 */
-	public void releaseResource(final ResourceID resourceID) {
-		synchronized (lock) {
-			registeredResources.remove(resourceID);
-			availableSlots.removeByResource(resourceID);
+	@RpcMethod
+	public void releaseTaskManager(final ResourceID resourceID) {
+		if (registeredTaskManagers.remove(resourceID)) {
+			availableSlots.removeAllForTaskManager(resourceID);
 
-			final Set<Slot> allocatedSlotsForResource = allocatedSlots.getSlotsByResource(resourceID);
+			final Set<Slot> allocatedSlotsForResource = allocatedSlots.removeSlotsForTaskManager(resourceID);
 			for (Slot slot : allocatedSlotsForResource) {
 				slot.releaseSlot();
 			}
@@ -460,24 +555,15 @@ public class SlotPool implements SlotOwner {
 	}
 
 	// ------------------------------------------------------------------------
-	//  ResourceManager
+	//  Utilities
 	// ------------------------------------------------------------------------
 
-	public void setResourceManager(
-		final UUID resourceManagerLeaderId,
-		final ResourceManagerGateway resourceManagerGateway)
-	{
-		synchronized (lock) {
-			this.resourceManagerLeaderId = resourceManagerLeaderId;
-			this.resourceManagerGateway = resourceManagerGateway;
-		}
-	}
-
-	public void disconnectResourceManager() {
-		synchronized (lock) {
-			this.resourceManagerLeaderId = null;
-			this.resourceManagerGateway = null;
+	private SimpleSlot createSimpleSlot(AllocatedSlot slot, Locality locality) {
+		SimpleSlot result = new SimpleSlot(slot, providerAndOwner, slot.getSlotNumber());
+		if (locality != null) {
+			result.setLocality(locality);
 		}
+		return result;
 	}
 
 	// ------------------------------------------------------------------------
@@ -487,45 +573,34 @@ public class SlotPool implements SlotOwner {
 	/**
 	 * Organize allocated slots from different points of view.
 	 */
-	static class AllocatedSlots {
+	private static class AllocatedSlots {
 
 		/** All allocated slots organized by TaskManager's id */
-		private final Map<ResourceID, Set<Slot>> allocatedSlotsByResource;
-
-		/** All allocated slots organized by Slot object */
-		private final Map<Slot, AllocationID> allocatedSlots;
-
-		/** All allocated slot descriptors organized by Slot object */
-		private final Map<Slot, SlotDescriptor> allocatedSlotsWithDescriptor;
+		private final Map<ResourceID, Set<Slot>> allocatedSlotsByTaskManager;
 
 		/** All allocated slots organized by AllocationID */
 		private final Map<AllocationID, Slot> allocatedSlotsById;
 
 		AllocatedSlots() {
-			this.allocatedSlotsByResource = new HashMap<>();
-			this.allocatedSlots = new HashMap<>();
-			this.allocatedSlotsWithDescriptor = new HashMap<>();
+			this.allocatedSlotsByTaskManager = new HashMap<>();
 			this.allocatedSlotsById = new HashMap<>();
 		}
 
 		/**
-		 * Add a new allocation
+		 * Adds a new slot to this collection.
 		 *
-		 * @param allocationID The allocation id
-		 * @param slot         The allocated slot
+		 * @param slot The allocated slot
 		 */
-		void add(final AllocationID allocationID, final SlotDescriptor descriptor, final Slot slot) {
-			allocatedSlots.put(slot, allocationID);
-			allocatedSlotsById.put(allocationID, slot);
-			allocatedSlotsWithDescriptor.put(slot, descriptor);
+		void add(Slot slot) {
+			allocatedSlotsById.put(slot.getAllocatedSlot().getSlotAllocationId(), slot);
 
 			final ResourceID resourceID = slot.getTaskManagerID();
-			Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID);
-			if (slotsForResource == null) {
-				slotsForResource = new HashSet<>();
-				allocatedSlotsByResource.put(resourceID, slotsForResource);
+			Set<Slot> slotsForTaskManager = allocatedSlotsByTaskManager.get(resourceID);
+			if (slotsForTaskManager == null) {
+				slotsForTaskManager = new HashSet<>();
+				allocatedSlotsByTaskManager.put(resourceID, slotsForTaskManager);
 			}
-			slotsForResource.add(slot);
+			slotsForTaskManager.add(slot);
 		}
 
 		/**
@@ -541,11 +616,11 @@ public class SlotPool implements SlotOwner {
 		/**
 		 * Check whether we have allocated this slot
 		 *
-		 * @param slot The slot needs to checked
+		 * @param slotAllocationId The allocation id of the slot to check
 		 * @return True if we contains this slot
 		 */
-		boolean contains(final Slot slot) {
-			return allocatedSlots.containsKey(slot);
+		boolean contains(AllocationID slotAllocationId) {
+			return allocatedSlotsById.containsKey(slotAllocationId);
 		}
 
 		/**
@@ -553,25 +628,27 @@ public class SlotPool implements SlotOwner {
 		 *
 		 * @param slot The slot needs to be removed
 		 */
-		SlotDescriptor remove(final Slot slot) {
-			final SlotDescriptor descriptor = allocatedSlotsWithDescriptor.remove(slot);
-			if (descriptor != null) {
-				final AllocationID allocationID = allocatedSlots.remove(slot);
-				if (allocationID != null) {
-					allocatedSlotsById.remove(allocationID);
-				} else {
-					throw new IllegalStateException("Bug: maps are inconsistent");
-				}
+		boolean remove(final Slot slot) {
+			return remove(slot.getAllocatedSlot().getSlotAllocationId()) != null;
+		}
 
-				final ResourceID resourceID = slot.getTaskManagerID();
-				final Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID);
-				slotsForResource.remove(slot);
-				if (slotsForResource.isEmpty()) {
-					allocatedSlotsByResource.remove(resourceID);
+		/**
+		 * Removes the slot with the given allocation ID from both indexes. The TaskManager's
+		 * entry is dropped once its last slot is removed. Returns null for unknown IDs.
+		 * @param slotId The ID of the slot to be removed
+		 */
+		Slot remove(final AllocationID slotId) {
+			Slot slot = allocatedSlotsById.remove(slotId);
+			if (slot != null) {
+				final ResourceID taskManagerId = slot.getTaskManagerID();
+				Set<Slot> slotsForTM = allocatedSlotsByTaskManager.get(taskManagerId);
+				slotsForTM.remove(slot);
+				if (slotsForTM.isEmpty()) {
+					allocatedSlotsByTaskManager.remove(taskManagerId);
 				}
-				
-				return descriptor;
-			} else {
+				return slot;
+			}
+			else {
 				return null;
 			}
 		}
@@ -582,119 +659,326 @@ public class SlotPool implements SlotOwner {
 		 * @param resourceID The id of the TaskManager
 		 * @return Set of slots which are allocated from the same TaskManager
 		 */
-		Set<Slot> getSlotsByResource(final ResourceID resourceID) {
-			Set<Slot> slotsForResource = allocatedSlotsByResource.get(resourceID);
-			if (slotsForResource != null) {
-				return new HashSet<>(slotsForResource);
+		Set<Slot> removeSlotsForTaskManager(final ResourceID resourceID) {
+			Set<Slot> slotsForTaskManager = allocatedSlotsByTaskManager.remove(resourceID);
+			if (slotsForTaskManager != null) {
+				for (Slot slot : slotsForTaskManager) {
+					allocatedSlotsById.remove(slot.getAllocatedSlot().getSlotAllocationId());
+				}
+				return slotsForTaskManager;
 			}
 			else {
-				return new HashSet<>();
+				return Collections.emptySet();
 			}
 		}
 
+		void clear() {
+			allocatedSlotsById.clear();
+			allocatedSlotsByTaskManager.clear();
+		}
+
 		@VisibleForTesting
 		boolean containResource(final ResourceID resourceID) {
-			return allocatedSlotsByResource.containsKey(resourceID);
+			return allocatedSlotsByTaskManager.containsKey(resourceID);
 		}
 
 		@VisibleForTesting
 		int size() {
-			return allocatedSlots.size();
+			return allocatedSlotsById.size();
 		}
 	}
 
+	// ------------------------------------------------------------------------
+
 	/**
 	 * Organize all available slots from different points of view.
 	 */
-	static class AvailableSlots {
+	private static class AvailableSlots {
 
 		/** All available slots organized by TaskManager */
-		private final Map<ResourceID, Set<SlotDescriptor>> availableSlotsByResource;
+		private final HashMap<ResourceID, Set<AllocatedSlot>> availableSlotsByTaskManager;
 
-		/** All available slots */
-		private final Set<SlotDescriptor> availableSlots;
+		/** All available slots organized by host */
+		private final HashMap<String, Set<AllocatedSlot>> availableSlotsByHost;
+
+		/** The available slots, with the time when they were inserted */
+		private final HashMap<AllocationID, SlotAndTimestamp> availableSlots;
 
 		AvailableSlots() {
-			this.availableSlotsByResource = new HashMap<>();
-			this.availableSlots = new HashSet<>();
+			this.availableSlotsByTaskManager = new HashMap<>();
+			this.availableSlotsByHost = new HashMap<>();
+			this.availableSlots = new HashMap<>();
 		}
 
 		/**
-		 * Add an available slot.
+		 * Adds an available slot.
 		 *
-		 * @param descriptor The descriptor of the slot
+		 * @param slot The slot to add
 		 */
-		void add(final SlotDescriptor descriptor) {
-			availableSlots.add(descriptor);
-
-			final ResourceID resourceID = descriptor.getTaskManagerLocation().getResourceID();
-			Set<SlotDescriptor> slotsForResource = availableSlotsByResource.get(resourceID);
-			if (slotsForResource == null) {
-				slotsForResource = new HashSet<>();
-				availableSlotsByResource.put(resourceID, slotsForResource);
+		void add(final AllocatedSlot slot, final long timestamp) {
+			checkNotNull(slot);
+
+			SlotAndTimestamp previous = availableSlots.put(
+					slot.getSlotAllocationId(), new SlotAndTimestamp(slot, timestamp));
+
+			if (previous == null) {
+				final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
+				final String host = slot.getTaskManagerLocation().getFQDNHostname();
+
+				Set<AllocatedSlot> slotsForTaskManager = availableSlotsByTaskManager.get(resourceID);
+				if (slotsForTaskManager == null) {
+					slotsForTaskManager = new HashSet<>();
+					availableSlotsByTaskManager.put(resourceID, slotsForTaskManager);
+				}
+				slotsForTaskManager.add(slot);
+
+				Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host);
+				if (slotsForHost == null) {
+					slotsForHost = new HashSet<>();
+					availableSlotsByHost.put(host, slotsForHost);
+				}
+				slotsForHost.add(slot);
+			}
+			else {
+				throw new IllegalStateException("slot already contained");
 			}
-			slotsForResource.add(descriptor);
 		}
 
 		/**
-		 * Check whether we have this slot
-		 *
-		 * @param slotDescriptor The descriptor of the slot
-		 * @return True if we contains this slot
+		 * Check whether we have this slot.
 		 */
-		boolean contains(final SlotDescriptor slotDescriptor) {
-			return availableSlots.contains(slotDescriptor);
+		boolean contains(AllocationID slotId) {
+			return availableSlots.containsKey(slotId);
 		}
 
 		/**
-		 * Poll a slot which matches the required resource profile
+		 * Poll a slot which matches the required resource profile. The polling tries to satisfy the
+		 * location preferences, by TaskManager and by host.
 		 *
-		 * @param resourceProfile The required resource profile
+		 * @param resourceProfile      The required resource profile.
+		 * @param locationPreferences  The location preferences, in order to be checked.
+		 * 
 		 * @return Slot which matches the resource profile, null if we can't find a match
 		 */
-		SlotDescriptor poll(final ResourceProfile resourceProfile) {
-			for (SlotDescriptor slotDescriptor : availableSlots) {
-				if (slotDescriptor.getResourceProfile().isMatching(resourceProfile)) {
-					remove(slotDescriptor);
-					return slotDescriptor;
+		SlotAndLocality poll(ResourceProfile resourceProfile, Iterable<TaskManagerLocation> locationPreferences) {
+			// fast path if no slots are available
+			if (availableSlots.isEmpty()) {
+				return null;
+			}
+
+			boolean hadLocationPreference = false;
+
+			if (locationPreferences != null) {
+
+				// first search by TaskManager
+				for (TaskManagerLocation location : locationPreferences) {
+					hadLocationPreference = true;
+
+					final Set<AllocatedSlot> onTaskManager = availableSlotsByTaskManager.get(location.getResourceID());
+					if (onTaskManager != null) {
+						for (AllocatedSlot candidate : onTaskManager) {
+							if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+								remove(candidate.getSlotAllocationId());
+								return new SlotAndLocality(candidate, Locality.LOCAL);
+							}
+						}
+					}
+				}
+
+				// now, search by host
+				for (TaskManagerLocation location : locationPreferences) {
+					final Set<AllocatedSlot> onHost = availableSlotsByHost.get(location.getFQDNHostname());
+					if (onHost != null) {
+						for (AllocatedSlot candidate : onHost) {
+							if (candidate.getResourceProfile().isMatching(resourceProfile)) {
+								remove(candidate.getSlotAllocationId());
+								return new SlotAndLocality(candidate, Locality.HOST_LOCAL);
+							}
+						}
+					}
 				}
 			}
+
+			// take any slot
+			for (SlotAndTimestamp candidate : availableSlots.values()) {
+				final AllocatedSlot slot = candidate.slot();
+
+				if (slot.getResourceProfile().isMatching(resourceProfile)) {
+					remove(slot.getSlotAllocationId());
+					return new SlotAndLocality(
+							slot, hadLocationPreference ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
+				}
+			}
+
+			// nothing available that matches
 			return null;
 		}
 
 		/**
 		 * Remove all available slots come from specified TaskManager.
 		 *
-		 * @param resourceID The id of the TaskManager
+		 * @param taskManager The id of the TaskManager
 		 */
-		void removeByResource(final ResourceID resourceID) {
-			final Set<SlotDescriptor> slotsForResource = availableSlotsByResource.remove(resourceID);
-			if (slotsForResource != null) {
-				for (SlotDescriptor slotDescriptor : slotsForResource) {
-					availableSlots.remove(slotDescriptor);
+		void removeAllForTaskManager(final ResourceID taskManager) {
+			// remove from the by-TaskManager view
+			final Set<AllocatedSlot> slotsForTm = availableSlotsByTaskManager.remove(taskManager);
+
+			if (slotsForTm != null && slotsForTm.size() > 0) {
+				final String host = slotsForTm.iterator().next().getTaskManagerLocation().getFQDNHostname();
+				final Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host);
+
+				// remove from the base set and the by-host view
+				for (AllocatedSlot slot : slotsForTm) {
+					availableSlots.remove(slot.getSlotAllocationId());
+					slotsForHost.remove(slot);
+				}
+
+				if (slotsForHost.isEmpty()) {
+					availableSlotsByHost.remove(host);
 				}
 			}
 		}
 
-		private void remove(final SlotDescriptor slotDescriptor) {
-			availableSlots.remove(slotDescriptor);
+		boolean tryRemove(AllocationID slotId) {
+			final SlotAndTimestamp sat = availableSlots.remove(slotId);
+			if (sat != null) {
+				final AllocatedSlot slot = sat.slot();
+				final ResourceID resourceID = slot.getTaskManagerLocation().getResourceID();
+				final String host = slot.getTaskManagerLocation().getFQDNHostname();
+
+				final Set<AllocatedSlot> slotsForTm = availableSlotsByTaskManager.get(resourceID);
+				final Set<AllocatedSlot> slotsForHost = availableSlotsByHost.get(host);
+
+				slotsForTm.remove(slot);
+				slotsForHost.remove(slot);
+
+				if (slotsForTm.isEmpty()) {
+					availableSlotsByTaskManager.remove(resourceID);
+				}
+				if (slotsForHost.isEmpty()) {
+					availableSlotsByHost.remove(host);
+				}
+
+				return true;
+			}
+			else {
+				return false;
+			}
+		}
 
-			final ResourceID resourceID = slotDescriptor.getTaskManagerLocation().getResourceID();
-			final Set<SlotDescriptor> slotsForResource = checkNotNull(availableSlotsByResource.get(resourceID));
-			slotsForResource.remove(slotDescriptor);
-			if (slotsForResource.isEmpty()) {
-				availableSlotsByResource.remove(resourceID);
+		private void remove(AllocationID slotId) throws IllegalStateException {
+			if (!tryRemove(slotId)) {
+				throw new IllegalStateException("slot not contained");
 			}
 		}
 
 		@VisibleForTesting
-		boolean containResource(final ResourceID resourceID) {
-			return availableSlotsByResource.containsKey(resourceID);
+		boolean containsTaskManager(ResourceID resourceID) {
+			return availableSlotsByTaskManager.containsKey(resourceID);
 		}
 
 		@VisibleForTesting
 		int size() {
 			return availableSlots.size();
 		}
+
+		@VisibleForTesting
+		void clear() {
+			availableSlots.clear();
+			availableSlotsByTaskManager.clear();
+			availableSlotsByHost.clear();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * An implementation of the {@link SlotOwner} and {@link SlotProvider} interfaces
+	 * that delegates methods as RPC calls to the SlotPool's RPC gateway.
+	 */
+	private static class ProviderAndOwner implements SlotOwner, SlotProvider {
+
+		private final SlotPoolGateway gateway;
+
+		private final Time timeout;
+
+		ProviderAndOwner(SlotPoolGateway gateway, Time timeout) {
+			this.gateway = gateway;
+			this.timeout = timeout;
+		}
+
+		@Override
+		public boolean returnAllocatedSlot(Slot slot) {
+			gateway.returnAllocatedSlot(slot);
+			return true;
+		}
+
+		@Override
+		public Future<SimpleSlot> allocateSlot(ScheduledUnit task, boolean allowQueued) {
+			return gateway.allocateSlot(
+					task, ResourceProfile.UNKNOWN, Collections.<TaskManagerLocation>emptyList(), timeout);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A pending request for a slot
+	 */
+	private static class PendingRequest {
+
+		private final AllocationID allocationID;
+
+		private final FlinkCompletableFuture<SimpleSlot> future;
+
+		private final ResourceProfile resourceProfile;
+
+		PendingRequest(
+				AllocationID allocationID,
+				FlinkCompletableFuture<SimpleSlot> future,
+				ResourceProfile resourceProfile) {
+			this.allocationID = allocationID;
+			this.future = future;
+			this.resourceProfile = resourceProfile;
+		}
+
+		public AllocationID allocationID() {
+			return allocationID;
+		}
+
+		public FlinkCompletableFuture<SimpleSlot> future() {
+			return future;
+		}
+
+		public ResourceProfile resourceProfile() {
+			return resourceProfile;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * A slot, together with the timestamp when it was added
+	 */
+	private static class SlotAndTimestamp {
+
+		private final AllocatedSlot slot;
+
+		private final long timestamp;
+
+		SlotAndTimestamp(
+				AllocatedSlot slot,
+				long timestamp) {
+			this.slot = slot;
+			this.timestamp = timestamp;
+		}
+
+		public AllocatedSlot slot() {
+			return slot;
+		}
+
+		public long timestamp() {
+			return timestamp;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2731a0/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
new file mode 100644
index 0000000..42942ca
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.UUID;
+
+/**
+ * The gateway for calls on the {@link SlotPool}. 
+ */
+public interface SlotPoolGateway extends RpcGateway {
+
+	// ------------------------------------------------------------------------
+	//  shutdown
+	// ------------------------------------------------------------------------
+
+	void suspend();
+
+	// ------------------------------------------------------------------------
+	//  resource manager connection
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Connects the SlotPool to the given ResourceManager. After this method is called, the
+	 * SlotPool will be able to request resources from the given ResourceManager.
+	 * 
+	 * @param resourceManagerLeaderId The leader session ID of the resource manager.
+	 * @param resourceManagerGateway  The RPC gateway for the resource manager.
+	 */
+	void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManagerGateway resourceManagerGateway);
+
+	/**
+	 * Disconnects the slot pool from its current Resource Manager. After this call, the pool will not
+	 * be able to request further slots from the Resource Manager, and all currently pending requests
+	 * to the resource manager will be canceled.
+	 * 
+	 * <p>The slot pool will still be able to serve slots from its internal pool.
+	 */
+	void disconnectResourceManager();
+
+	// ------------------------------------------------------------------------
+	//  registering / un-registering TaskManagers and slots
+	// ------------------------------------------------------------------------
+
+	void registerTaskManager(ResourceID resourceID);
+
+	void releaseTaskManager(ResourceID resourceID);
+
+	Future<Boolean> offerSlot(AllocatedSlot slot);
+
+	Future<Iterable<SlotOffer>> offerSlots(Iterable<Tuple2<AllocatedSlot, SlotOffer>> offers);
+	
+	void failAllocation(AllocationID allocationID, Exception cause);
+
+	// ------------------------------------------------------------------------
+	//  allocating and disposing slots
+	// ------------------------------------------------------------------------
+
+	Future<SimpleSlot> allocateSlot(
+			ScheduledUnit task,
+			ResourceProfile resources,
+			Iterable<TaskManagerLocation> locationPreferences,
+			@RpcTimeout Time timeout);
+
+	void returnAllocatedSlot(Slot slot);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2731a0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java
index ec6e9b1..0ef2482 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java
@@ -19,19 +19,19 @@
 package org.apache.flink.runtime.jobmanager.scheduler;
 
 public enum Locality {
-	
-	/**
-	 * No constraint existed on the task placement.
-	 */
+
+	/** No constraint existed on the task placement. */
 	UNCONSTRAINED,
-	
-	/**
-	 * The task was scheduled respecting its locality preferences.
-	 */
+
+	/** The task was scheduled into the same TaskManager as requested */
 	LOCAL,
-	
-	/**
-	 * The task was scheduled to a destination not included in its locality preferences.
-	 */
-	NON_LOCAL
+
+	/** The task was scheduled onto the same host as requested */
+	HOST_LOCAL,
+
+	/** The task was scheduled to a destination not included in its locality preferences. */
+	NON_LOCAL,
+
+	/** No locality information was provided, it is unknown if the locality was respected */
+	UNKNOWN
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2731a0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index e45747b..546f31f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -54,7 +54,11 @@ public class NoResourceAvailableException extends JobException {
 	public NoResourceAvailableException(String message) {
 		super(message);
 	}
-	
+
+	public NoResourceAvailableException(String message, Throwable cause) {
+		super(message, cause);
+	}
+
 	// --------------------------------------------------------------------------------------------
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2731a0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
index 9419ab4..b44128b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.jobmanager.slots;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -54,6 +56,9 @@ public class AllocatedSlot {
 	/** TEMP until the new RPC is in place: The actor gateway to communicate with the TaskManager */
 	private final TaskManagerGateway taskManagerGateway;
 
+	/** RPC gateway to call the TaskManager that holds this slot */
+	private final TaskExecutorGateway taskManagerGateway;
+
 	/** The number of the slot on the TaskManager to which slot belongs. Purely informational. */
 	private final int slotNumber;
 
@@ -73,15 +78,23 @@ public class AllocatedSlot {
 		this.slotNumber = slotNumber;
 		this.resourceProfile = checkNotNull(resourceProfile);
 		this.taskManagerGateway = checkNotNull(taskManagerGateway);
+		this.taskManagerGateway = null;
 	}
 
-	public AllocatedSlot(AllocatedSlot other) {
-		this.slotAllocationId = other.slotAllocationId;
-		this.jobID = other.jobID;
-		this.taskManagerLocation = other.taskManagerLocation;
-		this.slotNumber = other.slotNumber;
-		this.resourceProfile = other.resourceProfile;
-		this.taskManagerGateway = other.taskManagerGateway;
+	public AllocatedSlot(
+			AllocationID slotAllocationId,
+			JobID jobID,
+			TaskManagerLocation location,
+			int slotNumber,
+			ResourceProfile resourceProfile,		
+			TaskManagerGateway taskManagerGateway) {
+		this.slotAllocationId = checkNotNull(slotAllocationId);
+		this.jobID = checkNotNull(jobID);
+		this.taskManagerLocation = checkNotNull(location);
+		this.slotNumber = slotNumber;
+		this.resourceProfile = checkNotNull(resourceProfile);
+		this.taskManagerActorGateway = null;
+		this.taskManagerGateway = checkNotNull(taskManagerGateway);
 	}
 
 	// ------------------------------------------------------------------------
@@ -96,6 +109,17 @@ public class AllocatedSlot {
 	}
 
 	/**
+	 * Gets the ID of the TaskManager on which this slot was allocated.
+	 * 
+	 * <p>This is equivalent to {@code getTaskManagerLocation().getResourceID()}.
+	 * 
+	 * @return This slot's TaskManager's ID.
+	 */
+	public ResourceID getTaskManagerId() {
+		return getTaskManagerLocation().getResourceID();
+	}
+
+	/**
 	 * Returns the ID of the job this allocated slot belongs to.
 	 *
 	 * @return the ID of the job this allocated slot belongs to
@@ -139,11 +163,27 @@ public class AllocatedSlot {
 	 * @return The actor gateway that can be used to send messages to the TaskManager.
 	 */
 	public TaskManagerGateway getTaskManagerGateway() {
-		return taskManagerGateway;
+		return taskManagerGateway;
 	}
 
 	// ------------------------------------------------------------------------
 
+	/**
+	 * This always returns a reference hash code.
+	 */
+	@Override
+	public final int hashCode() {
+		return super.hashCode();
+	}
+
+	/**
+	 * This always checks based on reference equality.
+	 */
+	@Override
+	public final boolean equals(Object obj) {
+		return this == obj;
+	}
+
 	@Override
 	public String toString() {
 		return "AllocatedSlot " + slotAllocationId + " @ " + taskManagerLocation + " - " + slotNumber;

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2731a0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
deleted file mode 100644
index 5655fc2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.slots;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotPool;
-import org.apache.flink.runtime.instance.SlotProvider;
-import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A simple pool based slot provider with {@link SlotPool} as the underlying storage.
- */
-public class PooledSlotProvider implements SlotProvider {
-
-	/** The pool which holds all the slots. */
-	private final SlotPool slotPool;
-
-	/** The timeout for allocation. */
-	private final Time timeout;
-
-	public PooledSlotProvider(final SlotPool slotPool, final Time timeout) {
-		this.slotPool = slotPool;
-		this.timeout = timeout;
-	}
-
-	@Override
-	public Future<SimpleSlot> allocateSlot(ScheduledUnit task,
-			boolean allowQueued) throws NoResourceAvailableException
-	{
-		checkNotNull(task);
-
-		final JobID jobID = task.getTaskToExecute().getVertex().getJobId();
-		final Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, ResourceProfile.UNKNOWN);
-		try {
-			final SimpleSlot slot = future.get(timeout.getSize(), timeout.getUnit());
-			return FlinkCompletableFuture.completed(slot);
-		} catch (InterruptedException e) {
-			throw new NoResourceAvailableException("Could not allocate a slot because it's interrupted.");
-		} catch (ExecutionException e) {
-			throw new NoResourceAvailableException("Could not allocate a slot because some error occurred " +
-					"during allocation, " + e.getMessage());
-		} catch (TimeoutException e) {
-			throw new NoResourceAvailableException("Could not allocate a slot within time limit: " + timeout);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2731a0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
new file mode 100644
index 0000000..3fe5346
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A combination of a {@link AllocatedSlot} and a {@link Locality}.
+ */
+public class SlotAndLocality {
+
+	private final AllocatedSlot slot;
+
+	private final Locality locality;
+
+	public SlotAndLocality(AllocatedSlot slot, Locality locality) {
+		this.slot = checkNotNull(slot);
+		this.locality = checkNotNull(locality);
+	}
+
+	// ------------------------------------------------------------------------
+
+	public AllocatedSlot slot() {
+		return slot;
+	}
+
+	public Locality locality() {
+		return locality;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return "Slot: " + slot + " (" + locality + ')';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2731a0/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index a70cf21..c82ac01 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -53,8 +53,8 @@ import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.instance.Slot;
-import org.apache.flink.runtime.instance.SlotDescriptor;
 import org.apache.flink.runtime.instance.SlotPool;
+import org.apache.flink.runtime.instance.SlotPoolGateway;
 import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -62,7 +62,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
-import org.apache.flink.runtime.jobmanager.slots.PooledSlotProvider;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -96,15 +96,13 @@ import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -161,7 +159,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	private final SlotPool slotPool;
 
-	private final Time allocationTimeout;
+	private final SlotPoolGateway slotPoolGateway;
 
 	private volatile UUID leaderSessionID;
 
@@ -249,8 +247,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		// register self as job status change listener
 		executionGraph.registerJobStatusListener(new JobManagerJobStatusListener());
 
-		this.slotPool = new SlotPool(executorService);
-		this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
+		this.slotPool = new SlotPool(rpcService, jobGraph.getJobID());
+		this.slotPoolGateway = slotPool.getSelf();
 
 		this.registeredTaskManagers = new HashMap<>(4);
 	}
@@ -273,9 +271,6 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	public void start(final UUID leaderSessionID) throws Exception {
 		if (LEADER_ID_UPDATER.compareAndSet(this, null, leaderSessionID)) {
 
-			// make sure the slot pool now accepts messages for this leader  
-			slotPool.setJobManagerLeaderId(leaderSessionID);
-
 			// make sure we receive RPC and async calls
 			super.start();
 
@@ -305,8 +300,18 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 
 	@RpcMethod
 	public void startJobExecution() {
+		// double check that the leader status did not change
+		if (leaderSessionID == null) {
+			log.info("Aborting job startup - JobManager lost leader status");
+			return;
+		}
+
 		log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());
 
+		// start the slot pool and make sure it now accepts messages for this leader
+		log.debug("Staring SlotPool component");
+		slotPool.start(leaderSessionID);
+
 		try {
 			// job is ready to go, try to establish connection with resource manager
 			//   - activate leader retrieval for the resource manager
@@ -328,7 +333,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			@Override
 			public void run() {
 				try {
-					executionGraph.scheduleForExecution(new PooledSlotProvider(slotPool, allocationTimeout));
+					executionGraph.scheduleForExecution(slotPool.getSlotProvider());
 				}
 				catch (Throwable t) {
 					executionGraph.fail(t);
@@ -353,27 +358,26 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			return;
 		}
 
-		// receive no more messages until started again, should be called before we clear self leader id
-		((StartStoppable) getSelf()).stop();
-
+		// not leader any more - should not accept any leader messages any more
 		leaderSessionID = null;
-		slotPool.setJobManagerLeaderId(null);
-		executionGraph.suspend(cause);
 
-		// disconnect from resource manager:
 		try {
 			resourceManagerLeaderRetriever.stop();
-		} catch (Exception e) {
-			log.warn("Failed to stop resource manager leader retriever when suspending.", e);
+		} catch (Throwable t) {
+			log.warn("Failed to stop resource manager leader retriever when suspending.", t);
 		}
-		closeResourceManagerConnection();
 
-		// TODO: in the future, the slot pool should not release the resources, so that
-		// TODO: the TaskManagers offer the resources to the new leader 
-		for (ResourceID taskManagerId : registeredTaskManagers.keySet()) {
-			slotPool.releaseResource(taskManagerId);
-		}
-		registeredTaskManagers.clear();
+		// tell the execution graph (JobManager is still processing messages here) 
+		executionGraph.suspend(cause);
+
+		// receive no more messages until started again (the leader id was already cleared above)
+		((StartStoppable) getSelf()).stop();
+
+		// the slot pool stops receiving messages and clears its pooled slots 
+		slotPoolGateway.suspend();
+
+		// disconnect from resource manager:
+		closeResourceManagerConnection();
 	}
 
 	//----------------------------------------------------------------------------------------------
@@ -629,9 +633,11 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 	}
 
 	@RpcMethod
-	public Iterable<SlotOffer> offerSlots(final ResourceID taskManagerId,
-			final Iterable<SlotOffer> slots, final UUID leaderId) throws Exception
-	{
+	public Future<Iterable<SlotOffer>> offerSlots(
+			final ResourceID taskManagerId,
+			final Iterable<SlotOffer> slots,
+			final UUID leaderId) throws Exception {
+
 		validateLeaderSessionId(leaderSessionID);
 
 		Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);
@@ -639,20 +645,22 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			throw new Exception("Unknown TaskManager " + taskManagerId);
 		}
 
-		final Set<SlotOffer> acceptedSlotOffers = new HashSet<>(4);
+		final JobID jid = jobGraph.getJobID();
+		final TaskManagerLocation taskManagerLocation = taskManager.f0;
+		final TaskExecutorGateway taskManagerGateway = taskManager.f1;
+
+		final ArrayList<Tuple2<AllocatedSlot, SlotOffer>> slotsAndOffers = new ArrayList<>();
+
 		for (SlotOffer slotOffer : slots) {
-			final SlotDescriptor slotDescriptor = new SlotDescriptor(
-					jobGraph.getJobID(),
-					taskManager.f0,
-					slotOffer.getSlotIndex(),
-					slotOffer.getResourceProfile(),
-					null); // TODO: replace the actor gateway with the new rpc gateway, it's ready (taskManager.f1)
-			if (slotPool.offerSlot(slotOffer.getAllocationId(), slotDescriptor)) {
-				acceptedSlotOffers.add(slotOffer);
-			}
+			final AllocatedSlot slot = new AllocatedSlot(
+					slotOffer.getAllocationId(), jid, taskManagerLocation,
+					slotOffer.getSlotIndex(), slotOffer.getResourceProfile(),
+					taskManagerGateway);
+
+			slotsAndOffers.add(new Tuple2<>(slot, slotOffer));
 		}
 
-		return acceptedSlotOffers;
+		return slotPoolGateway.offerSlots(slotsAndOffers);
 	}
 
 	@RpcMethod
@@ -667,7 +675,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			throw new Exception("Unknown TaskManager " + taskManagerId);
 		}
 
-		slotPool.failAllocation(allocationId, cause);
+		slotPoolGateway.failAllocation(allocationId, cause);
 	}
 
 	@RpcMethod
@@ -713,7 +721,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 						return new RegistrationResponse.Decline("Invalid leader session id");
 					}
 
-					slotPool.registerResource(taskManagerId);
+					slotPoolGateway.registerTaskManager(taskManagerId);
 					registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
 					return new JMTMRegistrationSuccess(taskManagerId, libraryCacheManager.getBlobServerPort());
 				}
@@ -845,7 +853,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			log.info("JobManager successfully registered at ResourceManager, leader id: {}.",
 					success.getResourceManagerLeaderId());
 
-			slotPool.setResourceManager(
+			slotPoolGateway.connectToResourceManager(
 					success.getResourceManagerLeaderId(), resourceManagerConnection.getTargetGateway());
 		}
 	}
@@ -857,7 +865,8 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 			resourceManagerConnection.close();
 			resourceManagerConnection = null;
 		}
-		slotPool.disconnectResourceManager();
+
+		slotPoolGateway.disconnectResourceManager();
 	}
 
 	private void validateLeaderSessionId(UUID leaderSessionID) throws LeaderIdMismatchException {

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2731a0/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index b971b96..f30e345 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -25,6 +25,7 @@ import org.apache.flink.util.ReflectionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -207,6 +208,17 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	 * @param runnable Runnable to be executed
 	 * @param delay    The delay after which the runnable will be executed
 	 */
+	protected void scheduleRunAsync(Runnable runnable, Time delay) {
+		scheduleRunAsync(runnable, delay.getSize(), delay.getUnit());
+	}
+
+	/**
+	 * Execute the runnable in the main thread of the underlying RPC endpoint after the given delay.
+	 *
+	 * @param runnable Runnable to be executed
+	 * @param delay    The delay after which the runnable will be executed
+	 * @param unit     The time unit of the delay
+	 */
 	protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit unit) {
 		((MainThreadExecutable) self).scheduleRunAsync(runnable, unit.toMillis(delay));
 	}
@@ -255,7 +267,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	/**
 	 * Executor which executes runnables in the main thread context.
 	 */
-	private class MainThreadExecutor implements Executor {
+	private static class MainThreadExecutor implements Executor {
 
 		private final MainThreadExecutable gateway;
 
@@ -264,7 +276,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 		}
 
 		@Override
-		public void execute(Runnable runnable) {
+		public void execute(@Nonnull Runnable runnable) {
 			gateway.runAsync(runnable);
 		}
 	}
@@ -277,7 +289,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 	private Class<C> determineSelfGatewayType() {
 
 		// determine self gateway type
-		Class c = getClass();
+		Class<?> c = getClass();
 		Class<C> determinedSelfGatewayType;
 		do {
 			determinedSelfGatewayType = ReflectionUtil.getTemplateType1(c);


Mime
View raw message