flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [2/2] flink git commit: [FLINK-4347][FLINK-4348] simplify SlotManager and integrate it with ResourceManager
Date Fri, 07 Oct 2016 14:00:43 GMT
[FLINK-4347][FLINK-4348] simplify SlotManager and integrate it with ResourceManager

Instead of relying on a full synchronization of all slot information on
every heartbeat, the SlotManager is now responsible for updating its
state. It initially syncs all slots upon registration of the
TaskExecutor. After that, it only receives notifications from the
TaskExecutor when slots become available again. This simplifies the
logic of the SlotManager and makes the slot allocation more predictable
in case of message loss.

Additional changes:

- Move the slot registration and allocation report to the registration
  of the TaskExecutor

- Let the TaskExecutor immediately notify the ResourceManager once a
  slot becomes free. The ResourceManager has to confirm this
  notification. Otherwise, the slot will be blocked because the
  ResourceManager's state is not in sync.

- Integrate with handleSlotRequestFailedAtTaskManager and introduce
  fencing to protect against TaskExecutors which are not registered
  anymore.

- introduce RPC call to notify ResourceManager about free slots

- ignore out-of-date slot requests from ResourceManager at TaskExecutor

- let the ResourceManager update its state instead of relying on heartbeats

- provide ResourceManagerServices to SlotManager

- introduce factory for SlotManager

- keep task gateways and worker information in ResourceManager and
  inform SlotManager

- add TaskExecutor test to ensure that a free slot which hasn't been
  confirmed by the task executor is correctly blacklisted as long as the
  ResourceManager has not confirmed the allocation removal.

- adapt tests

- update javadocs

This closes #2571.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0518af03
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0518af03
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0518af03

Branch: refs/heads/flip-6
Commit: 0518af03a8931ed9b2da013c5ed4da7065156ee0
Parents: 85424c1
Author: Maximilian Michels <mxm@apache.org>
Authored: Thu Sep 29 15:08:32 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Fri Oct 7 15:51:14 2016 +0200

----------------------------------------------------------------------
 .../clusterframework/types/ResourceSlot.java    |  12 +-
 .../resourcemanager/ResourceManager.java        | 115 +++++--
 .../resourcemanager/ResourceManagerGateway.java |  31 +-
 .../ResourceManagerServices.java                |   2 +-
 .../resourcemanager/SlotRequestRegistered.java  |  33 --
 .../resourcemanager/SlotRequestRejected.java    |  34 --
 .../resourcemanager/SlotRequestReply.java       |  41 ---
 .../StandaloneResourceManager.java              |   8 +-
 .../jobmanager/RMSlotRequestRegistered.java     |  33 ++
 .../jobmanager/RMSlotRequestRejected.java       |  34 ++
 .../messages/jobmanager/RMSlotRequestReply.java |  41 +++
 .../taskexecutor/SlotAvailableReply.java        |  47 +++
 .../taskexecutor/TMSlotRequestRegistered.java   |  35 ++
 .../taskexecutor/TMSlotRequestRejected.java     |  35 ++
 .../taskexecutor/TMSlotRequestReply.java        |  58 ++++
 .../registration/TaskExecutorRegistration.java  |  12 +-
 .../registration/WorkerRegistration.java        |  42 +++
 .../slotmanager/SimpleSlotManager.java          |  53 ---
 .../slotmanager/SlotManager.java                | 326 +++++++------------
 .../slotmanager/SlotManagerFactory.java         |  31 ++
 .../flink/runtime/taskexecutor/SlotReport.java  |  19 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  37 ++-
 .../taskexecutor/TaskExecutorGateway.java       |   7 +-
 ...TaskExecutorToResourceManagerConnection.java |   2 +-
 .../resourcemanager/ResourceManagerHATest.java  |  12 +-
 .../ResourceManagerJobMasterTest.java           |   4 +-
 .../ResourceManagerTaskExecutorTest.java        |  53 +--
 .../resourcemanager/TestingResourceManager.java |  53 +++
 .../resourcemanager/TestingSlotManager.java     |  78 +++++
 .../slotmanager/SlotManagerTest.java            | 239 ++++++--------
 .../slotmanager/SlotProtocolTest.java           |  92 ++++--
 .../runtime/taskexecutor/TaskExecutorTest.java  |  96 +++++-
 32 files changed, 1087 insertions(+), 628 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
index 4a91a79..0b9367d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceSlot.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -35,12 +35,12 @@ public class ResourceSlot implements ResourceIDRetrievable {
 	private final ResourceProfile resourceProfile;
 
 	/** Gateway to the TaskExecutor which owns the slot */
-	private final TaskExecutorGateway taskExecutorGateway;
+	private final TaskExecutorRegistration taskExecutorRegistration;
 
-	public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorGateway taskExecutorGateway) {
+	public ResourceSlot(SlotID slotId, ResourceProfile resourceProfile, TaskExecutorRegistration taskExecutorRegistration) {
 		this.slotId = checkNotNull(slotId);
 		this.resourceProfile = checkNotNull(resourceProfile);
-		this.taskExecutorGateway = taskExecutorGateway;
+		this.taskExecutorRegistration = checkNotNull(taskExecutorRegistration);
 	}
 
 	@Override
@@ -56,8 +56,8 @@ public class ResourceSlot implements ResourceIDRetrievable {
 		return resourceProfile;
 	}
 
-	public TaskExecutorGateway getTaskExecutorGateway() {
-		return taskExecutorGateway;
+	public TaskExecutorRegistration getTaskExecutorRegistration() {
+		return taskExecutorRegistration;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index f45afa3..d2d00cf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -24,8 +24,8 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
@@ -37,7 +37,12 @@ import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
+import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply;
+import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -45,12 +50,14 @@ import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 
+import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.util.LeaderConnectionInfo;
 import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -70,41 +77,54 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *     <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
-public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
+public abstract class ResourceManager<WorkerType extends Serializable>
 		extends RpcEndpoint<ResourceManagerGateway>
 		implements LeaderContender {
 
-	/** The exit code with which the process is stopped in case of a fatal error */
+	/** The exit code with which the process is stopped in case of a fatal error. */
 	protected static final int EXIT_CODE_FATAL_ERROR = -13;
 
+	/** All currently registered JobMasterGateways scoped by JobID. */
 	private final Map<JobID, JobMasterGateway> jobMasterGateways;
 
+	/** LeaderListeners for all registered JobMasters. */
 	private final Map<JobID, JobMasterLeaderListener> jobMasterLeaderRetrievalListeners;
 
-	private final Map<ResourceID, WorkerType> taskExecutorGateways;
+	/** All currently registered TaskExecutors with their framework-specific worker information. */
+	private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;
 
+	/** High availability services for leader retrieval and election. */
 	private final HighAvailabilityServices highAvailabilityServices;
 
-	private final SlotManager slotManager;
+	/** The factory to construct the SlotManager. */
+	private final SlotManagerFactory slotManagerFactory;
 
+	/** The SlotManager created by the slotManagerFactory when the ResourceManager is started. */
+	private SlotManager slotManager;
+
+	/** The service to elect a ResourceManager leader. */
 	private LeaderElectionService leaderElectionService;
 
+	/** ResourceManager's leader session id which is updated on leader election. */
 	private UUID leaderSessionID;
 
+	/** All registered listeners for status updates of the ResourceManager. */
 	private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;
 
+	/** Default timeout for messages */
 	private final Time timeout = Time.seconds(5);
 
 	public ResourceManager(
 			RpcService rpcService,
 			HighAvailabilityServices highAvailabilityServices,
-			SlotManager slotManager) {
+			SlotManagerFactory slotManagerFactory) {
 		super(rpcService);
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
+		this.slotManagerFactory = checkNotNull(slotManagerFactory);
 		this.jobMasterGateways = new HashMap<>();
-		this.slotManager = checkNotNull(slotManager);
 		this.jobMasterLeaderRetrievalListeners = new HashMap<>();
-		this.taskExecutorGateways = new HashMap<>();
+		this.taskExecutors = new HashMap<>();
+		this.leaderSessionID = new UUID(0, 0);
 		infoMessageListeners = new HashMap<>();
 	}
 
@@ -113,9 +133,10 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		// start a leader
 		try {
 			super.start();
+			// SlotManager should start first
+			slotManager = slotManagerFactory.create(createResourceManagerServices());
 			leaderElectionService = highAvailabilityServices.getResourceManagerLeaderElectionService();
 			leaderElectionService.start(this);
-			slotManager.setupResourceManagerServices(new DefaultResourceManagerServices());
 			// framework specific initialization
 			initialize();
 		} catch (Throwable e) {
@@ -196,7 +217,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 			.handleAsync(new BiFunction<JobMasterGateway, Throwable, RegistrationResponse>() {
 				@Override
 				public RegistrationResponse apply(JobMasterGateway jobMasterGateway, Throwable throwable) {
-					
+
 					if (throwable != null) {
 						return new RegistrationResponse.Decline(throwable.getMessage());
 					} else {
@@ -234,7 +255,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	public Future<RegistrationResponse> registerTaskExecutor(
 		final UUID resourceManagerLeaderId,
 		final String taskExecutorAddress,
-		final ResourceID resourceID) {
+		final ResourceID resourceID,
+		final SlotReport slotReport) {
 
 		return getRpcService().execute(new Callable<TaskExecutorGateway>() {
 			@Override
@@ -245,7 +267,8 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 						resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
 					throw new Exception("Invalid leader session id");
 				}
-				return getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class).get(5, TimeUnit.SECONDS);
+				return getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class)
+					.get(timeout.toMilliseconds(), timeout.getUnit());
 			}
 		}).handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
 			@Override
@@ -253,14 +276,17 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 				if (throwable != null) {
 					return new RegistrationResponse.Decline(throwable.getMessage());
 				} else {
-					WorkerType oldWorker = taskExecutorGateways.remove(resourceID);
-					if (oldWorker != null) {
+					WorkerRegistration oldRegistration = taskExecutors.remove(resourceID);
+					if (oldRegistration != null) {
 						// TODO :: suggest old taskExecutor to stop itself
-						slotManager.notifyTaskManagerFailure(resourceID);
+						log.info("Replacing old instance of worker for ResourceID {}", resourceID);
 					}
 					WorkerType newWorker = workerStarted(resourceID);
-					taskExecutorGateways.put(resourceID, newWorker);
-					return new TaskExecutorRegistrationSuccess(new InstanceID(), 5000);
+					WorkerRegistration<WorkerType> registration =
+						new WorkerRegistration<>(taskExecutorGateway, newWorker);
+					taskExecutors.put(resourceID, registration);
+					slotManager.registerTaskExecutor(resourceID, registration, slotReport);
+					return new TaskExecutorRegistrationSuccess(registration.getInstanceID(), 5000);
 				}
 			}
 		}, getMainThreadExecutor());
@@ -273,7 +299,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	 * @return Slot assignment
 	 */
 	@RpcMethod
-	public SlotRequestReply requestSlot(
+	public RMSlotRequestReply requestSlot(
 			UUID jobMasterLeaderID,
 			UUID resourceManagerLeaderID,
 			SlotRequest slotRequest) {
@@ -290,8 +316,41 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 			return slotManager.requestSlot(slotRequest);
 		} else {
 			log.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
-			return new SlotRequestRejected(slotRequest.getAllocationId());
+			return new RMSlotRequestRejected(slotRequest.getAllocationId());
+		}
+	}
+
+	/**
+	 * Notification from a TaskExecutor that a slot has become available
+	 * @param resourceManagerLeaderId TaskExecutor's resource manager leader id
+	 * @param resourceID TaskExecutor's resource id
+	 * @param instanceID TaskExecutor's instance id
+	 * @param slotID The slot id of the available slot
+	 * @return SlotAvailableReply
+	 */
+	@RpcMethod
+	public SlotAvailableReply notifySlotAvailable(
+			final UUID resourceManagerLeaderId,
+			final ResourceID resourceID,
+			final InstanceID instanceID,
+			final SlotID slotID) {
+
+		if (resourceManagerLeaderId.equals(leaderSessionID)) {
+			WorkerRegistration<WorkerType> registration = taskExecutors.get(resourceID);
+			if (registration != null) {
+				InstanceID registrationInstanceID = registration.getInstanceID();
+				if (registrationInstanceID.equals(instanceID)) {
+					runAsync(new Runnable() {
+						@Override
+						public void run() {
+							slotManager.notifySlotAvailable(resourceID, slotID);
+						}
+					});
+					return new SlotAvailableReply(leaderSessionID, slotID);
+				}
+			}
 		}
+		return null;
 	}
 
 
@@ -329,9 +388,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 			public void run() {
 				log.info("ResourceManager {} was revoked leadership.", getAddress());
 				jobMasterGateways.clear();
-				taskExecutorGateways.clear();
+				taskExecutors.clear();
 				slotManager.clearState();
-				leaderSessionID = null;
+				leaderSessionID = new UUID(0, 0);
 			}
 		});
 	}
@@ -411,7 +470,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		runAsync(new Runnable() {
 			@Override
 			public void run() {
-				WorkerType worker = taskExecutorGateways.remove(resourceID);
+				WorkerType worker = taskExecutors.remove(resourceID).getWorker();
 				if (worker != null) {
 					// TODO :: suggest failed task executor to stop itself
 					slotManager.notifyTaskManagerFailure(resourceID);
@@ -426,7 +485,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 	 * @return The number of currently started TaskManagers.
 	 */
 	public int getNumberOfStartedTaskManagers() {
-		return taskExecutorGateways.size();
+		return taskExecutors.size();
 	}
 
 	/**
@@ -507,6 +566,14 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		});
 	}
 
+	// ------------------------------------------------------------------------
+	//  Resource Manager Services
+	// ------------------------------------------------------------------------
+
+	protected ResourceManagerServices createResourceManagerServices() {
+		return new DefaultResourceManagerServices();
+	}
+
 	private class DefaultResourceManagerServices implements ResourceManagerServices {
 
 		@Override
@@ -520,7 +587,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable>
 		}
 
 		@Override
-		public Executor getExecutor() {
+		public Executor getMainThreadExecutor() {
 			return ResourceManager.this.getMainThreadExecutor();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 87303a1..3c81227 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -22,11 +22,16 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
 
 import java.util.UUID;
 
@@ -56,10 +61,12 @@ public interface ResourceManagerGateway extends RpcGateway {
 	/**
 	 * Requests a slot from the resource manager.
 	 *
-	 * @param slotRequest Slot request
-	 * @return Future slot assignment
+	 * @param jobMasterLeaderID leader id of the JobMaster
+	 * @param resourceManagerLeaderID leader id of the ResourceManager
+	 * @param slotRequest The slot to request
+	 * @return The confirmation that the slot gets allocated
 	 */
-	Future<SlotRequestReply> requestSlot(
+	Future<RMSlotRequestReply> requestSlot(
 		UUID jobMasterLeaderID,
 		UUID resourceManagerLeaderID,
 		SlotRequest slotRequest,
@@ -71,6 +78,7 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param resourceManagerLeaderId  The fencing token for the ResourceManager leader
 	 * @param taskExecutorAddress     The address of the TaskExecutor that registers
 	 * @param resourceID              The resource ID of the TaskExecutor that registers
+	 * @param slotReport              The slot report containing free and allocated task slots
 	 * @param timeout                 The timeout for the response.
 	 *
 	 * @return The future to the response by the ResourceManager.
@@ -79,6 +87,23 @@ public interface ResourceManagerGateway extends RpcGateway {
 		UUID resourceManagerLeaderId,
 		String taskExecutorAddress,
 		ResourceID resourceID,
+		SlotReport slotReport,
+		@RpcTimeout Time timeout);
+
+	/**
+	 * Sent by the TaskExecutor to notify the ResourceManager that a slot has become available.
+	 *
+	 * @param resourceManagerLeaderId The ResourceManager leader id
+	 * @param resourceID The ResourceID of the TaskExecutor
+	 * @param instanceID The InstanceID of the TaskExecutor
+	 * @param slotID The SlotID of the freed slot
+	 * @return The confirmation by the ResourceManager
+	 */
+	Future<SlotAvailableReply> notifySlotAvailable(
+		UUID resourceManagerLeaderId,
+		ResourceID resourceID,
+		InstanceID instanceID,
+		SlotID slotID,
 		@RpcTimeout Time timeout);
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
index 30994dc..b997a3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
@@ -39,6 +39,6 @@ public interface ResourceManagerServices {
 	/**
 	 * Gets the executor which executes in the main thread of the ResourceManager
 	 */
-	Executor getExecutor();
+	Executor getMainThreadExecutor();
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
deleted file mode 100644
index f719dce..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRegistered.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-
-/**
- * Acknowledgment by the ResourceManager for a SlotRequest from the JobManager
- */
-public class SlotRequestRegistered extends SlotRequestReply {
-
-	private static final long serialVersionUID = 4760320859275256855L;
-
-	public SlotRequestRegistered(AllocationID allocationID) {
-		super(allocationID);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
deleted file mode 100644
index 282a7d5..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestRejected.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-
-/**
- * Rejection message by the ResourceManager for a SlotRequest from the JobManager
- */
-public class SlotRequestRejected extends SlotRequestReply {
-
-	private static final long serialVersionUID = 9049346740895325144L;
-
-	public SlotRequestRejected(AllocationID allocationID) {
-		super(allocationID);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
deleted file mode 100644
index 1b85d0c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/SlotRequestReply.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-
-import java.io.Serializable;
-
-/**
- * Acknowledgment by the ResourceManager for a SlotRequest from the JobManager
- */
-public abstract class SlotRequestReply implements Serializable {
-
-	private static final long serialVersionUID = 42;
-
-	private final AllocationID allocationID;
-
-	public SlotRequestReply(AllocationID allocationID) {
-		this.allocationID = allocationID;
-	}
-
-	public AllocationID getAllocationID() {
-		return allocationID;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index deca8d3..f9f55f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -22,7 +22,7 @@ import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.RpcService;
 
 /**
@@ -34,9 +34,9 @@ import org.apache.flink.runtime.rpc.RpcService;
 public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 
 	public StandaloneResourceManager(RpcService rpcService,
-		HighAvailabilityServices highAvailabilityServices,
-		SlotManager slotManager) {
-		super(rpcService, highAvailabilityServices, slotManager);
+			HighAvailabilityServices highAvailabilityServices,
+			SlotManagerFactory slotManagerFactory) {
+		super(rpcService, highAvailabilityServices, slotManagerFactory);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRegistered.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRegistered.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRegistered.java
new file mode 100644
index 0000000..01bc532
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRegistered.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.messages.jobmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+/**
+ * Acknowledgment by the ResourceManager that a SlotRequest from the JobManager has been registered.
+ */
+public class RMSlotRequestRegistered extends RMSlotRequestReply {
+
+	private static final long serialVersionUID = 4760320859275256855L;
+
+	public RMSlotRequestRegistered(AllocationID allocationID) {
+		super(allocationID);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRejected.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRejected.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRejected.java
new file mode 100644
index 0000000..649d61c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestRejected.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.messages.jobmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+/**
+ * Rejection message by the ResourceManager for a SlotRequest from the JobManager.
+ */
+public class RMSlotRequestRejected extends RMSlotRequestReply {
+
+	private static final long serialVersionUID = 9049346740895325144L;
+
+	public RMSlotRequestRejected(AllocationID allocationID) {
+		super(allocationID);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestReply.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestReply.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestReply.java
new file mode 100644
index 0000000..66e1911
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/jobmanager/RMSlotRequestReply.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.messages.jobmanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.io.Serializable;
+
+/**
+ * Base class for replies by the ResourceManager to a SlotRequest from the JobManager.
+ */
+public abstract class RMSlotRequestReply implements Serializable {
+
+	private static final long serialVersionUID = 42;
+
+	private final AllocationID allocationID;
+
+	public RMSlotRequestReply(AllocationID allocationID) {
+		this.allocationID = allocationID;
+	}
+
+	public AllocationID getAllocationID() {
+		return allocationID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/SlotAvailableReply.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/SlotAvailableReply.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/SlotAvailableReply.java
new file mode 100644
index 0000000..f2e0105
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/SlotAvailableReply.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.messages.taskexecutor;
+
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+
+import java.io.Serializable;
+import java.util.UUID;
+
+/**
+ * Sent by the ResourceManager to the TaskExecutor to confirm receipt of
+ * {@code org.apache.flink.runtime.resourcemanager.ResourceManagerGateway.notifySlotAvailable}.
+ */
+public class SlotAvailableReply implements Serializable {
+
+	private final UUID resourceManagerLeaderID;
+
+	private final SlotID slotID;
+
+	public SlotAvailableReply(UUID resourceManagerLeaderID, SlotID slotID) {
+		this.resourceManagerLeaderID = resourceManagerLeaderID;
+		this.slotID = slotID;
+	}
+
+	public UUID getResourceManagerLeaderID() {
+		return resourceManagerLeaderID;
+	}
+
+	public SlotID getSlotID() {
+		return slotID;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRegistered.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRegistered.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRegistered.java
new file mode 100644
index 0000000..c0f0f49
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRegistered.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.messages.taskexecutor;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.InstanceID;
+
+/**
+ * Acknowledgment by the TaskExecutor that a SlotRequest from the ResourceManager has been registered.
+ */
+public class TMSlotRequestRegistered extends TMSlotRequestReply {
+
+	private static final long serialVersionUID = 4760320859275256855L;
+
+	public TMSlotRequestRegistered(InstanceID instanceID, ResourceID resourceID, AllocationID allocationID) {
+		super(instanceID, resourceID, allocationID);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRejected.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRejected.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRejected.java
new file mode 100644
index 0000000..9b10a35
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestRejected.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.messages.taskexecutor;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.InstanceID;
+
+/**
+ * Rejection by the TaskExecutor for a SlotRequest from the ResourceManager.
+ */
+public class TMSlotRequestRejected extends TMSlotRequestReply {
+
+	private static final long serialVersionUID = 9049346740895325144L;
+
+	public TMSlotRequestRejected(InstanceID instanceID, ResourceID resourceID, AllocationID allocationID) {
+		super(instanceID, resourceID, allocationID);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestReply.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestReply.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestReply.java
new file mode 100644
index 0000000..b23b6e9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/messages/taskexecutor/TMSlotRequestReply.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.messages.taskexecutor;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.InstanceID;
+
+import java.io.Serializable;
+
+/**
+ * Base class for replies by the TaskExecutor to a SlotRequest from the ResourceManager.
+ */
+public abstract class TMSlotRequestReply implements Serializable {
+
+	private static final long serialVersionUID = 42;
+
+	private final InstanceID instanceID;
+
+	private final ResourceID resourceID;
+
+	private final AllocationID allocationID;
+
+	protected TMSlotRequestReply(InstanceID instanceID, ResourceID resourceID, AllocationID allocationID) {
+		this.instanceID = instanceID;
+		this.resourceID = resourceID;
+		this.allocationID = allocationID;
+	}
+
+	public InstanceID getInstanceID() {
+		return instanceID;
+	}
+
+	public ResourceID getResourceID() {
+		return resourceID;
+	}
+
+	public AllocationID getAllocationID() {
+		return allocationID;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
index 6b21f5c..bfa9c00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/TaskExecutorRegistration.java
@@ -24,20 +24,20 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import java.io.Serializable;
 
 /**
- * This class is responsible for group the TaskExecutorGateway and the InstanceID of a registered task executor.
+ * This class is responsible for grouping the TaskExecutorGateway and the InstanceID
+ * of a registered task executor.
  */
 public class TaskExecutorRegistration implements Serializable {
 
 	private static final long serialVersionUID = -2062957799469434614L;
 
-	private TaskExecutorGateway taskExecutorGateway;
+	private final InstanceID instanceID;
 
-	private InstanceID instanceID;
+	private TaskExecutorGateway taskExecutorGateway;
 
-	public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway,
-									InstanceID instanceID) {
+	public TaskExecutorRegistration(TaskExecutorGateway taskExecutorGateway) {
+		this.instanceID = new InstanceID();
 		this.taskExecutorGateway = taskExecutorGateway;
-		this.instanceID = instanceID;
 	}
 
 	public InstanceID getInstanceID() {

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
new file mode 100644
index 0000000..ff28f94
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/WorkerRegistration.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.registration;
+
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+import java.io.Serializable;
+
+/**
+ * Extends {@link TaskExecutorRegistration} with the worker information of type {@code WorkerType}.
+ */
+public class WorkerRegistration<WorkerType extends Serializable> extends TaskExecutorRegistration {
+
+	private static final long serialVersionUID = -2062957799469434614L;
+
+	private WorkerType worker;
+
+	public WorkerRegistration(TaskExecutorGateway taskExecutorGateway, WorkerType worker) {
+		super(taskExecutorGateway);
+		this.worker = worker;
+	}
+
+	public WorkerType getWorker() {
+		return worker;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
deleted file mode 100644
index ae1de5a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SimpleSlotManager.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.resourcemanager.slotmanager;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.resourcemanager.SlotRequest;
-
-import java.util.Iterator;
-import java.util.Map;
-
-/**
- * A simple SlotManager which ignores resource profiles.
- */
-public class SimpleSlotManager extends SlotManager {
-
-	@Override
-	protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
-		final Iterator<ResourceSlot> slotIterator = freeSlots.values().iterator();
-		if (slotIterator.hasNext()) {
-			return slotIterator.next();
-		} else {
-			return null;
-		}
-	}
-
-	@Override
-	protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, Map<AllocationID, SlotRequest> pendingRequests) {
-		final Iterator<SlotRequest> requestIterator = pendingRequests.values().iterator();
-		if (requestIterator.hasNext()) {
-			return requestIterator.next();
-		} else {
-			return null;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index a56b2f6..7eb2d78 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -28,11 +28,12 @@ import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
-import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,8 +65,11 @@ public abstract class SlotManager {
 
 	protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
+	/** The Resource allocation provider */
+	protected final ResourceManagerServices rmServices;
+
 	/** All registered task managers with ResourceID and gateway. */
-	private final Map<ResourceID, TaskExecutorGateway> taskManagerGateways;
+	private final Map<ResourceID, TaskExecutorRegistration> taskManagers;
 
 	/** All registered slots, including free and allocated slots */
 	private final Map<ResourceID, Map<SlotID, ResourceSlot>> registeredSlots;
@@ -84,29 +88,17 @@ public abstract class SlotManager {
 	/** The current leader id set by the ResourceManager */
 	private UUID leaderID;
 
-	/** The Resource allocation provider */
-	private ResourceManagerServices resourceManagerServices;
-
-	public SlotManager() {
+	public SlotManager(ResourceManagerServices rmServices) {
+		this.rmServices = checkNotNull(rmServices);
 		this.registeredSlots = new HashMap<>(16);
 		this.pendingSlotRequests = new LinkedHashMap<>(16);
 		this.freeSlots = new HashMap<>(16);
 		this.allocationMap = new AllocationMap();
-		this.taskManagerGateways = new HashMap<>();
+		this.taskManagers = new HashMap<>();
 		this.timeout = Time.seconds(10);
+		this.leaderID = new UUID(0, 0);
 	}
 
-	/**
-	 * Initializes the resource supplier which is needed to request new resources.
-	 */
-	public void setupResourceManagerServices(ResourceManagerServices resourceManagerServices) {
-		if (this.resourceManagerServices != null) {
-			throw new IllegalStateException("ResourceManagerServices may only be set once.");
-		}
-		this.resourceManagerServices = resourceManagerServices;
-	}
-
-
 	// ------------------------------------------------------------------------
 	//  slot managements
 	// ------------------------------------------------------------------------
@@ -118,13 +110,13 @@ public abstract class SlotManager {
 	 * RPC's main thread to avoid race condition).
 	 *
 	 * @param request The detailed request of the slot
-	 * @return SlotRequestRegistered The confirmation message to be send to the caller
+	 * @return RMSlotRequestRegistered The confirmation message to be sent to the caller
 	 */
-	public SlotRequestRegistered requestSlot(final SlotRequest request) {
+	public RMSlotRequestRegistered requestSlot(final SlotRequest request) {
 		final AllocationID allocationId = request.getAllocationId();
 		if (isRequestDuplicated(request)) {
 			LOG.warn("Duplicated slot request, AllocationID:{}", allocationId);
-			return null;
+			return new RMSlotRequestRegistered(allocationId);
 		}
 
 		// try to fulfil the request with current free slots
@@ -136,53 +128,38 @@ public abstract class SlotManager {
 			// record this allocation in bookkeeping
 			allocationMap.addAllocation(slot.getSlotId(), allocationId);
 			// remove selected slot from free pool
-			final ResourceSlot removedSlot = freeSlots.remove(slot.getSlotId());
-
-			final Future<SlotRequestReply> slotRequestReplyFuture =
-				slot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
-
-			slotRequestReplyFuture.handleAsync(new BiFunction<SlotRequestReply, Throwable, Object>() {
-				@Override
-				public Object apply(SlotRequestReply slotRequestReply, Throwable throwable) {
-					if (throwable != null) {
-						// we failed, put the slot and the request back again
-						if (allocationMap.isAllocated(slot.getSlotId())) {
-							// only re-add if the slot hasn't been removed in the meantime
-							freeSlots.put(slot.getSlotId(), removedSlot);
-						}
-						pendingSlotRequests.put(allocationId, request);
-					}
-					return null;
-				}
-			}, resourceManagerServices.getExecutor());
+			freeSlots.remove(slot.getSlotId());
+
+			sendSlotRequest(slot, request);
 		} else {
 			LOG.info("Cannot fulfil slot request, try to allocate a new container for it, " +
 				"AllocationID:{}, JobID:{}", allocationId, request.getJobId());
-			Preconditions.checkState(resourceManagerServices != null,
+			Preconditions.checkState(rmServices != null,
 				"Attempted to allocate resources but no ResourceManagerServices set.");
-			resourceManagerServices.allocateResource(request.getResourceProfile());
+			rmServices.allocateResource(request.getResourceProfile());
 			pendingSlotRequests.put(allocationId, request);
 		}
 
-		return new SlotRequestRegistered(allocationId);
+		return new RMSlotRequestRegistered(allocationId);
 	}
 
 	/**
-	 * Sync slot status with TaskManager's SlotReport.
+	 * Notifies the SlotManager that a slot is available again after being allocated.
+	 * @param slotID slot id of available slot
 	 */
-	public void updateSlotStatus(final SlotReport slotReport) {
-		for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
-			updateSlotStatus(slotStatus);
+	public void notifySlotAvailable(ResourceID resourceID, SlotID slotID) {
+		if (!allocationMap.isAllocated(slotID)) {
+			throw new IllegalStateException("Slot was not previously allocated but " +
+				"TaskManager reports it as available again");
 		}
-	}
-
-	/**
-	 * Registers a TaskExecutor
-	 * @param resourceID TaskExecutor's ResourceID
-	 * @param gateway TaskExcutor's gateway
-	 */
-	public void registerTaskExecutor(ResourceID resourceID, TaskExecutorGateway gateway) {
-		this.taskManagerGateways.put(resourceID, gateway);
+		allocationMap.removeAllocation(slotID);
+		final Map<SlotID, ResourceSlot> slots = registeredSlots.get(resourceID);
+		ResourceSlot freeSlot = slots.get(slotID);
+		if (freeSlot == null) {
+			throw new IllegalStateException("Slot was not registered with SlotManager but " +
+				"TaskManager reported it to be available.");
+		}
+		handleFreeSlot(freeSlot);
 	}
 
 	/**
@@ -200,51 +177,72 @@ public abstract class SlotManager {
 	 * @param originalRequest The original slot request
 	 * @param slotId          The target SlotID
 	 */
-	public void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) {
+	void handleSlotRequestFailedAtTaskManager(final SlotRequest originalRequest, final SlotID slotId) {
 		final AllocationID originalAllocationId = originalRequest.getAllocationId();
 		LOG.info("Slot request failed at TaskManager, SlotID:{}, AllocationID:{}, JobID:{}",
 			slotId, originalAllocationId, originalRequest.getJobId());
 
-		// verify the allocation info before we do anything
-		if (freeSlots.containsKey(slotId)) {
-			// this slot is currently empty, no need to de-allocate it from our allocations
-			LOG.info("Original slot is somehow empty, retrying this request");
+		if (allocationMap.isAllocated(slotId)) {
+			final AllocationID expectedAllocationId = allocationMap.getAllocationID(slotId);
 
-			// before retry, we should double check whether this request was allocated by some other ways
-			if (!allocationMap.isAllocated(originalAllocationId)) {
-				requestSlot(originalRequest);
+			// check whether we have an agreement on whom this slot belongs to
+			if (originalAllocationId.equals(expectedAllocationId)) {
+				LOG.info("De-allocate this request and retry");
+				allocationMap.removeAllocation(expectedAllocationId);
+				pendingSlotRequests.put(originalRequest.getAllocationId(), originalRequest);
+				ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId));
+				// treat this slot as empty and retry with a different request
+				handleFreeSlot(slot);
 			} else {
-				LOG.info("The failed request has somehow been allocated, SlotID:{}",
-					allocationMap.getSlotID(originalAllocationId));
+				LOG.error("Slot request failed for slot {} with allocation id {}:" +
+						" Allocation id did not match the expected allocation id {}.",
+					slotId, originalAllocationId, expectedAllocationId);
 			}
-		} else if (allocationMap.isAllocated(slotId)) {
-			final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
+		} else {
+			LOG.error("Slot request failed for slot {} with allocation id {}: " +
+					"Slot was not previously registered.",
+				slotId, originalAllocationId);
+		}
+	}
 
-			// check whether we have an agreement on whom this slot belongs to
-			if (originalAllocationId.equals(currentAllocationId)) {
-				LOG.info("De-allocate this request and retry");
-				allocationMap.removeAllocation(currentAllocationId);
+	/**
+	 * Registers a TaskExecutor
+	 * @param resourceID TaskExecutor's ResourceID
+	 * @param registration TaskExecutor's registration
+	 * @param slotReport TaskExecutor's free and allocated slots
+	 */
+	public void registerTaskExecutor(
+			ResourceID resourceID,
+			TaskExecutorRegistration registration,
+			SlotReport slotReport) {
 
-				// put this slot back to free pool
-				ResourceSlot slot = checkNotNull(getRegisteredSlot(slotId));
-				freeSlots.put(slotId, slot);
+		if (taskManagers.get(resourceID) != null) {
+			notifyTaskManagerFailure(resourceID);
+		}
 
-				// retry the request
-				requestSlot(originalRequest);
-			} else {
-				// the slot is taken by someone else, no need to de-allocate it from our allocations
-				LOG.info("Original slot is taken by someone else, current AllocationID:{}", currentAllocationId);
+		this.taskManagers.put(resourceID, registration);
 
-				// before retry, we should double check whether this request was allocated by some other ways
-				if (!allocationMap.isAllocated(originalAllocationId)) {
-					requestSlot(originalRequest);
-				} else {
-					LOG.info("The failed request is somehow been allocated, SlotID:{}",
-						allocationMap.getSlotID(originalAllocationId));
-				}
+		for (SlotStatus slotStatus : slotReport.getSlotsStatus()) {
+			final SlotID slotId = slotStatus.getSlotID();
+
+			final TaskExecutorRegistration taskExecutorRegistration = taskManagers.get(slotId.getResourceID());
+			if (taskExecutorRegistration == null) {
+				LOG.info("Received SlotStatus but ResourceID {} is unknown to the SlotManager",
+					slotId.getResourceID());
+				return;
+			}
+
+			final ResourceSlot slot = new ResourceSlot(slotId, slotStatus.getProfiler(), taskExecutorRegistration);
+
+			registerNewSlot(slot);
+			LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, slotStatus.getAllocationID());
+
+			if (slotStatus.getAllocationID() != null) {
+				// slot in use, record this in bookkeeping
+				allocationMap.addAllocation(slotId, slotStatus.getAllocationID());
+			} else {
+				handleFreeSlot(slot);
 			}
-		} else {
-			LOG.error("BUG! {} is neither in free pool nor in allocated pool", slotId);
 		}
 	}
 
@@ -255,7 +253,7 @@ public abstract class SlotManager {
 	 */
 	public void notifyTaskManagerFailure(final ResourceID resourceId) {
 		LOG.info("Resource:{} been notified failure", resourceId);
-		taskManagerGateways.remove(resourceId);
+		taskManagers.remove(resourceId);
 		final Map<SlotID, ResourceSlot> slotIdsToRemove = registeredSlots.remove(resourceId);
 		if (slotIdsToRemove != null) {
 			for (SlotID slotId : slotIdsToRemove.keySet()) {
@@ -276,92 +274,6 @@ public abstract class SlotManager {
 	// ------------------------------------------------------------------------
 
 	/**
-	 * Update slot status based on TaskManager's report. There are mainly two situations when we receive the report:
-	 * <ul>
-	 * <li>1. The slot is newly registered.</li>
-	 * <li>2. The slot has registered, it contains its current status.</li>
-	 * </ul>
-	 * <p>
-	 * Regarding 1: It's fairly simple, we just record this slot's status, and trigger schedule if slot is empty.
-	 * <p>
-	 * Regarding 2: It will cause some weird situation since we may have some time-gap on how the slot's status really
-	 * is. We may have some updates on the slot's allocation, but it doesn't reflected by TaskManager's heartbeat yet,
-	 * and we may make some wrong decision if we cannot guarantee we have the exact status about all the slots. So
-	 * the principle here is: We always trust TaskManager's heartbeat, we will correct our information based on that
-	 * and take next action based on the diff between our information and heartbeat status.
-	 *
-	 * @param reportedStatus Reported slot status
-	 */
-	void updateSlotStatus(final SlotStatus reportedStatus) {
-		final SlotID slotId = reportedStatus.getSlotID();
-
-		final TaskExecutorGateway taskExecutorGateway = taskManagerGateways.get(slotId.getResourceID());
-		if (taskExecutorGateway == null) {
-			LOG.info("Received SlotStatus but ResourceID {} is unknown to the SlotManager",
-				slotId.getResourceID());
-			return;
-		}
-
-		final ResourceSlot slot = new ResourceSlot(slotId, reportedStatus.getProfiler(), taskExecutorGateway);
-
-		if (registerNewSlot(slot)) {
-			// we have a newly registered slot
-			LOG.info("New slot appeared, SlotID:{}, AllocationID:{}", slotId, reportedStatus.getAllocationID());
-
-			if (reportedStatus.getAllocationID() != null) {
-				// slot in use, record this in bookkeeping
-				allocationMap.addAllocation(slotId, reportedStatus.getAllocationID());
-			} else {
-				handleFreeSlot(slot);
-			}
-		} else {
-			// slot exists, update current information
-			if (reportedStatus.getAllocationID() != null) {
-				// slot is reported in use
-				final AllocationID reportedAllocationId = reportedStatus.getAllocationID();
-
-				// check whether we also thought this slot is in use
-				if (allocationMap.isAllocated(slotId)) {
-					// we also think that slot is in use, check whether the AllocationID matches
-					final AllocationID currentAllocationId = allocationMap.getAllocationID(slotId);
-
-					if (!reportedAllocationId.equals(currentAllocationId)) {
-						LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:{}",
-							slotId, currentAllocationId, reportedAllocationId);
-
-						// seems we have a disagreement about the slot assignments, need to correct it
-						allocationMap.removeAllocation(slotId);
-						allocationMap.addAllocation(slotId, reportedAllocationId);
-					}
-				} else {
-					LOG.info("Slot allocation info mismatch! SlotID:{}, current:null, reported:{}",
-						slotId, reportedAllocationId);
-
-					// we thought the slot is free, should correct this information
-					allocationMap.addAllocation(slotId, reportedStatus.getAllocationID());
-
-					// remove this slot from free slots pool
-					freeSlots.remove(slotId);
-				}
-			} else {
-				// slot is reported empty
-
-				// check whether we also thought this slot is empty
-				if (allocationMap.isAllocated(slotId)) {
-					LOG.info("Slot allocation info mismatch! SlotID:{}, current:{}, reported:null",
-						slotId, allocationMap.getAllocationID(slotId));
-
-					// we thought the slot is in use, correct it
-					allocationMap.removeAllocation(slotId);
-
-					// we have a free slot!
-					handleFreeSlot(slot);
-				}
-			}
-		}
-	}
-
-	/**
 	 * When we have a free slot, try to fulfill the pending request first. If any request can be fulfilled,
 	 * record this allocation in bookkeeping and send slot request to TaskManager, else we just add this slot
 	 * to the free pool.
@@ -373,32 +285,45 @@ public abstract class SlotManager {
 
 		if (chosenRequest != null) {
 			final AllocationID allocationId = chosenRequest.getAllocationId();
-			final SlotRequest removedSlotRequest = pendingSlotRequests.remove(allocationId);
+			final SlotRequest slotRequest = pendingSlotRequests.remove(allocationId);
 
 			LOG.info("Assigning SlotID({}) to AllocationID({}), JobID:{}", freeSlot.getSlotId(),
 				allocationId, chosenRequest.getJobId());
 			allocationMap.addAllocation(freeSlot.getSlotId(), allocationId);
 
-			final Future<SlotRequestReply> slotRequestReplyFuture =
-				freeSlot.getTaskExecutorGateway().requestSlot(allocationId, leaderID, timeout);
-
-			slotRequestReplyFuture.handleAsync(new BiFunction<SlotRequestReply, Throwable, Object>() {
-				@Override
-				public Object apply(SlotRequestReply slotRequestReply, Throwable throwable) {
-					if (throwable != null) {
-						// we failed, add the request back again
-						if (allocationMap.isAllocated(freeSlot.getSlotId())) {
-							pendingSlotRequests.put(allocationId, removedSlotRequest);
-						}
-					}
-					return null;
-				}
-			}, resourceManagerServices.getExecutor());
+			sendSlotRequest(freeSlot, slotRequest);
 		} else {
 			freeSlots.put(freeSlot.getSlotId(), freeSlot);
 		}
 	}
 
+	private void sendSlotRequest(final ResourceSlot freeSlot, final SlotRequest slotRequest) {
+
+		final AllocationID allocationID = slotRequest.getAllocationId();
+		final TaskExecutorRegistration registration = freeSlot.getTaskExecutorRegistration();
+		final Future<TMSlotRequestReply> slotRequestReplyFuture =
+			registration.getTaskExecutorGateway()
+				.requestSlot(freeSlot.getSlotId(), allocationID, leaderID, timeout);
+
+		slotRequestReplyFuture.handleAsync(new BiFunction<TMSlotRequestReply, Throwable, Void>() {
+			@Override
+			public Void apply(TMSlotRequestReply slotRequestReply, Throwable throwable) {
+				TaskExecutorRegistration current = taskManagers.get(slotRequestReply.getResourceID());
+				if (current != null && current.getInstanceID().equals(slotRequestReply.getInstanceID())) {
+					if (throwable != null || slotRequestReply instanceof TMSlotRequestRejected) {
+						handleSlotRequestFailedAtTaskManager(slotRequest, freeSlot.getSlotId());
+					} else {
+						LOG.debug("Successfully registered slot {} ", freeSlot.getSlotId());
+					}
+				} else {
+					LOG.debug("Discarding message from obsolete TaskExecutor with InstanceID {}",
+						slotRequestReply.getInstanceID());
+				}
+				return null;
+			}
+		}, rmServices.getMainThreadExecutor());
+	}
+
 	/**
 	 * Check whether the request is duplicated. We use AllocationID to identify slot request, for each
 	 * formerly received slot request, it is either in pending list or already been allocated.
@@ -413,18 +338,17 @@ public abstract class SlotManager {
 	}
 
 	/**
-	 * Try to register slot, and tell if this slot is newly registered.
+	 * Registers a new slot with the SlotManager.
 	 *
-	 * @param slot The ResourceSlot which will be checked and registered
-	 * @return <tt>true</tt> if we meet a new slot
+	 * @param slot The ResourceSlot which will be registered
 	 */
-	private boolean registerNewSlot(final ResourceSlot slot) {
+	private void registerNewSlot(final ResourceSlot slot) {
 		final SlotID slotId = slot.getSlotId();
 		final ResourceID resourceId = slotId.getResourceID();
 		if (!registeredSlots.containsKey(resourceId)) {
 			registeredSlots.put(resourceId, new HashMap<SlotID, ResourceSlot>());
 		}
-		return registeredSlots.get(resourceId).put(slotId, slot) == null;
+		registeredSlots.get(resourceId).put(slotId, slot);
 	}
 
 	private ResourceSlot getRegisteredSlot(final SlotID slotId) {
@@ -559,12 +483,12 @@ public abstract class SlotManager {
 	 * Clears the state of the SlotManager after leadership revokal
 	 */
 	public void clearState() {
-		taskManagerGateways.clear();
+		taskManagers.clear();
 		registeredSlots.clear();
 		pendingSlotRequests.clear();
 		freeSlots.clear();
 		allocationMap.clear();
-		leaderID = null;
+		leaderID = new UUID(0, 0);
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
new file mode 100644
index 0000000..b4e9c99
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+
+/**
+ * Factory to create a SlotManager and provide it with dependencies.
+ */
+public interface SlotManagerFactory {
+
+	/**
+	 * Creates a SlotManager and provides it with ResourceManager services.
+	 */
+	SlotManager create(ResourceManagerServices rmServices);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
index a5de2d5..54adce6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotReport.java
@@ -18,9 +18,8 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -37,20 +36,20 @@ public class SlotReport implements Serializable {
 	/** The slots status of the TaskManager */
 	private final List<SlotStatus> slotsStatus;
 
-	/** The resource id which identifies the TaskManager */
-	private final ResourceID resourceID;
+	public SlotReport() {
+		this(Collections.<SlotStatus>emptyList());
+	}
+
+	public SlotReport(SlotStatus slotStatus) {
+		this(Collections.singletonList(slotStatus));
+	}
 
-	public SlotReport(final List<SlotStatus> slotsStatus, final ResourceID resourceID) {
+	public SlotReport(final List<SlotStatus> slotsStatus) {
 		this.slotsStatus = checkNotNull(slotsStatus);
-		this.resourceID = checkNotNull(resourceID);
 	}
 
 	public List<SlotStatus> getSlotsStatus() {
 		return slotsStatus;
 	}
 
-	public ResourceID getResourceID() {
-		return resourceID;
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 7df0a91..c0041a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered;
-import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.annotation.VisibleForTesting;
@@ -38,6 +41,8 @@ import org.apache.flink.runtime.rpc.RpcService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashSet;
+import java.util.Set;
 import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -78,6 +83,9 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	/** The fatal error handler to use in case of a fatal error */
 	private final FatalErrorHandler fatalErrorHandler;
 
+	/** Slots which have become available but haven't been confirmed by the RM */
+	private final Set<SlotID> unconfirmedFreeSlots;
+
 	// --------- resource manager --------
 
 	private TaskExecutorToResourceManagerConnection resourceManagerConnection;
@@ -109,6 +117,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 
 		this.numberOfSlots =  taskManagerConfiguration.getNumberSlots();
+
+		this.unconfirmedFreeSlots = new HashSet<>();
 	}
 
 	// ------------------------------------------------------------------------
@@ -152,6 +162,8 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 			}
 		}
 
+		unconfirmedFreeSlots.clear();
+
 		// establish a connection to the new leader
 		if (newLeaderAddress != null) {
 			log.info("Attempting to register at ResourceManager {}", newLeaderAddress);
@@ -169,13 +181,25 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	/**
 	 * Requests a slot from the TaskManager
 	 *
+	 * @param slotID Slot id for the request
 	 * @param allocationID id for the request
 	 * @param resourceManagerLeaderID current leader id of the ResourceManager
 	 * @return answer to the slot request
 	 */
 	@RpcMethod
-	public SlotRequestReply requestSlot(AllocationID allocationID, UUID resourceManagerLeaderID) {
-		return new SlotRequestRegistered(allocationID);
+	public TMSlotRequestReply requestSlot(SlotID slotID, AllocationID allocationID, UUID resourceManagerLeaderID) {
+		if (!resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderID)) {
+			return new TMSlotRequestRejected(
+				resourceManagerConnection.getRegistrationId(), getResourceID(), allocationID);
+		}
+		if (unconfirmedFreeSlots.contains(slotID)) {
+			// reject the request while the slot is still blacklisted, i.e. the notification
+			// that it became free has not yet been confirmed by the ResourceManager
+			return new TMSlotRequestRejected(
+				resourceManagerConnection.getRegistrationId(), getResourceID(), allocationID);
+		}
+		return new TMSlotRequestRegistered(new InstanceID(), ResourceID.generate(), allocationID);
+
 	}
 
 	// ------------------------------------------------------------------------
@@ -227,6 +251,11 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 		return resourceManagerConnection;
 	}
 
+	@VisibleForTesting
+	public void addUnconfirmedFreeSlotNotification(SlotID slotID) {
+		unconfirmedFreeSlots.add(slotID);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Utility classes
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 0962802..2360b53 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -20,8 +20,9 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 
@@ -41,11 +42,13 @@ public interface TaskExecutorGateway extends RpcGateway {
 	/**
 	 * Requests a slot from the TaskManager
 	 *
+	 * @param slotID slot id for the request
 	 * @param allocationID id for the request
 	 * @param resourceManagerLeaderID current leader id of the ResourceManager
 	 * @return answer to the slot request
 	 */
-	Future<SlotRequestReply> requestSlot(
+	Future<TMSlotRequestReply> requestSlot(
+		SlotID slotID,
 		AllocationID allocationID,
 		UUID resourceManagerLeaderID,
 		@RpcTimeout Time timeout);

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index b4b3bae..2dbd9eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -113,7 +113,7 @@ public class TaskExecutorToResourceManagerConnection
 				ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception {
 
 			Time timeout = Time.milliseconds(timeoutMillis);
-			return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, timeout);
+			return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, new SlotReport(), timeout);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0518af03/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index fdb83f5..ce1fdca 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
@@ -43,7 +44,8 @@ public class ResourceManagerHATest {
 
 	@Test
 	public void testGrantAndRevokeLeadership() throws Exception {
-		// mock a RpcService which will return a special RpcGateway when call its startServer method, the returned RpcGateway directly execute runAsync call
+		// mock an RpcService which returns a special RpcGateway when its startServer method is called;
+		// the returned RpcGateway directly executes runAsync calls
 		TestingResourceManagerGatewayProxy gateway = mock(TestingResourceManagerGatewayProxy.class);
 		doCallRealMethod().when(gateway).runAsync(any(Runnable.class));
 
@@ -54,18 +56,18 @@ public class ResourceManagerHATest {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
 
-		SlotManager slotManager = mock(SlotManager.class);
-		final ResourceManager resourceManager = new StandaloneResourceManager(rpcService, highAvailabilityServices, slotManager);
+		final ResourceManager resourceManager =
+			new TestingResourceManager(rpcService, highAvailabilityServices);
 		resourceManager.start();
 		// before grant leadership, resourceManager's leaderId is null
-		Assert.assertNull(resourceManager.getLeaderSessionID());
+		Assert.assertEquals(new UUID(0,0), resourceManager.getLeaderSessionID());
 		final UUID leaderId = UUID.randomUUID();
 		leaderElectionService.isLeader(leaderId);
 		// after grant leadership, resourceManager's leaderId has value
 		Assert.assertEquals(leaderId, resourceManager.getLeaderSessionID());
 		// then revoke leadership, resourceManager's leaderId is null again
 		leaderElectionService.notLeader();
-		Assert.assertNull(resourceManager.getLeaderSessionID());
+		Assert.assertEquals(new UUID(0,0), resourceManager.getLeaderSessionID());
 	}
 
 	private static abstract class TestingResourceManagerGatewayProxy implements MainThreadExecutable, StartStoppable, RpcGateway {


Mime
View raw message