flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-8504] [flip6] Deregister jobs from the JobLeaderService when no more slots allocated
Date Tue, 30 Jan 2018 21:55:55 GMT
Repository: flink
Updated Branches:
  refs/heads/master 0e20b6130 -> e94a488dd


[FLINK-8504] [flip6] Deregister jobs from the JobLeaderService when no more slots allocated

Let the TaskExecutor deregister a job from the JobLeaderService once it no longer has
any slots allocated for this job.

This closes #5361.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e94a488d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e94a488d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e94a488d

Branch: refs/heads/master
Commit: e94a488dd78e7c2efdf55a67cea886ee15a641a6
Parents: 23ff120
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Jan 25 13:50:43 2018 +0100
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Jan 30 18:30:17 2018 +0100

----------------------------------------------------------------------
 .../runtime/taskexecutor/JobLeaderService.java  |  40 +++---
 .../runtime/taskexecutor/TaskExecutor.java      |  80 ++++++------
 .../taskexecutor/slot/TaskSlotTable.java        |  45 ++++++-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 121 ++++++++++++++++++-
 4 files changed, 223 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e94a488d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 3b4da4e..5376362 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -37,11 +38,11 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 
 /**
@@ -80,7 +81,9 @@ public class JobLeaderService {
 	public JobLeaderService(TaskManagerLocation location) {
 		this.ownLocation = Preconditions.checkNotNull(location);
 
-		jobLeaderServices = new HashMap<>(4);
+		// Has to be a concurrent hash map because tests might access this service
+		// concurrently via containsJob
+		jobLeaderServices = new ConcurrentHashMap<>(4);
 
 		state = JobLeaderService.State.CREATED;
 
@@ -147,18 +150,6 @@ public class JobLeaderService {
 	}
 
 	/**
-	 * Check whether the service monitors the given job.
-	 *
-	 * @param jobId identifying the job
-	 * @return True if the given job is monitored; otherwise false
-	 */
-	public boolean containsJob(JobID jobId) {
-		Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently
not running.");
-
-		return jobLeaderServices.containsKey(jobId);
-	}
-
-	/**
 	 * Remove the given job from being monitored by the job leader service.
 	 *
 	 * @param jobId identifying the job to remove from monitoring
@@ -199,9 +190,9 @@ public class JobLeaderService {
 
 		JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId);
 
-		leaderRetrievalService.start(jobManagerLeaderListener);
-
 		jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, jobManagerLeaderListener));
+
+		leaderRetrievalService.start(jobManagerLeaderListener);
 	}
 
 	/**
@@ -435,4 +426,21 @@ public class JobLeaderService {
 	private enum State {
 		CREATED, STARTED, STOPPED
 	}
+
+	// -----------------------------------------------------------
+	// Testing methods
+	// -----------------------------------------------------------
+
+	/**
+	 * Check whether the service monitors the given job.
+	 *
+	 * @param jobId identifying the job
+	 * @return True if the given job is monitored; otherwise false
+	 */
+	@VisibleForTesting
+	public boolean containsJob(JobID jobId) {
+		Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently
not running.");
+
+		return jobLeaderServices.containsKey(jobId);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e94a488d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index ad7414c..9df2e88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -93,7 +93,6 @@ import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -163,7 +162,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway
{
 
 	// --------- job manager connections -----------
 
-	private Map<ResourceID, JobManagerConnection> jobManagerConnections;
+	private final Map<ResourceID, JobManagerConnection> jobManagerConnections;
 
 	// --------- task slot allocation table -----------
 
@@ -195,7 +194,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway
{
 			JobLeaderService jobLeaderService,
 			FatalErrorHandler fatalErrorHandler) {
 
-		super(rpcService, AkkaRpcServiceUtils.createRandomName(TaskExecutor.TASK_MANAGER_NAME));
+		super(rpcService, AkkaRpcServiceUtils.createRandomName(TASK_MANAGER_NAME));
 
 		checkArgument(taskManagerConfiguration.getNumberSlots() > 0, "The number of slots has
to be larger than 0.");
 
@@ -978,10 +977,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway
{
 			ResourceID resourceID,
 			JobMasterGateway jobMasterGateway,
 			int blobPort) {
-		Preconditions.checkNotNull(jobID);
-		Preconditions.checkNotNull(resourceID);
-		Preconditions.checkNotNull(jobMasterGateway);
-		Preconditions.checkArgument(blobPort > 0 && blobPort < MAX_BLOB_PORT, "Blob
server port is out of range.");
+		checkNotNull(jobID);
+		checkNotNull(resourceID);
+		checkNotNull(jobMasterGateway);
+		checkArgument(blobPort > 0 && blobPort < MAX_BLOB_PORT, "Blob server port
is out of range.");
 
 		TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway);
 
@@ -1029,7 +1028,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway
{
 	}
 
 	private void disassociateFromJobManager(JobManagerConnection jobManagerConnection, Exception
cause) throws IOException {
-		Preconditions.checkNotNull(jobManagerConnection);
+		checkNotNull(jobManagerConnection);
 		JobMasterGateway jobManagerGateway = jobManagerConnection.getJobManagerGateway();
 		jobManagerGateway.disconnectTaskManager(getResourceID(), cause);
 		jobManagerConnection.getLibraryCacheManager().shutdown();
@@ -1104,36 +1103,40 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway
{
 	}
 
 	private void freeSlotInternal(AllocationID allocationId, Throwable cause) {
-		Preconditions.checkNotNull(allocationId);
+		checkNotNull(allocationId);
 
 		try {
-			TaskSlot taskSlot = taskSlotTable.freeSlot(allocationId, cause);
+			final JobID jobId = taskSlotTable.getOwningJob(allocationId);
 
-			if (taskSlot != null && isConnectedToResourceManager()) {
-				// the slot was freed. Tell the RM about it
-				ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
+			final int slotIndex = taskSlotTable.freeSlot(allocationId, cause);
 
-				resourceManagerGateway.notifySlotAvailable(
-					resourceManagerConnection.getRegistrationId(),
-					new SlotID(getResourceID(), taskSlot.getIndex()),
-					allocationId);
+			if (slotIndex != -1) {
 
-				// check whether we still have allocated slots for the same job
-				final JobID jobId = taskSlot.getJobId();
-				final Iterator<Task> tasks = taskSlotTable.getTasks(jobId);
+				if (isConnectedToResourceManager()) {
+					// the slot was freed. Tell the RM about it
+					ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway();
 
-				if (!tasks.hasNext()) {
-					// we can remove the job from the job leader service
-					try {
-						jobLeaderService.removeJob(jobId);
-					} catch (Exception e) {
-						log.info("Could not remove job {} from JobLeaderService.", jobId, e);
-					}
+					resourceManagerGateway.notifySlotAvailable(
+						resourceManagerConnection.getRegistrationId(),
+						new SlotID(getResourceID(), slotIndex),
+						allocationId);
+				}
 
-					closeJobManagerConnection(
-						jobId,
-						new FlinkException("TaskExecutor " + getAddress() +
-							" has no more allocated slots for job " + jobId + '.'));
+				if (jobId != null) {
+					// check whether we still have allocated slots for the same job
+					if (taskSlotTable.getAllocationIdsPerJob(jobId).isEmpty()) {
+						// we can remove the job from the job leader service
+						try {
+							jobLeaderService.removeJob(jobId);
+						} catch (Exception e) {
+							log.info("Could not remove job {} from JobLeaderService.", jobId, e);
+						}
+
+						closeJobManagerConnection(
+							jobId,
+							new FlinkException("TaskExecutor " + getAddress() +
+								" has no more allocated slots for job " + jobId + '.'));
+					}
 				}
 			}
 		} catch (SlotNotFoundException e) {
@@ -1141,13 +1144,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway
{
 		}
 	}
 
-	private void freeSlotInternal(AllocationID allocationId) {
-		freeSlotInternal(allocationId, new Exception("The slot " + allocationId + " is being freed."));
-	}
-
 	private void timeoutSlot(AllocationID allocationId, UUID ticket) {
-		Preconditions.checkNotNull(allocationId);
-		Preconditions.checkNotNull(ticket);
+		checkNotNull(allocationId);
+		checkNotNull(ticket);
 
 		if (taskSlotTable.isValidTimeout(allocationId, ticket)) {
 			freeSlotInternal(allocationId, new Exception("The slot " + allocationId + " has timed
out."));
@@ -1285,7 +1284,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway
{
 		private final JobMasterGateway jobMasterGateway;
 
 		private TaskManagerActionsImpl(JobMasterGateway jobMasterGateway) {
-			this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
+			this.jobMasterGateway = checkNotNull(jobMasterGateway);
 		}
 
 		@Override
@@ -1318,7 +1317,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway
{
 
 		@Override
 		public void freeSlot(final AllocationID allocationId) {
-			runAsync(() -> TaskExecutor.this.freeSlotInternal(allocationId));
+			runAsync(() ->
+				freeSlotInternal(
+					allocationId,
+					new FlinkException("TaskSlotTable requested freeing the TaskSlot " + allocationId +
'.')));
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e94a488d/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index fcb2761..f8f9164 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -133,6 +133,22 @@ public class TaskSlotTable implements TimeoutListener<AllocationID>
{
 		slotActions = null;
 	}
 
+	/**
+	 * Returns all {@link AllocationID}s for the given job.
+	 *
+	 * @param jobId for which to return the set of {@link AllocationID}.
+	 * @return Set of {@link AllocationID} for the given job
+	 */
+	public Set<AllocationID> getAllocationIdsPerJob(JobID jobId) {
+		final Set<AllocationID> allocationIds = slotsPerJob.get(jobId);
+
+		if (allocationIds == null) {
+			return Collections.emptySet();
+		} else {
+			return Collections.unmodifiableSet(allocationIds);
+		}
+	}
+
 	// ---------------------------------------------------------------------
 	// Slot report methods
 	// ---------------------------------------------------------------------
@@ -268,7 +284,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID>
{
 	 * @throws SlotNotFoundException if there is not task slot for the given allocation id
 	 * @return Index of the freed slot if the slot could be freed; otherwise -1
 	 */
-	public TaskSlot freeSlot(AllocationID allocationId) throws SlotNotFoundException {
+	public int freeSlot(AllocationID allocationId) throws SlotNotFoundException {
 		return freeSlot(allocationId, new Exception("The task slot of this task is being freed."));
 	}
 
@@ -282,8 +298,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID>
{
 	 * @throws SlotNotFoundException if there is not task slot for the given allocation id
 	 * @return The freed TaskSlot. If the TaskSlot cannot be freed then null.
 	 */
-	@Nullable
-	public TaskSlot freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException
{
+	public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException
{
 		checkInit();
 
 		TaskSlot taskSlot = getTaskSlot(allocationId);
@@ -317,7 +332,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID>
{
 					slotsPerJob.remove(jobId);
 				}
 
-				return taskSlot;
+				return taskSlot.getIndex();
 			} else {
 				// we couldn't free the task slot because it still contains task, fail the tasks
 				// and set the slot state to releasing so that it gets eventually freed
@@ -329,7 +344,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID>
{
 					taskIterator.next().failExternally(cause);
 				}
 
-				return null;
+				return -1;
 			}
 		} else {
 			throw new SlotNotFoundException(allocationId);
@@ -422,6 +437,25 @@ public class TaskSlotTable implements TimeoutListener<AllocationID>
{
 		return new AllocationIDIterator(jobId, TaskSlotState.ACTIVE);
 	}
 
+	/**
+	 * Returns the owning job of the {@link TaskSlot} identified by the
+	 * given {@link AllocationID}.
+	 *
+	 * @param allocationId identifying the slot for which to retrieve the owning job
+	 * @return Owning job of the specified {@link TaskSlot} or null if there is no slot for
+	 * the given allocation id or if the slot has no owning job assigned
+	 */
+	@Nullable
+	public JobID getOwningJob(AllocationID allocationId) {
+		final TaskSlot taskSlot = getTaskSlot(allocationId);
+
+		if (taskSlot != null) {
+			return taskSlot.getJobId();
+		} else {
+			return null;
+		}
+	}
+
 	// ---------------------------------------------------------------------
 	// Task methods
 	// ---------------------------------------------------------------------
@@ -538,6 +572,7 @@ public class TaskSlotTable implements TimeoutListener<AllocationID>
{
 	// Internal methods
 	// ---------------------------------------------------------------------
 
+	@Nullable
 	private TaskSlot getTaskSlot(AllocationID allocationId) {
 		Preconditions.checkNotNull(allocationId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e94a488d/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 0c3adae..efd27f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -61,12 +61,14 @@ import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
@@ -89,6 +91,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
@@ -106,7 +109,6 @@ import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 
-import java.io.File;
 import java.net.InetAddress;
 import java.util.Arrays;
 import java.util.Collection;
@@ -122,6 +124,7 @@ import java.util.concurrent.TimeoutException;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -143,7 +146,6 @@ import static org.mockito.Mockito.when;
 public class TaskExecutorTest extends TestLogger {
 
 	private final Time timeout = Time.milliseconds(10000L);
-	private final File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
 	private TimerService<AllocationID> timerService;
 
@@ -1034,7 +1036,6 @@ public class TaskExecutorTest extends TestLogger {
 			mock(NetworkEnvironment.class),
 			haServices,
 			mock(HeartbeatServices.class, RETURNS_MOCKS),
-
 			mock(TaskManagerMetricGroup.class),
 			mock(BroadcastVariableManager.class),
 			mock(FileCache.class),
@@ -1544,6 +1545,120 @@ public class TaskExecutorTest extends TestLogger {
 	}
 
 	/**
+	 * Tests that a job is removed from the JobLeaderService once a TaskExecutor has
+	 * no more slots assigned to this job.
+	 *
+	 * <p>See FLINK-8504
+	 */
+	@Test
+	public void testRemoveJobFromJobLeaderService() throws Exception {
+		final Configuration configuration = new Configuration();
+		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+		final LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
+		final JobLeaderService jobLeaderService = new JobLeaderService(localTaskManagerLocation);
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+		final TaskSlotTable taskSlotTable = new TaskSlotTable(
+			Collections.singleton(ResourceProfile.UNKNOWN),
+			timerService);
+
+		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+
+		final TestingLeaderRetrievalService resourceManagerLeaderRetriever = new TestingLeaderRetrievalService();
+		haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
+
+		final TaskExecutor taskExecutor = new TaskExecutor(
+			rpc,
+			taskManagerConfiguration,
+			localTaskManagerLocation,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			new TaskExecutorLocalStateStoresManager(),
+			mock(NetworkEnvironment.class),
+			haServices,
+			new HeartbeatServices(1000L, 1000L),
+			UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(),
+			new BroadcastVariableManager(),
+			mock(FileCache.class),
+			taskSlotTable,
+			new JobManagerTable(),
+			jobLeaderService,
+			testingFatalErrorHandler);
+
+		try {
+			final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+			final ResourceManagerId resourceManagerId = resourceManagerGateway.getFencingToken();
+
+			rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway);
+			resourceManagerLeaderRetriever.notifyListener(resourceManagerGateway.getAddress(), resourceManagerId.toUUID());
+
+			final JobID jobId = new JobID();
+
+			final CompletableFuture<LeaderRetrievalListener> startFuture = new CompletableFuture<>();
+			final CompletableFuture<Void> stopFuture = new CompletableFuture<>();
+
+			final StartStopNotifyingLeaderRetrievalService jobMasterLeaderRetriever = new StartStopNotifyingLeaderRetrievalService(
+				startFuture,
+				stopFuture);
+			haServices.setJobMasterLeaderRetriever(jobId, jobMasterLeaderRetriever);
+
+			taskExecutor.start();
+
+			final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
+
+			final SlotID slotId = new SlotID(localTaskManagerLocation.getResourceID(), 0);
+			final AllocationID allocationId = new AllocationID();
+
+			assertThat(startFuture.isDone(), is(false));
+			assertThat(jobLeaderService.containsJob(jobId), is(false));
+
+			taskExecutorGateway.requestSlot(
+				slotId,
+				jobId,
+				allocationId,
+				"foobar",
+				resourceManagerId,
+				timeout).get();
+
+			// wait until the job leader retrieval service for jobId is started
+			startFuture.get();
+			assertThat(jobLeaderService.containsJob(jobId), is(true));
+
+			taskExecutorGateway.freeSlot(allocationId, new FlinkException("Test exception"), timeout).get();
+
+			// wait until the job leader retrieval service for jobId has stopped because the job should get removed
+			stopFuture.get();
+			assertThat(jobLeaderService.containsJob(jobId), is(false));
+
+			testingFatalErrorHandler.rethrowError();
+		} finally {
+			RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
+		}
+	}
+
+	private static final class StartStopNotifyingLeaderRetrievalService implements LeaderRetrievalService
{
+		private final CompletableFuture<LeaderRetrievalListener> startFuture;
+
+		private final CompletableFuture<Void> stopFuture;
+
+		private StartStopNotifyingLeaderRetrievalService(
+				CompletableFuture<LeaderRetrievalListener> startFuture,
+				CompletableFuture<Void> stopFuture) {
+			this.startFuture = startFuture;
+			this.stopFuture = stopFuture;
+		}
+
+		@Override
+		public void start(LeaderRetrievalListener listener) throws Exception {
+			startFuture.complete(listener);
+		}
+
+		@Override
+		public void stop() throws Exception {
+			stopFuture.complete(null);
+		}
+	}
+
+	/**
 	 * Special {@link HeartbeatServices} which creates a {@link RecordingHeartbeatManagerImpl}.
 	 */
 	private static final class RecordingHeartbeatServices extends HeartbeatServices {


Mime
View raw message