flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/5] flink git commit: [FLINK-4853] [rm] Clean up job manager registration at the resource manager
Date Sun, 23 Oct 2016 09:18:15 GMT
Repository: flink
Updated Branches:
  refs/heads/flip-6 0de568963 -> 80b6c2a01


[FLINK-4853] [rm] Clean up job manager registration at the resource manager

Introduce the JobLeaderIdService which automatically retrieves the current job leader id.
This job leader id is used to validate job manager registration attempts. Additionally, it
is used to disconnect old job leaders from the resource manager.

Add comments

This closes #2657.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0e965ae3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0e965ae3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0e965ae3

Branch: refs/heads/flip-6
Commit: 0e965ae3e00454575a8920fce7b97842fdb9e3a9
Parents: f38bf44
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Tue Oct 18 18:03:00 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Sun Oct 23 11:11:52 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      |  10 +-
 .../runtime/jobmaster/JobMasterGateway.java     |  12 +
 .../resourcemanager/JobLeaderIdActions.java     |  51 ++
 .../resourcemanager/JobLeaderIdService.java     | 285 ++++++++++
 .../resourcemanager/ResourceManager.java        | 538 ++++++++++---------
 .../resourcemanager/ResourceManagerGateway.java |   2 +-
 .../StandaloneResourceManager.java              |   5 +-
 .../exceptions/ResourceManagerRunner.java       |   5 +-
 .../registration/JobManagerRegistration.java    |  58 ++
 .../registration/JobMasterRegistration.java     |  62 ---
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |   2 -
 .../resourcemanager/ResourceManagerHATest.java  |   8 +-
 .../ResourceManagerJobMasterTest.java           |  14 +-
 .../ResourceManagerTaskExecutorTest.java        |   2 +
 .../slotmanager/SlotProtocolTest.java           |  12 +-
 15 files changed, 736 insertions(+), 330 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0e965ae3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 49c4df1..8df3a3e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -758,6 +758,14 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 		}
 	}
 
+	@RpcMethod
+	public void disconnectResourceManager(
+			final UUID jobManagerLeaderId,
+			final UUID resourceManagerLeaderId,
+			final Exception cause) {
+		// TODO: Implement disconnect behaviour; for now the disconnect request is ignored
+	}
+
 	//----------------------------------------------------------------------------------------------
 	// Internal methods
 	//----------------------------------------------------------------------------------------------
@@ -948,7 +956,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> {
 						long timeoutMillis) throws Exception
 				{
 					Time timeout = Time.milliseconds(timeoutMillis);
-					return gateway.registerJobMaster(leaderId, jobManagerLeaderID, getAddress(), jobID, timeout);
+					return gateway.registerJobManager(leaderId, jobManagerLeaderID, getAddress(), jobID, timeout);
 				}
 			};
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0e965ae3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index 27308d3..a47623b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -130,6 +130,18 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	void disconnectTaskManager(ResourceID resourceID);
 
 	/**
+	 * Disconnects the resource manager from the job manager because of the given cause.
+	 *
+	 * @param jobManagerLeaderId leader id of the job manager
+	 * @param resourceManagerLeaderId leader id of the resource manager
+	 * @param cause exception describing why the resource manager is disconnected
+	 */
+	void disconnectResourceManager(
+		final UUID jobManagerLeaderId,
+		final UUID resourceManagerLeaderId,
+		final Exception cause);
+
+	/**
 	 * Requests a {@link KvStateLocation} for the specified {@link KvState} registration name.
 	 *
 	 * @param registrationName Name under which the KvState has been registered.

http://git-wip-us.apache.org/repos/asf/flink/blob/0e965ae3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
new file mode 100644
index 0000000..58777ef
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.UUID;
+
+/**
+ * Callbacks which the {@link JobLeaderIdService} invokes on job leader changes and errors.
+ */
+public interface JobLeaderIdActions {
+
+	/**
+	 * Asks to stop monitoring the given job in the {@link JobLeaderIdService}.
+	 *
+	 * @param jobId job which shall no longer be monitored
+	 */
+	void removeJob(JobID jobId);
+
+	/**
+	 * Notifies that the leader of a monitored job has lost its leadership.
+	 *
+	 * @param jobId job whose leader lost the leadership
+	 * @param oldJobLeaderId leader id of the job manager which is no longer the leader
+	 */
+	void jobLeaderLostLeadership(JobID jobId, UUID oldJobLeaderId);
+
+	/**
+	 * Reports an error which occurred in the {@link JobLeaderIdService}.
+	 *
+	 * @param error the error which occurred
+	 */
+	void handleError(Throwable error);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0e965ae3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
new file mode 100644
index 0000000..6c7e249
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Service which retrieves for a registered job the current job leader id (the leader id of the
+ * job manager responsible for the job). The leader id will be exposed as a future via the
+ * {@link #getLeaderId(JobID)}. The future will only be completed with an exception in case
+ * the service will be stopped.
+ */
+public class JobLeaderIdService {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JobLeaderIdService.class);
+
+	/** High availability services to use by this service */
+	private final HighAvailabilityServices highAvailabilityServices;
+
+	/** Registry to retrieve running jobs */
+	private final RunningJobsRegistry runningJobsRegistry;
+
+	/** Map of currently monitored jobs */
+	private final Map<JobID, JobLeaderIdListener> jobLeaderIdListeners;
+
+	/** Actions to call when the job leader changes */
+	private JobLeaderIdActions jobLeaderIdActions;
+
+	public JobLeaderIdService(HighAvailabilityServices highAvailabilityServices) throws Exception {
+		this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityServices);
+
+		this.runningJobsRegistry = highAvailabilityServices.getRunningJobsRegistry();
+
+		jobLeaderIdListeners = new HashMap<>(4);
+
+		jobLeaderIdActions = null;
+	}
+
+	/**
+	 * Start the service with the given job leader actions.
+	 *
+	 * @param initialJobLeaderIdActions to use for job leader id actions
+	 * @throws Exception which is thrown when clearing up old state
+	 */
+	public void start(JobLeaderIdActions initialJobLeaderIdActions) throws Exception {
+		if (isStarted()) {
+			clear();
+		}
+
+		this.jobLeaderIdActions = Preconditions.checkNotNull(initialJobLeaderIdActions);
+	}
+
+	/**
+	 * Stop the service.
+	 *
+	 * @throws Exception which is thrown in case a retrieval service cannot be stopped properly
+	 */
+	public void stop() throws Exception {
+		clear();
+
+		this.jobLeaderIdActions = null;
+	}
+
+	/**
+	 * Checks whether the service has been started.
+	 *
+	 * @return True if the service has been started; otherwise false
+	 */
+	public boolean isStarted() {
+		return jobLeaderIdActions == null;
+	}
+
+	/**
+	 * Stop and clear the currently registered job leader id listeners.
+	 *
+	 * @throws Exception which is thrown in case a retrieval service cannot be stopped properly
+	 */
+	public void clear() throws Exception {
+		Exception exception = null;
+
+		for (JobLeaderIdListener listener: jobLeaderIdListeners.values()) {
+			try {
+				listener.stop();
+			} catch (Exception e) {
+				exception = ExceptionUtils.firstOrSuppressed(e, exception);
+			}
+		}
+
+		if (exception != null) {
+			ExceptionUtils.rethrowException(exception, "Could not properly stop the " +
+				JobLeaderIdService.class.getSimpleName() + '.');
+		}
+
+		jobLeaderIdListeners.clear();
+	}
+
+	/**
+	 * Add a job to be monitored to retrieve the job leader id.
+	 *
+	 * @param jobId identifying the job to monitor
+	 * @throws Exception if the job could not be added to the service
+	 */
+	public void addJob(JobID jobId) throws Exception {
+		Preconditions.checkNotNull(jobLeaderIdActions);
+
+		LOG.debug("Add job {} to job leader id monitoring.", jobId);
+
+		if (!jobLeaderIdListeners.containsKey(jobId)) {
+			LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(jobId);
+
+			JobLeaderIdListener jobidListener = new JobLeaderIdListener(jobId, jobLeaderIdActions, leaderRetrievalService);
+			jobLeaderIdListeners.put(jobId, jobidListener);
+		}
+	}
+
+	/**
+	 * Remove the given job from being monitored by the service.
+	 *
+	 * @param jobId identifying the job to remove from monitor
+	 * @throws Exception if removing the job fails
+	 */
+	public void removeJob(JobID jobId) throws Exception {
+		LOG.debug("Remove job {} from job leader id monitoring.", jobId);
+
+		JobLeaderIdListener listener = jobLeaderIdListeners.remove(jobId);
+
+		if (listener != null) {
+			listener.stop();
+		}
+	}
+
+	/**
+	 * Check whether the given job is being monitored or not.
+	 *
+	 * @param jobId identifying the job
+	 * @return True if the job is being monitored; otherwise false
+	 */
+	public boolean containsJob(JobID jobId) {
+		return jobLeaderIdListeners.containsKey(jobId);
+	}
+
+	public Future<UUID> getLeaderId(JobID jobId) throws Exception {
+		if (!jobLeaderIdListeners.containsKey(jobId)) {
+			addJob(jobId);
+		}
+
+		JobLeaderIdListener listener = jobLeaderIdListeners.get(jobId);
+
+		return listener.getLeaderIdFuture();
+	}
+
+	// --------------------------------------------------------------------------------
+	// Static utility classes
+	// --------------------------------------------------------------------------------
+
+	/**
+	 * Listener which stores the current leader id and exposes them as a future value when
+	 * requested. The returned future will always be completed properly except when stopping the
+	 * listener.
+	 */
+	private final class JobLeaderIdListener implements LeaderRetrievalListener {
+		private final JobID jobId;
+		private final JobLeaderIdActions listenerJobLeaderIdActions;
+		private final LeaderRetrievalService leaderRetrievalService;
+
+		private volatile CompletableFuture<UUID> leaderIdFuture;
+		private volatile boolean running = true;
+
+		private JobLeaderIdListener(
+				JobID jobId,
+				JobLeaderIdActions listenerJobLeaderIdActions,
+				LeaderRetrievalService leaderRetrievalService) throws Exception {
+			this.jobId = Preconditions.checkNotNull(jobId);
+			this.listenerJobLeaderIdActions = Preconditions.checkNotNull(listenerJobLeaderIdActions);
+			this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
+
+			leaderIdFuture = new FlinkCompletableFuture<>();
+
+			// start the leader service we're listening to
+			leaderRetrievalService.start(this);
+		}
+
+		public Future<UUID> getLeaderIdFuture() {
+			return leaderIdFuture;
+		}
+
+		public void stop() throws Exception {
+			running = false;
+			leaderRetrievalService.stop();
+			leaderIdFuture.completeExceptionally(new Exception("Job leader id service has been stopped."));
+		}
+
+		@Override
+		public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
+			if (running) {
+				LOG.debug("Found a new job leader {}@{}.", leaderSessionId, leaderAddress);
+
+				UUID previousJobLeaderId = null;
+
+				if (leaderIdFuture.isDone()) {
+					try {
+						previousJobLeaderId = leaderIdFuture.getNow(null);
+					} catch (ExecutionException e) {
+						// this should never happen since we complete this future always properly
+						handleError(e);
+					}
+
+					leaderIdFuture = FlinkCompletableFuture.completed(leaderSessionId);
+				} else {
+					leaderIdFuture.complete(leaderSessionId);
+				}
+
+				try {
+					if (runningJobsRegistry.isJobRunning(jobId)) {
+						if (leaderSessionId == null) {
+							// there is no new leader
+							if (previousJobLeaderId != null) {
+								// we had a previous job leader, so notify about his lost leadership
+								listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId);
+							}
+						} else {
+							if (previousJobLeaderId != null && !leaderSessionId.equals(previousJobLeaderId)) {
+								// we had a previous leader and he's not the same as the new leader
+								listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId);
+							}
+						}
+					} else {
+						// the job is no longer running so remove it
+						listenerJobLeaderIdActions.removeJob(jobId);
+					}
+				} catch (IOException e) {
+					// cannot tell whether the job is still running or not so just remove the listener
+					LOG.debug("Encountered an error while checking the job registry for running jobs.", e);
+					listenerJobLeaderIdActions.removeJob(jobId);
+				}
+			} else {
+				LOG.debug("A leader id change {}@{} has been detected after the listener has been stopped.",
+					leaderSessionId, leaderAddress);
+			}
+		}
+
+		@Override
+		public void handleError(Exception exception) {
+			if (running) {
+				listenerJobLeaderIdActions.handleError(exception);
+			} else {
+				LOG.debug("An error occurred in the {} after the listener has been stopped.",
+					JobLeaderIdListener.class.getSimpleName(), exception);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0e965ae3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 4161972..7240087 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -20,7 +20,6 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -36,13 +35,11 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
-import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
 import org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
-import org.apache.flink.runtime.resourcemanager.registration.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.registration.JobManagerRegistration;
 import org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
@@ -61,10 +58,8 @@ import org.apache.flink.util.ExceptionUtils;
 
 import java.io.Serializable;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
@@ -77,7 +72,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * It offers the following methods as part of its rpc interface to interact with him remotely:
  * <ul>
- *     <li>{@link #registerJobMaster(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
+ *     <li>{@link #registerJobManager(UUID, UUID, String, JobID)} registers a {@link JobMaster} at the resource manager</li>
  *     <li>{@link #requestSlot(UUID, UUID, SlotRequest)} requests a slot from the resource manager</li>
  * </ul>
  */
@@ -89,10 +84,10 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	private final ResourceManagerConfiguration resourceManagerConfiguration;
 
 	/** All currently registered JobMasterGateways scoped by JobID. */
-	private final Map<JobID, JobMasterRegistration> jobMasters;
+	private final Map<JobID, JobManagerRegistration> jobManagerRegistrations;
 
-	/** LeaderListeners for all registered JobIDs. */
-	private final Map<JobID, JobIdLeaderListener> leaderListeners;
+	/** Service to retrieve the job leader ids */
+	private final JobLeaderIdService jobLeaderIdService;
 
 	/** All currently registered TaskExecutors with there framework specific worker information. */
 	private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors;
@@ -116,7 +111,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	private LeaderElectionService leaderElectionService;
 
 	/** ResourceManager's leader session id which is updated on leader election. */
-	private volatile UUID leaderSessionID;
+	private volatile UUID leaderSessionId;
 
 	/** All registered listeners for status updates of the ResourceManager. */
 	private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners;
@@ -127,6 +122,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			HighAvailabilityServices highAvailabilityServices,
 			SlotManagerFactory slotManagerFactory,
 			MetricRegistry metricRegistry,
+			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler) {
 
 		super(rpcService);
@@ -135,12 +131,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		this.highAvailabilityServices = checkNotNull(highAvailabilityServices);
 		this.slotManagerFactory = checkNotNull(slotManagerFactory);
 		this.metricRegistry = checkNotNull(metricRegistry);
+		this.jobLeaderIdService = checkNotNull(jobLeaderIdService);
 		this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
 
-		this.jobMasters = new HashMap<>(4);
-		this.leaderListeners = new HashMap<>(4);
+		this.jobManagerRegistrations = new HashMap<>(4);
 		this.taskExecutors = new HashMap<>(8);
-		this.leaderSessionID = null;
+		this.leaderSessionId = null;
 		infoMessageListeners = new ConcurrentHashMap<>(8);
 	}
 
@@ -168,11 +164,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		}
 
 		try {
-			// framework specific initialization
-			initialize();
+			jobLeaderIdService.start(new JobLeaderIdActionsImpl());
 		} catch (Exception e) {
-			throw new ResourceManagerException("Could not initialize the resource manager.", e);
+			throw new ResourceManagerException("Could not start the job leader id service.", e);
 		}
+
+		initialize();
 	}
 
 	@Override
@@ -180,6 +177,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		Exception exception = null;
 
 		try {
+			jobLeaderIdService.stop();
+		} catch (Exception e) {
+			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		}
+
+		try {
 			leaderElectionService.stop();
 		} catch (Exception e) {
 			exception = ExceptionUtils.firstOrSuppressed(e, exception);
@@ -202,101 +205,124 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	//  RPC methods
 	// ------------------------------------------------------------------------
 
-	/**
-	 * Register a {@link JobMaster} at the resource manager.
-	 *
-	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
-	 * @param jobMasterAddress        The address of the JobMaster that registers
-	 * @param jobID                   The Job ID of the JobMaster that registers
-	 * @return Future registration response
-	 */
 	@RpcMethod
-	public Future<RegistrationResponse> registerJobMaster(
-		final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId,
-		final String jobMasterAddress, final JobID jobID) {
+	public Future<RegistrationResponse> registerJobManager(
+			final UUID resourceManagerLeaderId,
+			final UUID jobManagerLeaderId,
+			final String jobManagerAddress,
+			final JobID jobId) {
+
+		checkNotNull(resourceManagerLeaderId);
+		checkNotNull(jobManagerLeaderId);
+		checkNotNull(jobManagerAddress);
+		checkNotNull(jobId);
+
+		if (isValid(resourceManagerLeaderId)) {
+			if (!jobLeaderIdService.containsJob(jobId)) {
+				try {
+					jobLeaderIdService.addJob(jobId);
+				} catch (Exception e) {
+					ResourceManagerException exception = new ResourceManagerException("Could not add the job " +
+						jobId + " to the job id leader service.", e);
+
+					onFatalErrorAsync(exception);
+
+					log.error("Could not add job {} to job leader id service.", jobId, e);
+					return FlinkCompletableFuture.completedExceptionally(exception);
+				}
+			}
 
-		checkNotNull(jobMasterAddress);
-		checkNotNull(jobID);
+			log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId);
+			// the job leader id future may be completed with null if the job currently has no leader
+			Future<UUID> jobLeaderIdFuture;
 
-		// create a leader retriever in case it doesn't exist
-		final JobIdLeaderListener jobIdLeaderListener;
-		if (leaderListeners.containsKey(jobID)) {
-			jobIdLeaderListener = leaderListeners.get(jobID);
-		} else {
 			try {
-				LeaderRetrievalService jobMasterLeaderRetriever =
-					highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
-				jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
+				jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
 			} catch (Exception e) {
-				log.warn("Failed to start JobMasterLeaderRetriever for job id {}", jobID, e);
+				// we cannot check the job leader id so let's fail
+				// TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id
+				ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " +
+					"job leader id future to verify the correct job leader.", e);
+
+				onFatalErrorAsync(exception);
 
-				return FlinkCompletableFuture.<RegistrationResponse>completed(
-					new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
+				log.debug("Could not obtain the job leader id future to verify the correct job leader.", e);
+				return FlinkCompletableFuture.completedExceptionally(exception);
 			}
 
-			leaderListeners.put(jobID, jobIdLeaderListener);
-		}
+			Future<JobMasterGateway> jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, JobMasterGateway.class);
 
-		return getRpcService()
-			.execute(new Callable<JobMasterGateway>() {
+			Future<RegistrationResponse> registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync(jobLeaderIdFuture, new BiFunction<JobMasterGateway, UUID, RegistrationResponse>() {
 				@Override
-				public JobMasterGateway call() throws Exception {
+				public RegistrationResponse apply(JobMasterGateway jobMasterGateway, UUID jobLeaderId) {
+					if (isValid(resourceManagerLeaderId)) {
+						if (jobManagerLeaderId.equals(jobLeaderId)) {
+							if (jobManagerRegistrations.containsKey(jobId)) {
+								JobManagerRegistration oldJobManagerRegistration = jobManagerRegistrations.get(jobId);
+
+								if (oldJobManagerRegistration.getLeaderID().equals(jobLeaderId)) {
+									// same registration
+									log.debug("Job manager {}@{} was already registered.", jobManagerLeaderId, jobManagerAddress);
+								} else {
+									// tell old job manager that he is no longer the job leader
+									disconnectJobManager(
+										oldJobManagerRegistration.getJobID(),
+										new Exception("New job leader for job " + jobId + " found."));
+
+									JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(jobId, jobLeaderId, jobMasterGateway);
+									jobManagerRegistrations.put(jobId, jobManagerRegistration);
+								}
+							} else {
+								// new registration for the job
+								JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(jobId, jobLeaderId, jobMasterGateway);
+
+								jobManagerRegistrations.put(jobId, jobManagerRegistration);
+							}
 
-					if (!leaderSessionID.equals(resourceManagerLeaderId)) {
-						log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {}" +
-								" did not equal the received leader session ID  {}",
-							jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
-						throw new Exception("Invalid leader session id");
-					}
+							log.info("Registered job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId);
 
-					final Time timeout = resourceManagerConfiguration.getTimeout();
+							return new JobMasterRegistrationSuccess(
+								resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
+								getLeaderSessionId());
 
-					if (!jobIdLeaderListener.getLeaderID().get(timeout.getSize(), timeout.getUnit())
-							.equals(jobMasterLeaderId)) {
-						throw new Exception("Leader Id did not match");
+						} else {
+							log.debug("The job manager leader id {} did not match the job " +
+								"leader id {}.", jobManagerLeaderId, jobLeaderId);
+							return new RegistrationResponse.Decline("Job manager leader id did not match.");
+						}
+					} else {
+						log.debug("The resource manager leader id changed {}. Discarding job " +
+							"manager registration from {}.", getLeaderSessionId(), jobManagerAddress);
+						return new RegistrationResponse.Decline("Resource manager leader id changed.");
 					}
-
-					return getRpcService().connect(jobMasterAddress, JobMasterGateway.class)
-						.get(timeout.getSize(), timeout.getUnit());
 				}
-			})
-			.handleAsync(new BiFunction<JobMasterGateway, Throwable, RegistrationResponse>() {
-				@Override
-				public RegistrationResponse apply(JobMasterGateway jobMasterGateway, Throwable throwable) {
-
-				if (throwable != null) {
-					return new RegistrationResponse.Decline(throwable.getMessage());
-				} else {
-					if (!leaderSessionID.equals(resourceManagerLeaderId)) {
-						log.warn("Discard registration from JobMaster {} at ({}) because the expected leader session ID {}" +
-								" did not equal the received leader session ID  {}",
-							jobID, jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
-						return new RegistrationResponse.Decline("Invalid leader session id");
-					}
+			}, getMainThreadExecutor());
 
-					try {
-						// LeaderID should be available now, but if not we fail the registration
-						UUID currentJobMasterLeaderId = jobIdLeaderListener.getLeaderID().getNow(null);
-						if (currentJobMasterLeaderId == null || !currentJobMasterLeaderId.equals(jobMasterLeaderId)) {
-							throw new Exception("Leader Id did not match");
+			// turn failures of the combined input futures or of the combine function into a declined registration
+			return registrationResponseFuture.handleAsync(new BiFunction<RegistrationResponse, Throwable, RegistrationResponse>() {
+				@Override
+				public RegistrationResponse apply(RegistrationResponse registrationResponse, Throwable throwable) {
+					if (throwable != null) {
+						if (log.isDebugEnabled()) {
+							log.debug("Registration of job manager {}@{} failed.", jobManagerLeaderId, jobManagerAddress, throwable);
+						} else {
+							log.info("Registration of job manager {}@{} failed.", jobManagerLeaderId, jobManagerAddress);
 						}
-					} catch (Exception e) {
-						return new RegistrationResponse.Decline(e.getMessage());
-					}
-
-					final JobMasterRegistration registration =
-						new JobMasterRegistration(jobID, jobMasterLeaderId, jobMasterGateway);
 
-					final JobMasterRegistration existingRegistration = jobMasters.put(jobID, registration);
-					if (existingRegistration != null) {
-						log.info("Replacing JobMaster registration for newly registered JobMaster with JobID {}.", jobID);
+						return new RegistrationResponse.Decline(throwable.getMessage());
+					} else {
+						return registrationResponse;
 					}
-					return new JobMasterRegistrationSuccess(
-						resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
-						resourceManagerLeaderId);
 				}
-			}
-		}, getMainThreadExecutor());
+			}, getRpcService().getExecutor());
+		} else {
+			log.debug("Discard register job manager message from {}, because the leader id " +
+				"{} did not match the expected leader id {}.", jobManagerAddress,
+				resourceManagerLeaderId, leaderSessionId);
+
+			return FlinkCompletableFuture.<RegistrationResponse>completed(
+				new RegistrationResponse.Decline("Resource manager leader id did not match."));
+		}
 	}
 
 	/**
@@ -315,7 +341,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		final ResourceID resourceID,
 		final SlotReport slotReport) {
 
-		if (leaderSessionID.equals(resourceManagerLeaderId)) {
+		if (leaderSessionId.equals(resourceManagerLeaderId)) {
 			Future<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class);
 
 			return taskExecutorGatewayFuture.handleAsync(new BiFunction<TaskExecutorGateway, Throwable, RegistrationResponse>() {
@@ -346,12 +372,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		} else {
 			log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did " +
 					"not equal the received leader session ID  {}",
-				resourceID, taskExecutorAddress, leaderSessionID, resourceManagerLeaderId);
+				resourceID, taskExecutorAddress, leaderSessionId, resourceManagerLeaderId);
 
 			return FlinkCompletableFuture.<RegistrationResponse>completed(
 				new RegistrationResponse.Decline("Discard registration because the leader id " +
 					resourceManagerLeaderId + " does not match the expected leader id " +
-					leaderSessionID + '.'));
+					leaderSessionId + '.'));
 		}
 	}
 
@@ -368,11 +394,11 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			SlotRequest slotRequest) {
 
 		JobID jobId = slotRequest.getJobId();
-		JobMasterRegistration jobMasterRegistration = jobMasters.get(jobId);
+		JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
 
-		if (jobMasterRegistration != null
-				&& jobMasterLeaderID.equals(jobMasterRegistration.getLeaderID())
-				&& resourceManagerLeaderID.equals(leaderSessionID)) {
+		if (jobManagerRegistration != null
+				&& jobMasterLeaderID.equals(jobManagerRegistration.getLeaderID())
+				&& resourceManagerLeaderID.equals(leaderSessionId)) {
 			return slotManager.requestSlot(slotRequest);
 		} else {
 			log.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId);
@@ -393,7 +419,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			final InstanceID instanceID,
 			final SlotID slotId) {
 
-		if (resourceManagerLeaderId.equals(leaderSessionID)) {
+		if (resourceManagerLeaderId.equals(leaderSessionId)) {
 			final ResourceID resourceId = slotId.getResourceID();
 			WorkerRegistration<WorkerType> registration = taskExecutors.get(resourceId);
 
@@ -413,7 +439,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		} else {
 			log.debug("Discarding notify slot available message for slot {}, because the " +
 				"leader id {} did not match the expected leader id {}.", slotId,
-				resourceManagerLeaderId, leaderSessionID);
+				resourceManagerLeaderId, leaderSessionId);
 		}
 	}
 
@@ -471,6 +497,150 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	// ------------------------------------------------------------------------
+	//  Testing methods
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Gets the leader session id of this resource manager.
+	 *
+	 * @return the current leader session id, or null if this resource manager has not been granted leadership yet or has lost it again.
+	 */
+	@VisibleForTesting
+	UUID getLeaderSessionId() {
+		return leaderSessionId;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Internal methods
+	// ------------------------------------------------------------------------
+
+	private void clearState() { // forgets all job manager/task executor registrations and the leader session id
+		jobManagerRegistrations.clear();
+		taskExecutors.clear();
+		slotManager.clearState();
+		// a failure to clear the job leader id service is escalated as a fatal error
+		try {
+			jobLeaderIdService.clear();
+		} catch (Exception e) {
+			onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e));
+		}
+		// reset the session id; a new one is assigned when leadership is granted again
+		leaderSessionId = null;
+	}
+
+	/**
+	 * Removes the job manager registered for the given job and tells it about the disconnect; only logs if none is registered.
+	 * @param jobId identifying the job whose leader shall be disconnected
+	 * @param cause of the disconnect, forwarded to the job manager
+	 */
+	protected void disconnectJobManager(JobID jobId, Exception cause) {
+		JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.remove(jobId);
+
+		if (jobManagerRegistration != null) {
+			log.info("Disconnect job manager {}@{} for job {} from the resource manager.",
+				jobManagerRegistration.getLeaderID(),
+				jobManagerRegistration.getJobManagerGateway().getAddress(),
+				jobId);
+
+			JobMasterGateway jobMasterGateway = jobManagerRegistration.getJobManagerGateway();
+
+			// tell the job manager about the disconnect
+			jobMasterGateway.disconnectResourceManager(jobManagerRegistration.getLeaderID(), getLeaderSessionId(), cause);
+		} else {
+			log.debug("There was no registered job manager for job {}.", jobId);
+		}
+	}
+
+	/**
+	 * Checks whether the given resource manager leader id is matching the current leader id and
+	 * not null.
+	 *
+	 * @param resourceManagerLeaderId to check
+	 * @return True if the given leader id matches the actual leader id and is not null; otherwise false
+	 */
+	protected boolean isValid(UUID resourceManagerLeaderId) {
+		if (resourceManagerLeaderId == null) {
+			return false;
+		} else {
+			return resourceManagerLeaderId.equals(leaderSessionId);
+		}
+	}
+
+	protected void removeJob(JobID jobId) { // removes the job from the job leader id service and disconnects its job manager
+		try {
+			jobLeaderIdService.removeJob(jobId);
+		} catch (Exception e) {
+			log.warn("Could not properly remove the job {} from the job leader id service.", jobId, e);
+		}
+		// only disconnect if a job manager is still registered for the removed job
+		if (jobManagerRegistrations.containsKey(jobId)) {
+			disconnectJobManager(jobId, new Exception("Job " + jobId + " was removed."));
+		}
+	}
+
+	protected void jobLeaderLostLeadership(JobID jobId, UUID oldJobLeaderId) { // disconnects the job manager only if it is the leader that lost leadership
+		final JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
+
+		if (jobManagerRegistration == null) {
+			log.debug("Discard job leader lost leadership for outdated leader {} for job {}.", oldJobLeaderId, jobId);
+		} else if (jobManagerRegistration.getLeaderID().equals(oldJobLeaderId)) {
+			// the registered job manager is the one which lost leadership
+			disconnectJobManager(jobId, new Exception("Job leader lost leadership."));
+		} else {
+			// the registered job manager has a different leader id, so it is not affected
+			log.debug("Discarding job leader lost leadership, because a new job leader was found for job {}.", jobId);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  Info messaging
+	// ------------------------------------------------------------------------
+	/** Wraps the given message in an {@link InfoMessage} and sends it to all registered info message listeners via the rpc service's executor. */
+	public void sendInfoMessage(final String message) {
+		getRpcService().execute(new Runnable() {
+			@Override
+			public void run() {
+				InfoMessage infoMessage = new InfoMessage(message);
+				for (InfoMessageListenerRpcGateway listenerRpcGateway : infoMessageListeners.values()) {
+					listenerRpcGateway
+						.notifyInfoMessage(infoMessage);
+				}
+			}
+		});
+	}
+
+	// ------------------------------------------------------------------------
+	//  Error Handling
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Notifies the ResourceManager that a fatal error has occurred and it cannot proceed.
+	 * This method should be used when asynchronous threads want to notify the
+	 * ResourceManager of a fatal error.
+	 *
+	 * @param t The exception describing the fatal error
+	 */
+	void onFatalErrorAsync(final Throwable t) {
+		runAsync(new Runnable() {
+			@Override
+			public void run() {
+				onFatalError(t);
+			}
+		});
+	}
+
+	/**
+	 * Notifies the ResourceManager that a fatal error has occurred and it cannot proceed.
+	 * This method must only be called from within the ResourceManager's main thread.
+	 *
+	 * @param t The exception describing the fatal error
+	 */
+	void onFatalError(Throwable t) {
+		log.error("Fatal error occurred.", t);
+		fatalErrorHandler.onFatalError(t);
+	}
+
+	// ------------------------------------------------------------------------
 	//  Leader Contender
 	// ------------------------------------------------------------------------
 
@@ -487,11 +657,11 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 				log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID);
 
 				// clear the state if we've been the leader before
-				if (leaderSessionID != null) {
+				if (leaderSessionId != null) {
 					clearState();
 				}
 
-				leaderSessionID = newLeaderSessionID;
+				leaderSessionId = newLeaderSessionID;
 
 				getRpcService().execute(new Runnable() {
 					@Override
@@ -516,7 +686,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 				clearState();
 
-				leaderSessionID = null;
+				leaderSessionId = null;
 			}
 		});
 	}
@@ -556,82 +726,15 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	}
 
 	// ------------------------------------------------------------------------
-	//  Error Handling
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Notifies the ResourceManager that a fatal error has occurred and it cannot proceed.
-	 * This method should be used when asynchronous threads want to notify the
-	 * ResourceManager of a fatal error.
-	 *
-	 * @param t The exception describing the fatal error
-	 */
-	void onFatalErrorAsync(final Throwable t) {
-		runAsync(new Runnable() {
-			@Override
-			public void run() {
-				onFatalError(t);
-			}
-		});
-	}
-
-	/**
-	 * Notifies the ResourceManager that a fatal error has occurred and it cannot proceed.
-	 * This method must only be called from within the ResourceManager's main thread.
-	 *
-	 * @param t The exception describing the fatal error
-	 */
-	void onFatalError(Throwable t) {
-		log.error("Fatal error occurred.", t);
-		fatalErrorHandler.onFatalError(t);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Testing methods
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Gets the leader session id of current resourceManager.
-	 *
-	 * @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership.
-	 */
-	@VisibleForTesting
-	UUID getLeaderSessionID() {
-		return leaderSessionID;
-	}
-
-	// ------------------------------------------------------------------------
-	//  Internal methods
-	// ------------------------------------------------------------------------
-
-	private void clearState() {
-		jobMasters.clear();
-		taskExecutors.clear();
-		slotManager.clearState();
-		Iterator<JobIdLeaderListener> leaderListenerIterator =
-			leaderListeners.values().iterator();
-		while (leaderListenerIterator.hasNext()) {
-			JobIdLeaderListener listener = leaderListenerIterator.next();
-			try {
-				listener.stopService();
-			} catch (Exception e) {
-				onFatalError(e);
-			}
-			leaderListenerIterator.remove();
-		}
-		leaderSessionID = new UUID(0, 0);
-	}
-
-	// ------------------------------------------------------------------------
 	//  Framework specific behavior
 	// ------------------------------------------------------------------------
 
 	/**
 	 * Initializes the framework specific components.
 	 *
-	 * @throws Exception Exceptions during initialization cause the resource manager to fail.
+	 * @throws ResourceManagerException which occurs during initialization and causes the resource manager to fail.
 	 */
-	protected abstract void initialize() throws Exception;
+	protected abstract void initialize() throws ResourceManagerException;
 
 	/**
 	 * The framework specific code for shutting down the application. This should report the
@@ -659,24 +762,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	protected abstract WorkerType workerStarted(ResourceID resourceID);
 
 	// ------------------------------------------------------------------------
-	//  Info messaging
-	// ------------------------------------------------------------------------
-
-	public void sendInfoMessage(final String message) {
-		getRpcService().execute(new Runnable() {
-			@Override
-			public void run() {
-				InfoMessage infoMessage = new InfoMessage(message);
-				for (InfoMessageListenerRpcGateway listenerRpcGateway : infoMessageListeners.values()) {
-					listenerRpcGateway
-						.notifyInfoMessage(infoMessage);
-				}
-			}
-		});
-	}
-
-	// ------------------------------------------------------------------------
-	//  Resource Manager Services
+	//  Static utility classes
 	// ------------------------------------------------------------------------
 
 	protected ResourceManagerServices createResourceManagerServices() {
@@ -687,7 +773,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 		@Override
 		public UUID getLeaderID() {
-			return ResourceManager.this.leaderSessionID;
+			return ResourceManager.this.leaderSessionId;
 		}
 
 		@Override
@@ -706,76 +792,26 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		}
 	}
 
-	/**
-	 * Leader instantiated for each connected JobMaster
-	 */
-	private class JobIdLeaderListener implements LeaderRetrievalListener {
-
-		private final JobID jobID;
-		private final LeaderRetrievalService retrievalService;
-
-		private final FlinkCompletableFuture<UUID> initialLeaderIdFuture;
-
-		private volatile UUID leaderID;
-
-		private JobIdLeaderListener(
-				JobID jobID,
-				LeaderRetrievalService retrievalService) throws Exception {
-			this.jobID = jobID;
-			this.retrievalService = retrievalService;
-			this.initialLeaderIdFuture = new FlinkCompletableFuture<>();
-			this.retrievalService.start(this);
-		}
+	private class JobLeaderIdActionsImpl implements JobLeaderIdActions {
 
-		public Future<UUID> getLeaderID() {
-			if (!initialLeaderIdFuture.isDone()) {
-				return initialLeaderIdFuture;
-			} else {
-				return FlinkCompletableFuture.completed(leaderID);
-			}
-		}
-
-		public JobID getJobID() {
-			return jobID;
-		}
-
-
-		public void stopService() throws Exception {
-			retrievalService.stop();
+		@Override
+		public void jobLeaderLostLeadership(final JobID jobId, final UUID oldJobLeaderId) {
+			ResourceManager.this.jobLeaderLostLeadership(jobId, oldJobLeaderId);
 		}
 
 		@Override
-		public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
-			this.leaderID = leaderSessionID;
-
-			if (!initialLeaderIdFuture.isDone()) {
-				initialLeaderIdFuture.complete(leaderSessionID);
-			}
-
-			ResourceManager.this.runAsync(new Runnable() {
+		public void removeJob(final JobID jobId) {
+			runAsync(new Runnable() {
 				@Override
 				public void run() {
-					JobMasterRegistration jobMasterRegistration = ResourceManager.this.jobMasters.get(jobID);
-					if (jobMasterRegistration == null || !jobMasterRegistration.getLeaderID().equals(leaderSessionID)) {
-						// registration is not valid anymore, remove registration
-						ResourceManager.this.jobMasters.remove(jobID);
-						// leader listener is not necessary anymore
-						JobIdLeaderListener listener = ResourceManager.this.leaderListeners.remove(jobID);
-						if (listener != null) {
-							try {
-								listener.stopService();
-							} catch (Exception e) {
-								ResourceManager.this.handleError(e);
-							}
-						}
-					}
+					ResourceManager.this.removeJob(jobId);
 				}
 			});
 		}
 
 		@Override
-		public void handleError(final Exception exception) {
-			ResourceManager.this.handleError(exception);
+		public void handleError(Throwable error) {
+			onFatalErrorAsync(error);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0e965ae3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 968eeb8..0a37bb9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -49,7 +49,7 @@ public interface ResourceManagerGateway extends RpcGateway {
 	 * @param timeout                 Timeout for the future to complete
 	 * @return Future registration response
 	 */
-	Future<RegistrationResponse> registerJobMaster(
+	Future<RegistrationResponse> registerJobManager(
 		UUID resourceManagerLeaderId,
 		UUID jobMasterLeaderId,
 		String jobMasterAddress,

http://git-wip-us.apache.org/repos/asf/flink/blob/0e965ae3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
index 926be0d..73c8a2d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -41,6 +42,7 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 			HighAvailabilityServices highAvailabilityServices,
 			SlotManagerFactory slotManagerFactory,
 			MetricRegistry metricRegistry,
+			JobLeaderIdService jobLeaderIdService,
 			FatalErrorHandler fatalErrorHandler) {
 		super(
 			rpcService,
@@ -48,11 +50,12 @@ public class StandaloneResourceManager extends ResourceManager<ResourceID> {
 			highAvailabilityServices,
 			slotManagerFactory,
 			metricRegistry,
+			jobLeaderIdService,
 			fatalErrorHandler);
 	}
 
 	@Override
-	protected void initialize() throws Exception {
+	protected void initialize() throws ResourceManagerException {
 		// nothing to initialize
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0e965ae3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
index 1e6f04c..0c7e4e4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/exceptions/ResourceManagerRunner.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.resourcemanager.exceptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
@@ -48,7 +49,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 			final Configuration configuration,
 			final RpcService rpcService,
 			final HighAvailabilityServices highAvailabilityServices,
-			final MetricRegistry metricRegistry) throws ConfigurationException {
+			final MetricRegistry metricRegistry) throws Exception {
 
 		Preconditions.checkNotNull(configuration);
 		Preconditions.checkNotNull(rpcService);
@@ -57,6 +58,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 
 		final ResourceManagerConfiguration resourceManagerConfiguration = ResourceManagerConfiguration.fromConfiguration(configuration);
 		final SlotManagerFactory slotManagerFactory = new DefaultSlotManager.Factory();
+		final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices);
 
 		this.resourceManager = new StandaloneResourceManager(
 			rpcService,
@@ -64,6 +66,7 @@ public class ResourceManagerRunner implements FatalErrorHandler {
 			highAvailabilityServices,
 			slotManagerFactory,
 			metricRegistry,
+			jobLeaderIdService,
 			this);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0e965ae3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java
new file mode 100644
index 0000000..a1deb65
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.registration;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.util.Preconditions;
+
+import java.util.UUID;
+
+/**
+ * Container for JobManager related registration information, such as the leader id or the job id.
+ */
+public class JobManagerRegistration {
+	private final JobID jobID; // job for which the job manager has registered
+
+	private final UUID leaderID; // leader id of the registered job manager
+
+	private final JobMasterGateway jobManagerGateway; // gateway used to communicate with the registered job manager
+	/** Creates a new registration; all arguments must be non-null (checked via Preconditions.checkNotNull). */
+	public JobManagerRegistration(
+			JobID jobID,
+			UUID leaderID,
+			JobMasterGateway jobManagerGateway) {
+		this.jobID = Preconditions.checkNotNull(jobID);
+		this.leaderID = Preconditions.checkNotNull(leaderID);
+		this.jobManagerGateway = Preconditions.checkNotNull(jobManagerGateway);
+	}
+
+	public JobID getJobID() {
+		return jobID;
+	}
+
+	public UUID getLeaderID() {
+		return leaderID;
+	}
+
+	public JobMasterGateway getJobManagerGateway() {
+		return jobManagerGateway;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0e965ae3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java
deleted file mode 100644
index f417935..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager.registration;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-
-import java.util.UUID;
-
-/**
- * This class is responsible for grouping the JobMasterGateway and the JobMaster's
- * leader id
- */
-public class JobMasterRegistration {
-
-	private static final long serialVersionUID = -2062957799469434614L;
-
-	private final JobID jobID;
-
-	private final UUID leaderID;
-
-	private final JobMasterGateway jobMasterGateway;
-
-	public JobMasterRegistration(
-			JobID jobID,
-			UUID leaderID,
-			JobMasterGateway jobMasterGateway) {
-		this.jobID = jobID;
-		this.leaderID = leaderID;
-		this.jobMasterGateway = jobMasterGateway;
-	}
-
-	public JobID getJobID() {
-		return jobID;
-	}
-
-
-	public UUID getLeaderID() {
-		return leaderID;
-	}
-
-	public JobMasterGateway getJobMasterGateway() {
-		return jobMasterGateway;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0e965ae3/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 44719c8..6e3fb40 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -96,8 +96,6 @@ public class AkkaRpcService implements RpcService {
 
 		Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
 
-
-
 		if (actorSystemAddress.host().isDefined()) {
 			address = actorSystemAddress.host().get();
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/0e965ae3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index cb38e6e..2e52eeb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -49,6 +49,7 @@ public class ResourceManagerHATest {
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
 		SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
+		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 
 		final ResourceManager resourceManager =
@@ -58,17 +59,18 @@ public class ResourceManagerHATest {
 				highAvailabilityServices,
 				slotManagerFactory,
 				metricRegistry,
+				jobLeaderIdService,
 				testingFatalErrorHandler);
 		resourceManager.start();
 		// before grant leadership, resourceManager's leaderId is null
-		Assert.assertEquals(null, resourceManager.getLeaderSessionID());
+		Assert.assertEquals(null, resourceManager.getLeaderSessionId());
 		final UUID leaderId = UUID.randomUUID();
 		leaderElectionService.isLeader(leaderId);
 		// after grant leadership, resourceManager's leaderId has value
-		Assert.assertEquals(leaderId, resourceManager.getLeaderSessionID());
+		Assert.assertEquals(leaderId, resourceManager.getLeaderSessionId());
 		// then revoke leadership, resourceManager's leaderId is null again
 		leaderElectionService.notLeader();
-		Assert.assertEquals(null, resourceManager.getLeaderSessionID());
+		Assert.assertEquals(null, resourceManager.getLeaderSessionId());
 
 		if (testingFatalErrorHandler.hasExceptionOccurred()) {
 			testingFatalErrorHandler.rethrowError();

http://git-wip-us.apache.org/repos/asf/flink/blob/0e965ae3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 7b8d254..2622634 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -71,8 +71,8 @@ public class ResourceManagerJobMasterTest {
 		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
 		// test response successful
-		Future<RegistrationResponse> successfulFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderID, jobMasterAddress, jobID);
-		RegistrationResponse response = successfulFuture.get(5, TimeUnit.SECONDS);
+		Future<RegistrationResponse> successfulFuture = resourceManager.registerJobManager(rmLeaderSessionId, jmLeaderID, jobMasterAddress, jobID);
+		RegistrationResponse response = successfulFuture.get(5L, TimeUnit.SECONDS);
 		assertTrue(response instanceof JobMasterRegistrationSuccess);
 
 		if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -96,7 +96,7 @@ public class ResourceManagerJobMasterTest {
 
 		// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
 		UUID differentLeaderSessionID = UUID.randomUUID();
-		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobMaster(differentLeaderSessionID, jmLeaderID, jobMasterAddress, jobID);
+		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(differentLeaderSessionID, jmLeaderID, jobMasterAddress, jobID);
 		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 
 		if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -120,7 +120,7 @@ public class ResourceManagerJobMasterTest {
 
 		// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
 		UUID differentLeaderSessionID = UUID.randomUUID();
-		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobMaster(rmLeaderSessionId, differentLeaderSessionID, jobMasterAddress, jobID);
+		Future<RegistrationResponse> unMatchedLeaderFuture = resourceManager.registerJobManager(rmLeaderSessionId, differentLeaderSessionID, jobMasterAddress, jobID);
 		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 
 		if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -144,7 +144,7 @@ public class ResourceManagerJobMasterTest {
 
 		// test throw exception when receive a registration from job master which takes invalid address
 		String invalidAddress = "/jobMasterAddress2";
-		Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, invalidAddress, jobID);
+		Future<RegistrationResponse> invalidAddressFuture = resourceManager.registerJobManager(rmLeaderSessionId, jmLeaderSessionId, invalidAddress, jobID);
 		assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
 
 		if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -168,7 +168,7 @@ public class ResourceManagerJobMasterTest {
 
 		JobID unknownJobIDToHAServices = new JobID();
 		// verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener
-		Future<RegistrationResponse> declineFuture = resourceManager.registerJobMaster(rmLeaderSessionId, jmLeaderSessionId, jobMasterAddress, unknownJobIDToHAServices);
+		Future<RegistrationResponse> declineFuture = resourceManager.registerJobManager(rmLeaderSessionId, jmLeaderSessionId, jobMasterAddress, unknownJobIDToHAServices);
 		RegistrationResponse response = declineFuture.get(5, TimeUnit.SECONDS);
 		assertTrue(response instanceof RegistrationResponse.Decline);
 
@@ -196,6 +196,7 @@ public class ResourceManagerJobMasterTest {
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
 		SlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
+		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices);
 
 		ResourceManager resourceManager = new StandaloneResourceManager(
 			rpcService,
@@ -203,6 +204,7 @@ public class ResourceManagerJobMasterTest {
 			highAvailabilityServices,
 			slotManagerFactory,
 			metricRegistry,
+			jobLeaderIdService,
 			fatalErrorHandler);
 		resourceManager.start();
 		return resourceManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/0e965ae3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 4640eab..1016181 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -148,6 +148,7 @@ public class ResourceManagerTaskExecutorTest {
 		TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
 		MetricRegistry metricRegistry = mock(MetricRegistry.class);
+		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(highAvailabilityServices);
 
 
 		StandaloneResourceManager resourceManager =
@@ -157,6 +158,7 @@ public class ResourceManagerTaskExecutorTest {
 				highAvailabilityServices,
 				slotManagerFactory,
 				metricRegistry,
+				jobLeaderIdService,
 				fatalErrorHandler);
 		resourceManager.start();
 		return resourceManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/0e965ae3/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 08ceb86..a3ba436 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
@@ -107,6 +108,7 @@ public class SlotProtocolTest extends TestLogger {
 			configureHA(testingHaServices, jobID, rmAddress, rmLeaderID, jmAddress, jmLeaderID);
 
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
+		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHaServices);
 
 		final TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		SpiedResourceManager resourceManager =
@@ -116,12 +118,13 @@ public class SlotProtocolTest extends TestLogger {
 				testingHaServices,
 				slotManagerFactory,
 				mock(MetricRegistry.class),
+				jobLeaderIdService,
 				mock(FatalErrorHandler.class));
 		resourceManager.start();
 		rmLeaderElectionService.isLeader(rmLeaderID);
 
 		Future<RegistrationResponse> registrationFuture =
-			resourceManager.registerJobMaster(rmLeaderID, jmLeaderID, jmAddress, jobID);
+			resourceManager.registerJobManager(rmLeaderID, jmLeaderID, jmAddress, jobID);
 		try {
 			registrationFuture.get(5, TimeUnit.SECONDS);
 		} catch (Exception e) {
@@ -207,6 +210,8 @@ public class SlotProtocolTest extends TestLogger {
 
 		ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration(Time.seconds(5L), Time.seconds(5L));
 
+		JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(testingHaServices);
+
 		TestingSlotManagerFactory slotManagerFactory = new TestingSlotManagerFactory();
 		ResourceManager<ResourceID> resourceManager =
 			Mockito.spy(new StandaloneResourceManager(
@@ -215,6 +220,7 @@ public class SlotProtocolTest extends TestLogger {
 				testingHaServices,
 				slotManagerFactory,
 				mock(MetricRegistry.class),
+				jobLeaderIdService,
 				mock(FatalErrorHandler.class)));
 		resourceManager.start();
 		rmLeaderElectionService.isLeader(rmLeaderID);
@@ -222,7 +228,7 @@ public class SlotProtocolTest extends TestLogger {
 		Thread.sleep(1000);
 
 		Future<RegistrationResponse> registrationFuture =
-			resourceManager.registerJobMaster(rmLeaderID, jmLeaderID, jmAddress, jobID);
+			resourceManager.registerJobManager(rmLeaderID, jmLeaderID, jmAddress, jobID);
 		try {
 			registrationFuture.get(5L, TimeUnit.SECONDS);
 		} catch (Exception e) {
@@ -290,6 +296,7 @@ public class SlotProtocolTest extends TestLogger {
 				HighAvailabilityServices highAvailabilityServices,
 				SlotManagerFactory slotManagerFactory,
 				MetricRegistry metricRegistry,
+				JobLeaderIdService jobLeaderIdService,
 				FatalErrorHandler fatalErrorHandler) {
 			super(
 				rpcService,
@@ -297,6 +304,7 @@ public class SlotProtocolTest extends TestLogger {
 				highAvailabilityServices,
 				slotManagerFactory,
 				metricRegistry,
+				jobLeaderIdService,
 				fatalErrorHandler);
 		}
 


Mime
View raw message