flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/7] flink git commit: [FLINK-5810] [flip-6] Introduce a hardened slot manager
Date Fri, 28 Apr 2017 13:42:42 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index bae7086..85b7eb4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -26,12 +26,14 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.TestLogger;
 import org.junit.After;
@@ -152,10 +154,15 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		HeartbeatServices heartbeatServices = new HeartbeatServices(5L, 5L);
 		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
-		TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
 			Time.seconds(5L),
 			Time.seconds(5L));
+			
+		SlotManager slotManager = new SlotManager(
+			rpcService.getScheduledExecutor(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime());
 
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
 		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
@@ -171,7 +178,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 				resourceManagerConfiguration,
 				highAvailabilityServices,
 				heartbeatServices,
-				slotManagerFactory,
+				slotManager,
 				metricRegistry,
 				jobLeaderIdService,
 				fatalErrorHandler);

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
deleted file mode 100644
index 67b208d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.mockito.Mockito;
-
-import java.util.Iterator;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.Executor;
-
-public class TestingSlotManager extends SlotManager {
-
-	public TestingSlotManager() {
-		this(new TestingResourceManagerServices());
-	}
-
-	public TestingSlotManager(ResourceManagerServices rmServices) {
-		super(rmServices);
-	}
-
-	@Override
-	protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
-		final Iterator<ResourceSlot> slotIterator = freeSlots.values().iterator();
-		if (slotIterator.hasNext()) {
-			return slotIterator.next();
-		} else {
-			return null;
-		}
-	}
-
-	@Override
-	protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, Map<AllocationID, SlotRequest> pendingRequests) {
-		final Iterator<SlotRequest> requestIterator = pendingRequests.values().iterator();
-		if (requestIterator.hasNext()) {
-			return requestIterator.next();
-		} else {
-			return null;
-		}
-	}
-
-	private static class TestingResourceManagerServices implements ResourceManagerServices {
-
-		private final UUID leaderID = UUID.randomUUID();
-
-		@Override
-		public UUID getLeaderID() {
-			return leaderID;
-		}
-
-		@Override
-		public void allocateResource(ResourceProfile resourceProfile) {
-
-		}
-
-		@Override
-		public Executor getAsyncExecutor() {
-			return Mockito.mock(Executor.class);
-		}
-
-		@Override
-		public Executor getMainThreadExecutor() {
-			return Mockito.mock(Executor.class);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java
deleted file mode 100644
index 6b5f6b2..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManagerFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
-
-public class TestingSlotManagerFactory implements SlotManagerFactory {
-
-	@Override
-	public SlotManager create(ResourceManagerServices rmServices) {
-		return new TestingSlotManager(rmServices);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 041747d..b0b5d32 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -23,510 +23,1058 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
-import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
-import org.junit.BeforeClass;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
-import org.mockito.Mockito;
+import org.mockito.ArgumentCaptor;
 
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.Arrays;
 import java.util.UUID;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class SlotManagerTest extends TestLogger {
 
-public class SlotManagerTest {
-
-	private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
+	/**
+	 * Tests that we can register task managers and their slots at the slot manager.
+	 */
+	@Test
+	public void testTaskManagerRegistration() throws Exception {
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 
-	private static final int DEFAULT_TESTING_MEMORY = 512;
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
-	private static final ResourceProfile DEFAULT_TESTING_PROFILE =
-		new ResourceProfile(DEFAULT_TESTING_CPU_CORES, DEFAULT_TESTING_MEMORY);
+		ResourceID resourceId = ResourceID.generate();
+		final SlotID slotId1 = new SlotID(resourceId, 0);
+		final SlotID slotId2 = new SlotID(resourceId, 1);
+		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
+		final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
+		final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
+		final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
 
-	private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
-		new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, DEFAULT_TESTING_MEMORY * 2);
+		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
-	private static TaskExecutorRegistration taskExecutorRegistration;
+			assertTrue("The number registered slots does not equal the expected number.",2 == slotManager.getNumberRegisteredSlots());
 
-	@BeforeClass
-	public static void setUp() {
-		taskExecutorRegistration = Mockito.mock(TaskExecutorRegistration.class);
-		TaskExecutorGateway gateway = Mockito.mock(TaskExecutorGateway.class);
-		Mockito.when(taskExecutorRegistration.getTaskExecutorGateway()).thenReturn(gateway);
-		Mockito.when(gateway.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
-			.thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>());
+			assertNotNull(slotManager.getSlot(slotId1));
+			assertNotNull(slotManager.getSlot(slotId2));
+		}
 	}
 
 	/**
-	 * Tests that there are no free slots when we request, need to allocate from cluster manager master
+	 * Tests that un-registration of task managers will free and remove all registered slots.
 	 */
 	@Test
-	public void testRequestSlotWithoutFreeSlot() {
-		TestingSlotManager slotManager = new TestingSlotManager();
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertEquals(1, slotManager.getAllocatedContainers().size());
-		assertEquals(DEFAULT_TESTING_PROFILE, slotManager.getAllocatedContainers().get(0));
+	public void testTaskManagerUnregistration() throws Exception {
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final JobID jobId = new JobID();
+
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		when(taskExecutorGateway.requestSlot(
+			any(SlotID.class),
+			any(JobID.class),
+			any(AllocationID.class),
+			anyString(),
+			eq(leaderId),
+			any(Time.class))).thenReturn(new FlinkCompletableFuture<Acknowledge>());
+
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+
+		ResourceID resourceId = ResourceID.generate();
+		final SlotID slotId1 = new SlotID(resourceId, 0);
+		final SlotID slotId2 = new SlotID(resourceId, 1);
+		final AllocationID allocationId1 = new AllocationID();
+		final AllocationID allocationId2 = new AllocationID();
+		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
+		final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile, jobId, allocationId1);
+		final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
+		final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
+
+		final SlotRequest slotRequest = new SlotRequest(
+			new JobID(),
+			allocationId2,
+			resourceProfile,
+			"foobar");
+
+		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+			slotManager.registerTaskManager(taskManagerConnection, slotReport);
+
+			assertTrue("The number registered slots does not equal the expected number.",2 == slotManager.getNumberRegisteredSlots());
+
+			TaskManagerSlot slot1 = slotManager.getSlot(slotId1);
+			TaskManagerSlot slot2 = slotManager.getSlot(slotId2);
+
+			assertTrue(slot1.isAllocated());
+			assertTrue(slot2.isFree());
+
+			assertTrue(slotManager.registerSlotRequest(slotRequest));
+
+			assertFalse(slot2.isFree());
+			assertTrue(slot2.hasPendingSlotRequest());
+
+			PendingSlotRequest pendingSlotRequest = slotManager.getSlotRequest(allocationId2);
+
+			assertTrue("The pending slot request should have been assigned to slot 2", pendingSlotRequest.isAssigned());
+
+			slotManager.unregisterTaskManager(taskManagerConnection.getInstanceID());
+
+			assertTrue(0 == slotManager.getNumberRegisteredSlots());
+			assertFalse(pendingSlotRequest.isAssigned());
+		}
 	}
 
 	/**
-	 * Tests that there are some free slots when we request, and the request is fulfilled immediately
+	 * Tests that a slot request with no free slots will trigger the resource allocation
 	 */
 	@Test
-	public void testRequestSlotWithFreeSlot() {
-		TestingSlotManager slotManager = new TestingSlotManager();
+	public void testSlotRequestWithoutFreeSlots() throws Exception {
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
+		final SlotRequest slotRequest = new SlotRequest(
+			new JobID(),
+			new AllocationID(),
+			resourceProfile,
+			"localhost");
 
-		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
-		assertEquals(1, slotManager.getFreeSlotCount());
+		ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertEquals(0, slotManager.getAllocatedContainers().size());
+		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+
+			slotManager.registerSlotRequest(slotRequest);
+
+			verify(resourceManagerActions).allocateResource(eq(resourceProfile));
+		}
 	}
 
 	/**
-	 * Tests that there are some free slots when we request, but none of them are suitable
+	 * Tests that the slot request fails if we cannot allocate more resources.
 	 */
 	@Test
-	public void testRequestSlotWithoutSuitableSlot() {
-		TestingSlotManager slotManager = new TestingSlotManager();
-
-		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 2);
-		assertEquals(2, slotManager.getFreeSlotCount());
-
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(2, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertEquals(1, slotManager.getAllocatedContainers().size());
-		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+	public void testSlotRequestWithResourceAllocationFailure() throws Exception {
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
+		final SlotRequest slotRequest = new SlotRequest(
+			new JobID(),
+			new AllocationID(),
+			resourceProfile,
+			"localhost");
+
+		ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		doThrow(new ResourceManagerException("Test exception")).when(resourceManagerActions).allocateResource(any(ResourceProfile.class));
+
+		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+
+			slotManager.registerSlotRequest(slotRequest);
+
+			fail("The slot request should have failed with a ResourceManagerException.");
+
+		} catch (ResourceManagerException e) {
+			// expected exception
+		}
 	}
 
 	/**
-	 * Tests that we send duplicated slot request
+	 * Tests that a slot request which can be fulfilled will trigger a slot allocation.
 	 */
 	@Test
-	public void testDuplicatedSlotRequest() {
-		TestingSlotManager slotManager = new TestingSlotManager();
-		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 1);
-
-		SlotRequest request1 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
-
-		slotManager.requestSlot(request1);
-		slotManager.requestSlot(request2);
-		slotManager.requestSlot(request2);
-		slotManager.requestSlot(request1);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertEquals(1, slotManager.getAllocatedContainers().size());
-		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
+	public void testSlotRequestWithFreeSlot() throws Exception {
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceID resourceID = ResourceID.generate();
+		final JobID jobId = new JobID();
+		final SlotID slotId = new SlotID(resourceID, 0);
+		final String targetAddress = "localhost";
+		final AllocationID allocationId = new AllocationID();
+		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
+		final SlotRequest slotRequest = new SlotRequest(
+			jobId,
+			allocationId,
+			resourceProfile,
+			targetAddress);
+
+		ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+
+		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+
+			// accept an incoming slot request
+			final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+			when(taskExecutorGateway.requestSlot(
+				eq(slotId),
+				eq(jobId),
+				eq(allocationId),
+				anyString(),
+				eq(leaderId),
+				any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+
+			final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+
+			final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
+			final SlotReport slotReport = new SlotReport(slotStatus);
+
+			slotManager.registerTaskManager(
+				taskExecutorConnection,
+				slotReport);
+
+			assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest));
+
+			verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(leaderId), any(Time.class));
+
+			TaskManagerSlot slot = slotManager.getSlot(slotId);
+
+			assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId());
+		}
 	}
 
 	/**
-	 * Tests that we send multiple slot requests
+	 * Checks that un-registering a pending slot request will cancel it, removing it from all
+	 * assigned task manager slots and then remove it from the slot manager.
 	 */
 	@Test
-	public void testRequestMultipleSlots() {
-		TestingSlotManager slotManager = new TestingSlotManager();
-		directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 5);
+	public void testUnregisterPendingSlotRequest() throws Exception {
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+		final AllocationID allocationId = new AllocationID();
+
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		when(taskExecutorGateway.requestSlot(
+			any(SlotID.class),
+			any(JobID.class),
+			any(AllocationID.class),
+			anyString(),
+			eq(leaderId),
+			any(Time.class))).thenReturn(new FlinkCompletableFuture<Acknowledge>());
+
+		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
+		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
+		final SlotReport slotReport = new SlotReport(slotStatus);
 
-		// request 3 normal slots
-		for (int i = 0; i < 3; ++i) {
-			slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-		}
+		final SlotRequest slotRequest = new SlotRequest(new JobID(), allocationId, resourceProfile, "foobar");
 
-		// request 2 big slots
-		for (int i = 0; i < 2; ++i) {
-			slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-		}
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+
+		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
-		// request 1 normal slot again
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+			TaskManagerSlot slot = slotManager.getSlot(slotId);
 
-		assertEquals(4, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertEquals(2, slotManager.getPendingRequestCount());
-		assertEquals(2, slotManager.getAllocatedContainers().size());
-		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(0));
-		assertEquals(DEFAULT_TESTING_BIG_PROFILE, slotManager.getAllocatedContainers().get(1));
+			slotManager.registerSlotRequest(slotRequest);
+
+			assertNotNull(slotManager.getSlotRequest(allocationId));
+
+			assertTrue(slot.hasPendingSlotRequest());
+
+			slotManager.unregisterSlotRequest(allocationId);
+
+			assertNull(slotManager.getSlotRequest(allocationId));
+
+			slot = slotManager.getSlot(slotId);
+			assertTrue(slot.isFree());
+		}
 	}
 
 	/**
-	 * Tests that a new slot appeared in SlotReport, and we used it to fulfill a pending request
+	 * Tests that the slot manager tries to fulfill pending slot requests upon new slot registrations.
 	 */
 	@Test
-	public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
-		TestingSlotManager slotManager = new TestingSlotManager();
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-		assertEquals(1, slotManager.getPendingRequestCount());
-
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus));
-		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slotId));
+	public void testFulfillingPendingSlotRequest() throws Exception {
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceID resourceID = ResourceID.generate();
+		final JobID jobId = new JobID();
+		final SlotID slotId = new SlotID(resourceID, 0);
+		final String targetAddress = "localhost";
+		final AllocationID allocationId = new AllocationID();
+		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
+		final SlotRequest slotRequest = new SlotRequest(
+			jobId,
+			allocationId,
+			resourceProfile,
+			targetAddress);
+
+		ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+
+		// accept an incoming slot request
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		when(taskExecutorGateway.requestSlot(
+			eq(slotId),
+			eq(jobId),
+			eq(allocationId),
+			anyString(),
+			eq(leaderId),
+			any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+
+		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
+
+		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
+		final SlotReport slotReport = new SlotReport(slotStatus);
+
+		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+
+			assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest));
+
+			verify(resourceManagerActions, times(1)).allocateResource(eq(resourceProfile));
+
+			slotManager.registerTaskManager(
+				taskExecutorConnection,
+				slotReport);
+
+			verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(leaderId), any(Time.class));
+
+			TaskManagerSlot slot = slotManager.getSlot(slotId);
+
+			assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId());
+		}
 	}
 
 	/**
-	 * Tests that a new slot appeared in SlotReport, but we have no pending request
+	 * Tests that freeing a slot will correctly reset the slot and mark it as a free slot
 	 */
 	@Test
-	public void testNewlyAppearedFreeSlot() {
-		TestingSlotManager slotManager = new TestingSlotManager();
+	public void testFreeSlot() throws Exception {
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceID resourceID = ResourceID.generate();
+		final JobID jobId = new JobID();
+		final SlotID slotId = new SlotID(resourceID, 0);
+		final AllocationID allocationId = new AllocationID();
+		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
+
+		ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+
+		// accept an incoming slot request
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus));
-		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport);
+		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
 
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
+		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId);
+		final SlotReport slotReport = new SlotReport(slotStatus);
+
+		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+
+			slotManager.registerTaskManager(
+				taskExecutorConnection,
+				slotReport);
+
+			TaskManagerSlot slot = slotManager.getSlot(slotId);
+
+			assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId());
+
+			// this should be ignored since the allocation id does not match
+			slotManager.freeSlot(slotId, new AllocationID());
+
+			assertTrue(slot.isAllocated());
+			assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId());
+
+			slotManager.freeSlot(slotId, allocationId);
+
+			assertTrue(slot.isFree());
+			assertNull(slot.getAllocationId());
+		}
 	}
 
 	/**
-	 * Tests that a new slot appeared in SlotReport, but it't not suitable for all the pending requests
+	 * Tests that a second pending slot request is detected as a duplicate if the allocation ids are
+	 * the same.
 	 */
 	@Test
-	public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
-		TestingSlotManager slotManager = new TestingSlotManager();
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-		assertEquals(1, slotManager.getPendingRequestCount());
-
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE);
-		SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus));
-		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport);
-
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertFalse(slotManager.isAllocated(slotId));
+	public void testDuplicatePendingSlotRequest() throws Exception {
+
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final AllocationID allocationId = new AllocationID();
+		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
+		final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1);
+		final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar");
+		final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo");
+
+		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+			assertTrue(slotManager.registerSlotRequest(slotRequest1));
+			assertFalse(slotManager.registerSlotRequest(slotRequest2));
+		}
+
+		// check that we have called the resource allocation only for the first slot request,
+		// since the second request is a duplicate
+		verify(resourceManagerActions, times(1)).allocateResource(any(ResourceProfile.class));
 	}
 
 	/**
-	 * Tests that a new slot appeared in SlotReport, and it's been reported using by some job
+	 * Tests that if we have received a slot report with some allocated slots, then we don't accept
+	 * slot requests with allocated allocation ids.
 	 */
 	@Test
-	public void testNewlyAppearedInUseSlot() {
-		TestingSlotManager slotManager = new TestingSlotManager();
+	public void testDuplicatePendingSlotRequestAfterSlotReport() throws Exception {
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final JobID jobId = new JobID();
+		final AllocationID allocationId = new AllocationID();
+		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
+		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+
+		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId);
+		final SlotReport slotReport = new SlotReport(slotStatus);
 
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID());
-		SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus));
-		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport);
+		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
 
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertTrue(slotManager.isAllocated(slotId));
+		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+			slotManager.registerTaskManager(taskManagerConnection, slotReport);
+
+			assertFalse(slotManager.registerSlotRequest(slotRequest));
+		}
 	}
 
 	/**
-	 * Tests that we had a slot in-use and is freed again subsequently.
+	 * Tests that duplicate slot requests (requests with an already registered allocation id) are
+	 * also detected after a pending slot request has been fulfilled but not yet freed.
 	 */
 	@Test
-	public void testExistingInUseSlotUpdateStatus() {
-		TestingSlotManager slotManager = new TestingSlotManager();
+	public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Exception {
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final AllocationID allocationId = new AllocationID();
+		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
+		final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1);
+		final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar");
+		final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo");
+
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		when(taskExecutorGateway.requestSlot(
+			any(SlotID.class),
+			any(JobID.class),
+			any(AllocationID.class),
+			anyString(),
+			eq(leaderId),
+			any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+
+		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile1);
+		final SlotReport slotReport = new SlotReport(slotStatus);
 
-		SlotID slotId = SlotID.generate();
-		SlotStatus slotStatus = new SlotStatus(slotId, DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID());
-		SlotReport slotReport = new SlotReport(Collections.singletonList(slotStatus));
-		slotManager.registerTaskExecutor(slotId.getResourceID(), taskExecutorRegistration, slotReport);
+		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+			slotManager.registerTaskManager(taskManagerConnection, slotReport);
+			assertTrue(slotManager.registerSlotRequest(slotRequest1));
 
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertTrue(slotManager.isAllocated(slotId));
+			TaskManagerSlot slot = slotManager.getSlot(slotId);
 
-		// slot is freed again
-		slotManager.notifySlotAvailable(slotId.getResourceID(), slotId);
+			assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId());
 
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertFalse(slotManager.isAllocated(slotId));
+			assertFalse(slotManager.registerSlotRequest(slotRequest2));
+		}
+
+		// check that no resource allocation was triggered: the first slot request was fulfilled
+		// by the registered slot and the second request is a duplicate
+		verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class));
 	}
 
 	/**
-	 * Tests multiple slot requests with one slots.
+	 * Tests that an already registered allocation id can be reused after the initial slot request
+	 * has been freed.
 	 */
 	@Test
-	public void testMultipleSlotRequestsWithOneSlot() {
-		TestingSlotManager slotManager = new TestingSlotManager();
-		final AllocationID allocationID = new AllocationID();
+	public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exception {
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final AllocationID allocationId = new AllocationID();
+		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
+		final ResourceProfile resourceProfile2 = new ResourceProfile(2.0, 1);
+		final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar");
+		final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo");
+
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		when(taskExecutorGateway.requestSlot(
+			any(SlotID.class),
+			any(JobID.class),
+			any(AllocationID.class),
+			anyString(),
+			eq(leaderId),
+			any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+
+		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+		final SlotStatus slotStatus = new SlotStatus(slotId, new ResourceProfile(2.0, 2));
+		final SlotReport slotReport = new SlotReport(slotStatus);
 
-		SlotRequest request1 = new SlotRequest(new JobID(), allocationID, DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request1);
+		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+			slotManager.registerTaskManager(taskManagerConnection, slotReport);
+			assertTrue(slotManager.registerSlotRequest(slotRequest1));
 
-		final ResourceID resourceID = ResourceID.generate();
-		final SlotStatus slotStatus = new SlotStatus(new SlotID(resourceID, 0), DEFAULT_TESTING_PROFILE);
-		final SlotReport slotReport = new SlotReport(slotStatus);
-		slotManager.registerTaskExecutor(resourceID, taskExecutorRegistration, slotReport);
+			TaskManagerSlot slot = slotManager.getSlot(slotId);
 
-		// another request pending
-		SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request2);
+			assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId());
 
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(1, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(allocationID));
-		assertTrue(slotManager.isAllocated(request1.getAllocationId()));
+			slotManager.freeSlot(slotId, allocationId);
 
-		// but slot is reported empty in a report in the meantime which shouldn't affect the state
-		slotManager.notifySlotAvailable(resourceID, slotStatus.getSlotID());
+			// check that the slot has been freed
+			assertTrue(slot.isFree());
+			assertNull(slot.getAllocationId());
 
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slotStatus.getSlotID()));
-		assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+			assertTrue(slotManager.registerSlotRequest(slotRequest2));
 
-		// but slot is reported empty in a report in the meantime which shouldn't affect the state
-		slotManager.notifySlotAvailable(resourceID, slotStatus.getSlotID());
+			assertEquals("The slot has not been allocated to the expected allocation id.", allocationId, slot.getAllocationId());
+		}
 
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
+		// check that no resource allocation was triggered, since both slot requests could be
+		// fulfilled by the registered slot
+		verify(resourceManagerActions, never()).allocateResource(any(ResourceProfile.class));
 	}
 
 	/**
-	 * Tests that we did some allocation but failed / rejected by TaskManager, request will retry
+	 * Tests that the slot manager ignores slot reports of unknown origin (not registered
+	 * task managers).
 	 */
 	@Test
-	public void testSlotAllocationFailedAtTaskManager() {
-		TestingSlotManager slotManager = new TestingSlotManager();
-		ResourceSlot slot = new ResourceSlot(SlotID.generate(), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
-		slotManager.addFreeSlot(slot);
+	public void testReceivingUnknownSlotReport() throws Exception {
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 
-		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request);
+		final InstanceID unknownInstanceID = new InstanceID();
+		final SlotID unknownSlotId = new SlotID(ResourceID.generate(), 0);
+		final ResourceProfile unknownResourceProfile = new ResourceProfile(1.0, 1);
+		final SlotStatus unknownSlotStatus = new SlotStatus(unknownSlotId, unknownResourceProfile);
+		final SlotReport unknownSlotReport = new SlotReport(unknownSlotStatus);
 
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertTrue(slotManager.isAllocated(slot.getSlotId()));
+		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+			// check that we don't have any slots registered
+			assertTrue(0 == slotManager.getNumberRegisteredSlots());
 
-		slotManager.handleSlotRequestFailedAtTaskManager(request, slot.getSlotId());
+			// this should not update anything since the instance id is not known to the slot manager
+			assertFalse(slotManager.reportSlotStatus(unknownInstanceID, unknownSlotReport));
 
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
+			assertTrue(0 == slotManager.getNumberRegisteredSlots());
+		}
 	}
 
-
 	/**
-	 * Tests that we did some allocation but failed / rejected by TaskManager, and slot is occupied by another request
-	 * This can only occur after reconnect of the TaskExecutor.
+	 * Tests that slots are updated with respect to the latest incoming slot report. This means that
+	 * slots for which no report has been received will be removed and those for which a report was
+	 * received are updated accordingly.
 	 */
 	@Test
-	public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
-		TestingSlotManager slotManager = new TestingSlotManager();
-		final SlotID slotID = SlotID.generate();
-		SlotStatus slot = new SlotStatus(slotID, DEFAULT_TESTING_PROFILE);
-		SlotReport slotReport = new SlotReport(slot);
-		slotManager.registerTaskExecutor(slotID.getResourceID(), taskExecutorRegistration, slotReport);
-
-		SlotRequest request = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-
-		// slot is set empty by a reconnect of the TaskExecutor
-		slotManager.registerTaskExecutor(slotID.getResourceID(), taskExecutorRegistration, slotReport);
-
-		assertEquals(0, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-
-		// another request takes the slot
-		SlotRequest request2 = new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE);
-		slotManager.requestSlot(request2);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertFalse(slotManager.isAllocated(request.getAllocationId()));
-		assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-
-		// original request should be retried
-		slotManager.handleSlotRequestFailedAtTaskManager(request, slotID);
-
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-		assertFalse(slotManager.isAllocated(request.getAllocationId()));
-		assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+	public void testUpdateSlotReport() throws Exception {
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+
+		final JobID jobId = new JobID();
+		final AllocationID allocationId = new AllocationID();
+
+		final ResourceID resourceId = ResourceID.generate();
+		final SlotID slotId1 = new SlotID(resourceId, 0);
+		final SlotID slotId2 = new SlotID(resourceId, 1);
+		final SlotID slotId3 = new SlotID(resourceId, 2);
+
+
+		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
+		final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
+		final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
+
+		final SlotStatus newSlotStatus2 = new SlotStatus(slotId2, resourceProfile, jobId, allocationId);
+		final SlotStatus slotStatus3 = new SlotStatus(slotId3, resourceProfile);
+
+		final SlotReport slotReport1 = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
+		final SlotReport slotReport2 = new SlotReport(Arrays.asList(newSlotStatus2, slotStatus3));
+
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+
+		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+			// check that we don't have any slots registered
+			assertTrue(0 == slotManager.getNumberRegisteredSlots());
+
+			slotManager.registerTaskManager(taskManagerConnection, slotReport1);
+
+			TaskManagerSlot slot = slotManager.getSlot(slotId2);
+
+			assertTrue(2 == slotManager.getNumberRegisteredSlots());
+
+			assertTrue(slot.isFree());
+
+			assertTrue(slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), slotReport2));
+
+			assertTrue(2 == slotManager.getNumberRegisteredSlots());
+
+			// the slot manager should have removed slotId1
+			assertNull(slotManager.getSlot(slotId1));
+
+			assertNotNull(slotManager.getSlot(slotId3));
+
+			// slotId2 should have been allocated for allocationId
+			assertEquals(allocationId, slotManager.getSlot(slotId2).getAllocationId());
+		}
 	}
 
+	/**
+	 * Tests that idle task managers time out after the configured timeout. A timed out task manager
+	 * will be removed from the slot manager and the resource manager will be notified about the
+	 * timeout.
+	 */
 	@Test
-	public void testNotifyTaskManagerFailure() {
-		TestingSlotManager slotManager = new TestingSlotManager();
+	public void testTaskManagerTimeout() throws Exception {
+		final long tmTimeout = 50L;
 
-		ResourceID resource1 = ResourceID.generate();
-		ResourceID resource2 = ResourceID.generate();
+		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final UUID leaderId = UUID.randomUUID();
 
-		ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 1), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
-		ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 2), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
-		ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 1), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
-		ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 2), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
-		slotManager.addFreeSlot(slot11);
-		slotManager.addFreeSlot(slot21);
+		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
+		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
+		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
+		final SlotReport slotReport = new SlotReport(slotStatus);
 
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
-		slotManager.requestSlot(new SlotRequest(new JobID(), new AllocationID(), DEFAULT_TESTING_PROFILE));
+		final Executor mainThreadExecutor = mock(Executor.class);
 
-		assertEquals(2, slotManager.getAllocatedSlotCount());
-		assertEquals(0, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
+		try (SlotManager slotManager = new SlotManager(
+			TestingUtils.defaultScheduledExecutor(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			Time.milliseconds(tmTimeout))) {
 
-		slotManager.addFreeSlot(slot12);
-		slotManager.addFreeSlot(slot22);
+			slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
 
-		assertEquals(2, slotManager.getAllocatedSlotCount());
-		assertEquals(2, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
+			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
-		slotManager.notifyTaskManagerFailure(resource2);
+			ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
 
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
+			verify(mainThreadExecutor, timeout(tmTimeout * 10L)).execute(runnableArgumentCaptor.capture());
 
-		// notify an not exist resource failure
-		slotManager.notifyTaskManagerFailure(ResourceID.generate());
+			// the only runnable being executed by the main thread executor should be the timeout runnable
+			Runnable timeoutRunnable = runnableArgumentCaptor.getValue();
 
-		assertEquals(1, slotManager.getAllocatedSlotCount());
-		assertEquals(1, slotManager.getFreeSlotCount());
-		assertEquals(0, slotManager.getPendingRequestCount());
-	}
+			timeoutRunnable.run();
 
-	// ------------------------------------------------------------------------
-	//  testing utilities
-	// ------------------------------------------------------------------------
-
-	private void directlyProvideFreeSlots(
-		final SlotManager slotManager,
-		final ResourceProfile resourceProfile,
-		final int freeSlotNum)
-	{
-		for (int i = 0; i < freeSlotNum; ++i) {
-			slotManager.addFreeSlot(new ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile), taskExecutorRegistration));
+			verify(resourceManagerActions, times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()));
 		}
 	}
 
-	// ------------------------------------------------------------------------
-	//  testing classes
-	// ------------------------------------------------------------------------
+	/**
+	 * Tests that slot requests time out after the specified request timeout. If a slot request
+	 * times out, then the request is cancelled, removed from the slot manager and the resource
+	 * manager is notified about the failed allocation.
+	 */
+	@Test
+	public void testSlotRequestTimeout() throws Exception {
+		final long allocationTimeout = 50L;
+
+		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final UUID leaderId = UUID.randomUUID();
+		final JobID jobId = new JobID();
+		final AllocationID allocationId = new AllocationID();
+
+		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
+		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
 
-	private static class TestingSlotManager extends SlotManager {
+		final Executor mainThreadExecutor = mock(Executor.class);
 
-		private static TestingRmServices testingRmServices = new TestingRmServices();
+		try (SlotManager slotManager = new SlotManager(
+			TestingUtils.defaultScheduledExecutor(),
+			TestingUtils.infiniteTime(),
+			Time.milliseconds(allocationTimeout),
+			TestingUtils.infiniteTime())) {
 
-		TestingSlotManager() {
-			super(testingRmServices);
-			testingRmServices.allocatedContainers.clear();
+			slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
+
+			assertTrue(slotManager.registerSlotRequest(slotRequest));
+
+			ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
+
+			verify(mainThreadExecutor, timeout(allocationTimeout * 10L)).execute(runnableArgumentCaptor.capture());
+
+			// the only runnable being executed by the main thread executor should be the timeout runnable
+			Runnable timeoutRunnable = runnableArgumentCaptor.getValue();
+
+			timeoutRunnable.run();
+
+			verify(resourceManagerActions, times(1)).notifyAllocationFailure(
+				eq(jobId),
+				eq(allocationId),
+				any(TimeoutException.class));
 		}
+	}
 
-		/**
-		 * Choose slot randomly if it matches requirement
-		 *
-		 * @param request   The slot request
-		 * @param freeSlots All slots which can be used
-		 * @return The chosen slot or null if cannot find a match
-		 */
-		@Override
-		protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, ResourceSlot> freeSlots) {
-			for (ResourceSlot slot : freeSlots.values()) {
-				if (slot.isMatchingRequirement(request.getResourceProfile())) {
-					return slot;
-				}
+	/**
+	 * Tests that a slot request is retried if it times out on the task manager side.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testTaskManagerSlotRequestTimeoutHandling() throws Exception {
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+
+		final JobID jobId = new JobID();
+		final AllocationID allocationId = new AllocationID();
+		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
+		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
+		final FlinkCompletableFuture<Acknowledge> slotRequestFuture1 = new FlinkCompletableFuture<>();
+		final FlinkCompletableFuture<Acknowledge> slotRequestFuture2 = new FlinkCompletableFuture<>();
+
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		when(taskExecutorGateway.requestSlot(
+			any(SlotID.class),
+			any(JobID.class),
+			eq(allocationId),
+			anyString(),
+			any(UUID.class),
+			any(Time.class))).thenReturn(slotRequestFuture1, slotRequestFuture2);
+
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+
+		final ResourceID resourceId = ResourceID.generate();
+		final SlotID slotId1 = new SlotID(resourceId, 0);
+		final SlotID slotId2 = new SlotID(resourceId, 1);
+		final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
+		final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
+		final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
+
+		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+
+			slotManager.registerTaskManager(taskManagerConnection, slotReport);
+
+			slotManager.registerSlotRequest(slotRequest);
+
+			ArgumentCaptor<SlotID> slotIdCaptor = ArgumentCaptor.forClass(SlotID.class);
+
+			verify(taskExecutorGateway, times(1)).requestSlot(
+				slotIdCaptor.capture(),
+				eq(jobId),
+				eq(allocationId),
+				anyString(),
+				eq(leaderId),
+				any(Time.class));
+
+			TaskManagerSlot failedSlot = slotManager.getSlot(slotIdCaptor.getValue());
+
+			// let the first attempt fail --> this should trigger a second attempt
+			slotRequestFuture1.completeExceptionally(new SlotAllocationException("Test exception."));
+
+			verify(taskExecutorGateway, times(2)).requestSlot(
+				slotIdCaptor.capture(),
+				eq(jobId),
+				eq(allocationId),
+				anyString(),
+				eq(leaderId),
+				any(Time.class));
+
+			// the second attempt succeeds
+			slotRequestFuture2.complete(Acknowledge.get());
+
+			TaskManagerSlot slot = slotManager.getSlot(slotIdCaptor.getValue());
+
+			assertTrue(slot.isAllocated());
+			assertEquals(allocationId, slot.getAllocationId());
+
+			if (!failedSlot.getSlotId().equals(slot.getSlotId())) {
+				assertTrue(failedSlot.isFree());
 			}
-			return null;
 		}
+	}
 
-		/**
-		 * Choose request randomly if offered slot can match its requirement
-		 *
-		 * @param offeredSlot     The free slot
-		 * @param pendingRequests All the pending slot requests
-		 * @return The chosen request's AllocationID or null if cannot find a match
-		 */
-		@Override
-		protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot,
-			Map<AllocationID, SlotRequest> pendingRequests)
-		{
-			for (Map.Entry<AllocationID, SlotRequest> pendingRequest : pendingRequests.entrySet()) {
-				if (offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile())) {
-					return pendingRequest.getValue();
-				}
-			}
-			return null;
+	/**
+	 * Tests that a pending slot request is retried on another slot if a slot report shows the
+	 * requested slot as allocated under a different allocation id.
+	 */
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testSlotReportWhileActiveSlotRequest() throws Exception {
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+
+		final JobID jobId = new JobID();
+		final AllocationID allocationId = new AllocationID();
+		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
+		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
+		final FlinkCompletableFuture<Acknowledge> slotRequestFuture1 = new FlinkCompletableFuture<>();
+
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		when(taskExecutorGateway.requestSlot(
+			any(SlotID.class),
+			any(JobID.class),
+			eq(allocationId),
+			anyString(),
+			any(UUID.class),
+			any(Time.class))).thenReturn(slotRequestFuture1, FlinkCompletableFuture.completed(Acknowledge.get()));
+
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+
+		final ResourceID resourceId = ResourceID.generate();
+		final SlotID slotId1 = new SlotID(resourceId, 0);
+		final SlotID slotId2 = new SlotID(resourceId, 1);
+		final SlotID slotId3 = new SlotID(resourceId, 2);
+		final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
+		final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
+		final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
+
+		// we have to manually trigger the future call backs to simulate the main thread executor behaviour
+		final Executor mainThreadExecutorMock = mock(Executor.class);
+
+		try (SlotManager slotManager = new SlotManager(
+			TestingUtils.defaultScheduledExecutor(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime())) {
+
+			slotManager.start(leaderId, mainThreadExecutorMock, resourceManagerActions);
+
+			slotManager.registerTaskManager(taskManagerConnection, slotReport);
+
+			slotManager.registerSlotRequest(slotRequest);
+
+			ArgumentCaptor<SlotID> slotIdCaptor = ArgumentCaptor.forClass(SlotID.class);
+
+			verify(taskExecutorGateway, times(1)).requestSlot(
+				slotIdCaptor.capture(),
+				eq(jobId),
+				eq(allocationId),
+				anyString(),
+				eq(leaderId),
+				any(Time.class));
+
+			final SlotStatus newSlotStatus1 = new SlotStatus(slotIdCaptor.getValue(), resourceProfile, new JobID(), new AllocationID());
+			final SlotStatus newSlotStatus2 = new SlotStatus(slotId3, resourceProfile);
+			final SlotReport newSlotReport = new SlotReport(Arrays.asList(newSlotStatus1, newSlotStatus2));
+
+			// this should remove the unused slot, replacing it with slotId3 and retry the pending slot request
+			slotManager.reportSlotStatus(taskManagerConnection.getInstanceID(), newSlotReport);
+
+			ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
+			verify(mainThreadExecutorMock).execute(runnableArgumentCaptor.capture());
+
+			Runnable requestFailureRunnable = runnableArgumentCaptor.getValue();
+
+			requestFailureRunnable.run();
+
+			verify(taskExecutorGateway, times(2)).requestSlot(
+				slotIdCaptor.capture(),
+				eq(jobId),
+				eq(allocationId),
+				anyString(),
+				eq(leaderId),
+				any(Time.class));
+
+			verify(mainThreadExecutorMock, times(2)).execute(runnableArgumentCaptor.capture());
+			Runnable requestSuccessRunnable = runnableArgumentCaptor.getValue();
+
+			requestSuccessRunnable.run();
+
+			final SlotID requestedSlotId = slotIdCaptor.getValue();
+
+			assertEquals(slotId3, requestedSlotId);
+
+			TaskManagerSlot slot = slotManager.getSlot(requestedSlotId);
+
+			assertTrue(slot.isAllocated());
+			assertEquals(allocationId, slot.getAllocationId());
 		}
+	}
+
+	/**
+	 * Tests that formerly used task managers can again time out after all of their slots have
+	 * been freed.
+	 */
+	@Test
+	public void testTimeoutForUnusedTaskManager() throws Exception {
+		final long taskManagerTimeout = 123456L;
+
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
+		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
+
+		final ResourceID resourceId = ResourceID.generate();
+
+		final JobID jobId = new JobID();
+		final AllocationID allocationId = new AllocationID();
+		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
+		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
+
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		when(taskExecutorGateway.requestSlot(
+			any(SlotID.class),
+			eq(jobId),
+			eq(allocationId),
+			anyString(),
+			eq(leaderId),
+			any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
+
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
+
+		final SlotID slotId1 = new SlotID(resourceId, 0);
+		final SlotID slotId2 = new SlotID(resourceId, 1);
+		final SlotStatus slotStatus1 = new SlotStatus(slotId1, resourceProfile);
+		final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
+		final SlotReport initialSlotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
+
+		try (SlotManager slotManager = new SlotManager(
+			scheduledExecutor,
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			Time.of(taskManagerTimeout, TimeUnit.MILLISECONDS))) {
+
+			slotManager.start(leaderId, Executors.directExecutor(), resourceManagerActions);
+
+			slotManager.registerSlotRequest(slotRequest);
+
+			slotManager.registerTaskManager(taskManagerConnection, initialSlotReport);
+
+			ArgumentCaptor<SlotID> slotIdArgumentCaptor = ArgumentCaptor.forClass(SlotID.class);
+
+			verify(taskExecutorGateway).requestSlot(
+				slotIdArgumentCaptor.capture(),
+				eq(jobId),
+				eq(allocationId),
+				anyString(),
+				eq(leaderId),
+				any(Time.class));
+
+			assertFalse(slotManager.hasTimeoutRegistered(taskManagerConnection.getInstanceID()));
+
+			SlotID slotId = slotIdArgumentCaptor.getValue();
+			TaskManagerSlot slot = slotManager.getSlot(slotId);
+
+			assertTrue(slot.isAllocated());
+			assertEquals(allocationId, slot.getAllocationId());
+
+			slotManager.freeSlot(slotId, allocationId);
+
+			assertTrue(slotManager.hasTimeoutRegistered(taskManagerConnection.getInstanceID()));
+
+			ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
+
+			// filter out the schedule call for the task manager which will be registered using the
+			// taskManagerTimeout value
+			verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), eq(taskManagerTimeout), eq(TimeUnit.MILLISECONDS));
+
+			Runnable timeoutRunnable = runnableArgumentCaptor.getValue();
 
-		List<ResourceProfile> getAllocatedContainers() {
-			return testingRmServices.allocatedContainers;
+			timeoutRunnable.run();
+
+			verify(resourceManagerActions, times(1)).releaseResource(eq(taskManagerConnection.getInstanceID()));
 		}
+	}
 
+	/**
+	 * Tests that the slot manager re-registers a timeout for a rejected slot request.
+	 */
+	@Test
+	public void testTimeoutForRejectedSlotRequest() throws Exception {
 
-		private static class TestingRmServices implements ResourceManagerServices {
+		final long slotRequestTimeout = 1337L;
+		final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
 
-			private final UUID leaderID;
+		final ResourceID resourceId = ResourceID.generate();
+		final SlotID slotId = new SlotID(resourceId, 0);
+		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
+		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
+		final SlotReport slotReport = new SlotReport(slotStatus);
 
-			private final List<ResourceProfile> allocatedContainers;
+		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 
-			public TestingRmServices() {
-				this.leaderID = UUID.randomUUID();
-				this.allocatedContainers = new LinkedList<>();
-			}
+		final JobID jobId = new JobID();
+		final AllocationID allocationId = new AllocationID();
+		final AllocationID allocationId2 = new AllocationID();
+		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
 
-			@Override
-			public UUID getLeaderID() {
-				return leaderID;
-			}
+		CompletableFuture<Acknowledge> requestFuture = new FlinkCompletableFuture<>();
 
-			@Override
-			public void allocateResource(ResourceProfile resourceProfile) {
-				allocatedContainers.add(resourceProfile);
-			}
+		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+		when(taskExecutorGateway.requestSlot(
+			eq(slotId),
+			eq(jobId),
+			eq(allocationId),
+			anyString(),
+			eq(leaderId),
+			any(Time.class))).thenReturn(requestFuture);
 
-			@Override
-			public Executor getAsyncExecutor() {
-				return Mockito.mock(Executor.class);
-			}
+		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
-			@Override
-			public Executor getMainThreadExecutor() {
-				return Mockito.mock(Executor.class);
-			}
+		try (SlotManager slotManager = new SlotManager(
+			scheduledExecutor,
+			TestingUtils.infiniteTime(),
+			Time.milliseconds(slotRequestTimeout),
+			TestingUtils.infiniteTime())) {
+
+			slotManager.start(leaderId, Executors.directExecutor(), resourceManagerActions);
+
+			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
+			slotManager.registerSlotRequest(slotRequest);
+
+			verify(taskExecutorGateway).requestSlot(
+				eq(slotId),
+				eq(jobId),
+				eq(allocationId),
+				anyString(),
+				eq(leaderId),
+				any(Time.class));
+
+			requestFuture.completeExceptionally(new SlotOccupiedException("Slot is already occupied", allocationId2));
+
+			ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);
+			verify(scheduledExecutor).schedule(runnableArgumentCaptor.capture(), eq(slotRequestTimeout), eq(TimeUnit.MILLISECONDS));
+
+			Runnable timeoutRunnable = runnableArgumentCaptor.getValue();
+
+			timeoutRunnable.run();
+
+			verify(resourceManagerActions).notifyAllocationFailure(eq(jobId), eq(allocationId), any(Exception.class));
+
+			TaskManagerSlot slot = slotManager.getSlot(slotId);
+
+			assertTrue(slot.isAllocated());
+			assertEquals(allocationId2, slot.getAllocationId());
 		}
 	}
+
+	private SlotManager createSlotManager(UUID leaderId, ResourceManagerActions resourceManagerActions) {
+		SlotManager slotManager = new SlotManager(
+			TestingUtils.defaultScheduledExecutor(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime());
+
+		slotManager.start(leaderId, Executors.directExecutor(), resourceManagerActions);
+
+		return slotManager;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/59aefb57/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 37690b5..a72969e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -19,47 +19,29 @@ package org.apache.flink.runtime.resourcemanager.slotmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.heartbeat.HeartbeatServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
-import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
-import org.apache.flink.runtime.resourcemanager.ResourceManager;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
-import org.apache.flink.runtime.resourcemanager.TestingSlotManager;
-import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
-import org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
-import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
-import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import static org.mockito.Matchers.any;
@@ -70,128 +52,75 @@ import static org.mockito.Mockito.verify;
 
 public class SlotProtocolTest extends TestLogger {
 
-	private static TestingSerialRpcService testRpcService;
+	private static final long timeout = 10000L;
+
+
+	private static ScheduledExecutorService scheduledExecutorService;
 
 	@BeforeClass
 	public static void beforeClass() {
-		testRpcService = new TestingSerialRpcService();
+		scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
 	}
 
 	@AfterClass
 	public static void afterClass() {
-		testRpcService.stopService();
-		testRpcService = null;
-	}
-
-	@Before
-	public void beforeTest(){
-		testRpcService.clearGateways();
+		Executors.gracefulShutdown(timeout, TimeUnit.MILLISECONDS, scheduledExecutorService);
 	}
 
 	/**
 	 * Tests whether
-	 * 1) SlotRequest is routed to the SlotManager
-	 * 2) SlotRequest is confirmed
-	 * 3) SlotRequest leads to a container allocation
-	 * 4) Slot becomes available and TaskExecutor gets a SlotRequest
+	 * 1) SlotManager accepts a slot request
+	 * 2) SlotRequest leads to a container allocation
+	 * 3) Slot becomes available and TaskExecutor gets a SlotRequest
 	 */
 	@Test
 	public void testSlotsUnavailableRequest() throws Exception {
-		final String rmAddress = "/rm1";
-		final String jmAddress = "/jm1";
 		final JobID jobID = new JobID();
-		final ResourceID rmResourceId = new ResourceID(rmAddress);
 		final ResourceID jmResourceId = new ResourceID(jmAddress);
 
-		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
-
-		final TestingHighAvailabilityServices testingHaServices = new TestingHighAvailabilityServices();
 		final UUID rmLeaderID = UUID.randomUUID();
-		final UUID jmLeaderID = UUID.randomUUID();
-		TestingLeaderElectionService rmLeaderElectionService =
-			configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
-
-		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
-			Time.seconds(5L),
-			Time.seconds(5L));
-
-		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
-			testingHaServices,
-			testRpcService.getScheduledExecutor(),
-			Time.seconds(5L));
-
-		final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
-
-		final HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
-
-		SpiedResourceManager resourceManager =
-			new SpiedResourceManager(
-				rmResourceId,
-				testRpcService,
-				resourceManagerConfiguration,
-				testingHaServices,
-				heartbeatServices,
-				slotManagerFactory,
-				mock(MetricRegistry.class),
-				jobLeaderIdService,
-				mock(FatalErrorHandler.class));
-		resourceManager.start();
-		rmLeaderElectionService.isLeader(rmLeaderID);
-
-		Future<RegistrationResponse> registrationFuture =
-			resourceManager.registerJobManager(rmLeaderID, jmLeaderID, jmResourceId, jmAddress, jobID);
-		try {
-			registrationFuture.get(5, TimeUnit.SECONDS);
-		} catch (Exception e) {
-			Assert.fail("JobManager registration Future didn't become ready.");
-		}
 
-		final SlotManager slotManager = slotManagerFactory.slotManager;
+		try (SlotManager slotManager = new SlotManager(
+			new ScheduledExecutorServiceAdapter(scheduledExecutorService),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime())) {
 
-		final AllocationID allocationID = new AllocationID();
-		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
+			ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 
-		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
-		RMSlotRequestReply slotRequestReply =
-			resourceManager.requestSlot(jmLeaderID, rmLeaderID, slotRequest);
+			slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions);
 
-		// 1) SlotRequest is routed to the SlotManager
-		verify(slotManager).requestSlot(slotRequest);
+			final AllocationID allocationID = new AllocationID();
+			final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
+			final String targetAddress = "foobar";
 
-		// 2) SlotRequest is confirmed
-		Assert.assertEquals(
-			slotRequestReply.getAllocationID(),
-			allocationID);
+			SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile, targetAddress);
 
-		// 3) SlotRequest leads to a container allocation
-		Assert.assertEquals(1, resourceManager.startNewWorkerCalled);
+			slotManager.registerSlotRequest(slotRequest);
 
-		Assert.assertFalse(slotManager.isAllocated(allocationID));
+			verify(resourceManagerActions).allocateResource(eq(slotRequest.getResourceProfile()));
 
-		// slot becomes available
-		final String tmAddress = "/tm1";
-		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
-		Mockito
-			.when(
+			// slot becomes available
+			TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
+			Mockito.when(
 				taskExecutorGateway
 					.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
-			.thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>());
-		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
-
-		final ResourceID resourceID = ResourceID.generate();
-		final SlotID slotID = new SlotID(resourceID, 0);
-
-		final SlotStatus slotStatus =
-			new SlotStatus(slotID, resourceProfile);
-		final SlotReport slotReport =
-			new SlotReport(Collections.singletonList(slotStatus));
-		// register slot at SlotManager
-		slotManager.registerTaskExecutor(
-			resourceID, new TaskExecutorRegistration(taskExecutorGateway), slotReport);
-
-		// 4) Slot becomes available and TaskExecutor gets a SlotRequest
-		verify(taskExecutorGateway, timeout(5000))
-			.requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class));
+				.thenReturn(mock(FlinkFuture.class));
+
+			final ResourceID resourceID = ResourceID.generate();
+			final SlotID slotID = new SlotID(resourceID, 0);
+
+			final SlotStatus slotStatus =
+				new SlotStatus(slotID, resourceProfile);
+			final SlotReport slotReport =
+				new SlotReport(Collections.singletonList(slotStatus));
+			// register slot at SlotManager
+			slotManager.registerTaskManager(new TaskExecutorConnection(taskExecutorGateway), slotReport);
+
+			// 3) Slot becomes available and TaskExecutor gets a SlotRequest
+			verify(taskExecutorGateway, timeout(5000L))
+				.requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class));
+		}
 	}
 
 	/**
@@ -203,159 +132,48 @@ public class SlotProtocolTest extends TestLogger {
 	 */
 	@Test
 	public void testSlotAvailableRequest() throws Exception {
-		final String rmAddress = "/rm1";
-		final String jmAddress = "/jm1";
-		final String tmAddress = "/tm1";
 		final JobID jobID = new JobID();
-		final ResourceID rmResourceId = new ResourceID(rmAddress);
 		final ResourceID jmResourceId = new ResourceID(jmAddress);
 
-		testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class));
-
-		final TestingHighAvailabilityServices testingHaServices = new TestingHighAvailabilityServices();
 		final UUID rmLeaderID = UUID.randomUUID();
-		final UUID jmLeaderID = UUID.randomUUID();
-		TestingLeaderElectionService rmLeaderElectionService =
-			configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
 
 		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 		Mockito.when(
 			taskExecutorGateway
 				.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
-			.thenReturn(new FlinkCompletableFuture<TMSlotRequestReply>());
-		testRpcService.registerGateway(tmAddress, taskExecutorGateway);
-
-		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(
-			Time.seconds(5L),
-			Time.seconds(5L));
-
-		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
-			testingHaServices,
-			testRpcService.getScheduledExecutor(),
-			Time.seconds(5L));
-
-		TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
-
-		HeartbeatServices heartbeatServices = mock(HeartbeatServices.class);
-
-		ResourceManager<ResourceID> resourceManager =
-			Mockito.spy(new StandaloneResourceManager(
-				testRpcService,
-				FlinkResourceManager.RESOURCE_MANAGER_NAME,
-				rmResourceId,
-				resourceManagerConfiguration,
-				testingHaServices,
-				heartbeatServices,
-				slotManagerFactory,
-				mock(MetricRegistry.class),
-				jobLeaderIdService,
-				mock(FatalErrorHandler.class)));
-		resourceManager.start();
-		rmLeaderElectionService.isLeader(rmLeaderID);
-
-		Thread.sleep(1000);
-
-		Future<RegistrationResponse> registrationFuture =
-			resourceManager.registerJobManager(rmLeaderID, jmLeaderID, jmResourceId, jmAddress, jobID);
-		try {
-			registrationFuture.get(5L, TimeUnit.SECONDS);
-		} catch (Exception e) {
-			Assert.fail("JobManager registration Future didn't become ready.");
-		}
-
-		final SlotManager slotManager = slotManagerFactory.slotManager;
+			.thenReturn(mock(FlinkFuture.class));
 
-		final ResourceID resourceID = ResourceID.generate();
-		final AllocationID allocationID = new AllocationID();
-		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
-		final SlotID slotID = new SlotID(resourceID, 0);
+		try (SlotManager slotManager = new SlotManager(
+			new ScheduledExecutorServiceAdapter(scheduledExecutorService),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime(),
+			TestingUtils.infiniteTime())) {
 
-		final SlotStatus slotStatus =
-			new SlotStatus(slotID, resourceProfile);
-		final SlotReport slotReport =
-			new SlotReport(Collections.singletonList(slotStatus));
-		// register slot at SlotManager
-		slotManager.registerTaskExecutor(
-			resourceID, new TaskExecutorRegistration(taskExecutorGateway), slotReport);
+			ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 
-		SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile);
-		RMSlotRequestReply slotRequestReply =
-			resourceManager.requestSlot(jmLeaderID, rmLeaderID, slotRequest);
+			slotManager.start(rmLeaderID, Executors.directExecutor(), resourceManagerActions);
 
-		// 1) a SlotRequest is routed to the SlotManager
-		verify(slotManager).requestSlot(slotRequest);
+			final ResourceID resourceID = ResourceID.generate();
+			final AllocationID allocationID = new AllocationID();
+			final ResourceProfile resourceProfile = new ResourceProfile(1.0, 100);
+			final SlotID slotID = new SlotID(resourceID, 0);
 
-		// 2) a SlotRequest is confirmed
-		Assert.assertEquals(
-			slotRequestReply.getAllocationID(),
-			allocationID);
-
-		// 3) a SlotRequest leads to an allocation of a registered slot
-		Assert.assertTrue(slotManager.isAllocated(slotID));
-		Assert.assertTrue(slotManager.isAllocated(allocationID));
-
-		// 4) a SlotRequest is routed to the TaskExecutor
-		verify(taskExecutorGateway, timeout(5000))
-			.requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class));
-	}
-
-	private static TestingLeaderElectionService configureHA(
-			TestingHighAvailabilityServices testingHA, JobID jobID, String rmAddress, UUID rmID, String jmAddress, UUID jmID) {
-		final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
-		testingHA.setResourceManagerLeaderElectionService(rmLeaderElectionService);
-		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(rmAddress, rmID);
-		testingHA.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
-
-		final TestingLeaderElectionService jmLeaderElectionService = new TestingLeaderElectionService();
-		testingHA.setJobMasterLeaderElectionService(jobID, jmLeaderElectionService);
-		final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(jmAddress, jmID);
-		testingHA.setJobMasterLeaderRetriever(jobID, jmLeaderRetrievalService);
-
-		return rmLeaderElectionService;
-	}
-
-	private static class SpiedResourceManager extends StandaloneResourceManager {
-
-		private int startNewWorkerCalled = 0;
-
-		public SpiedResourceManager(
-				ResourceID resourceId,
-				RpcService rpcService,
-				ResourceManagerConfiguration resourceManagerConfiguration,
-				HighAvailabilityServices highAvailabilityServices,
-				HeartbeatServices heartbeatServices,
-				SlotManagerFactory slotManagerFactory,
-				MetricRegistry metricRegistry,
-				JobLeaderIdService jobLeaderIdService,
-				FatalErrorHandler fatalErrorHandler) {
-			super(
-				rpcService,
-				FlinkResourceManager.RESOURCE_MANAGER_NAME,
-				resourceId,
-				resourceManagerConfiguration,
-				highAvailabilityServices,
-				heartbeatServices,
-				slotManagerFactory,
-				metricRegistry,
-				jobLeaderIdService,
-				fatalErrorHandler);
-		}
-
-
-		@Override
-		public void startNewWorker(ResourceProfile resourceProfile) {
-			startNewWorkerCalled++;
-		}
-	}
+			final SlotStatus slotStatus =
+				new SlotStatus(slotID, resourceProfile);
+			final SlotReport slotReport =
+				new SlotReport(Collections.singletonList(slotStatus));
+			// register slot at SlotManager
+			slotManager.registerTaskManager(
+				new TaskExecutorConnection(taskExecutorGateway), slotReport);
 
-	private static class TestingSlotManagerFactory implements SlotManagerFactory {
+			final String targetAddress = "foobar";
 
-		private SlotManager slotManager;
+			SlotRequest slotRequest = new SlotRequest(jobID, allocationID, resourceProfile, targetAddress);
+			slotManager.registerSlotRequest(slotRequest);
 
-		@Override
-		public SlotManager create(ResourceManagerServices rmServices) {
-			this.slotManager = Mockito.spy(new TestingSlotManager(rmServices));
-			return this.slotManager;
+			// a SlotRequest is routed to the TaskExecutor
+			verify(taskExecutorGateway, timeout(5000))
+				.requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class));
 		}
 	}
 }


Mime
View raw message