flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [2/2] flink git commit: [FLINK-7506] Fence JobMaster
Date Tue, 05 Sep 2017 10:44:57 GMT
[FLINK-7506] Fence JobMaster

This commit makes the JobMaster extend FencedRpcEndpoint, which enables
automatic fencing of all RPC messages. Moreover, this PR introduces the
JobMasterId, the new leader id/fencing token that replaces the raw UUID. This
improves type safety when multiple fencing tokens are passed around.

This closes #4583.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ba03b78c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ba03b78c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ba03b78c

Branch: refs/heads/master
Commit: ba03b78c7703fb372e955f759aa6b70b5f444de9
Parents: ff16606
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Aug 24 17:26:22 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Tue Sep 5 12:43:45 2017 +0200

----------------------------------------------------------------------
 .../MesosResourceManagerTest.java               |   9 +-
 .../apache/flink/runtime/instance/SlotPool.java |  20 ++-
 .../runtime/jobmaster/JobManagerRunner.java     |  35 +++--
 .../flink/runtime/jobmaster/JobMaster.java      | 145 +++++--------------
 .../runtime/jobmaster/JobMasterGateway.java     |  20 +--
 .../flink/runtime/jobmaster/JobMasterId.java    |  60 ++++++++
 .../jobmaster/RpcTaskManagerGateway.java        |   9 +-
 .../resourcemanager/JobLeaderIdActions.java     |   5 +-
 .../resourcemanager/JobLeaderIdService.java     |  20 +--
 .../resourcemanager/ResourceManager.java        |  67 ++++-----
 .../resourcemanager/ResourceManagerGateway.java |  10 +-
 .../registration/JobManagerRegistration.java    |  11 +-
 .../exceptions/LeaderSessionIDException.java    |  61 --------
 .../runtime/taskexecutor/JobLeaderListener.java |  10 +-
 .../runtime/taskexecutor/JobLeaderService.java  |  79 +++++-----
 .../taskexecutor/JobManagerConnection.java      |  12 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  75 ++++------
 .../taskexecutor/TaskExecutorGateway.java       |   6 +-
 .../taskexecutor/rpc/RpcInputSplitProvider.java |  11 +-
 .../rpc/RpcPartitionStateChecker.java           |   7 +-
 .../RpcResultPartitionConsumableNotifier.java   |   7 +-
 .../clusterframework/ResourceManagerTest.java   |   9 +-
 .../flink/runtime/instance/SlotPoolRpcTest.java |   7 +-
 .../flink/runtime/instance/SlotPoolTest.java    |  16 +-
 .../jobmaster/JobManagerRunnerMockTest.java     |  32 ++--
 .../flink/runtime/jobmaster/JobMasterTest.java  |  18 +--
 .../resourcemanager/JobLeaderIdServiceTest.java |  15 +-
 .../ResourceManagerJobMasterTest.java           |  22 +--
 .../taskexecutor/TaskExecutorITCase.java        |  15 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  40 ++---
 30 files changed, 357 insertions(+), 496 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index 02b043e..dbd0746 100644
--- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -47,6 +47,7 @@ import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
@@ -385,7 +386,7 @@ public class MesosResourceManagerTest extends TestLogger {
 			public final ResourceID resourceID;
 			public final String address;
 			public final JobMasterGateway gateway;
-			public final UUID leaderSessionID;
+			public final JobMasterId jobMasterId;
 			public final TestingLeaderRetrievalService leaderRetrievalService;
 
 			MockJobMaster(JobID jobID) {
@@ -393,8 +394,8 @@ public class MesosResourceManagerTest extends TestLogger {
 				this.resourceID = new ResourceID(jobID.toString());
 				this.address = "/" + jobID;
 				this.gateway = mock(JobMasterGateway.class);
-				this.leaderSessionID = UUID.randomUUID();
-				this.leaderRetrievalService = new TestingLeaderRetrievalService(this.address, this.leaderSessionID);
+				this.jobMasterId = JobMasterId.generate();
+				this.leaderRetrievalService = new TestingLeaderRetrievalService(this.address, this.jobMasterId.toUUID());
 			}
 		}
 
@@ -442,7 +443,7 @@ public class MesosResourceManagerTest extends TestLogger {
 		 */
 		public void registerJobMaster(MockJobMaster jobMaster) throws Exception  {
 			CompletableFuture<RegistrationResponse> registration = resourceManager.registerJobManager(
-				jobMaster.leaderSessionID, jobMaster.resourceID, jobMaster.address, jobMaster.jobID, timeout);
+				jobMaster.jobMasterId, jobMaster.resourceID, jobMaster.address, jobMaster.jobID, timeout);
 			assertTrue(registration.get() instanceof JobMasterRegistrationSuccess);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 326e3a2..6397043 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -53,7 +54,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
@@ -118,8 +118,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 
 	private final Clock clock;
 
-	/** the leader id of job manager */
-	private UUID jobManagerLeaderId;
+	/** the fencing token of the job manager */
+	private JobMasterId jobMasterId;
 
 	/** The gateway to communicate with resource manager */
 	private ResourceManagerGateway resourceManagerGateway;
@@ -155,6 +155,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		this.waitingForResourceManager = new HashMap<>();
 
 		this.providerAndOwner = new ProviderAndOwner(getSelfGateway(SlotPoolGateway.class), slotRequestTimeout);
+
+		this.jobMasterId = null;
+		this.resourceManagerGateway = null;
+		this.jobManagerAddress = null;
 	}
 
 	// ------------------------------------------------------------------------
@@ -169,11 +173,11 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 	/**
 	 * Start the slot pool to accept RPC calls.
 	 *
-	 * @param newJobManagerLeaderId The necessary leader id for running the job.
+	 * @param jobMasterId The necessary leader id for running the job.
 	 * @param newJobManagerAddress for the slot requests which are sent to the resource manager
 	 */
-	public void start(UUID newJobManagerLeaderId, String newJobManagerAddress) throws Exception {
-		this.jobManagerLeaderId = checkNotNull(newJobManagerLeaderId);
+	public void start(JobMasterId jobMasterId, String newJobManagerAddress) throws Exception {
+		this.jobMasterId = checkNotNull(jobMasterId);
 		this.jobManagerAddress = checkNotNull(newJobManagerAddress);
 
 		// TODO - start should not throw an exception
@@ -195,7 +199,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		stop();
 
 		// do not accept any requests
-		jobManagerLeaderId = null;
+		jobMasterId = null;
 		resourceManagerGateway = null;
 
 		// Clear (but not release!) the available slots. The TaskManagers should re-register them
@@ -313,7 +317,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway {
 		pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources));
 
 		CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot(
-			jobManagerLeaderId,
+			jobMasterId,
 			new SlotRequest(jobId, allocationID, resources, jobManagerAddress),
 			resourceManagerRequestsTimeout);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index b5b4b82..6f5a082 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -61,22 +61,22 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 
 	// ------------------------------------------------------------------------
 
-	/** Lock to ensure that this runner can deal with leader election event and job completion notifies simultaneously */
+	/** Lock to ensure that this runner can deal with leader election event and job completion notifies simultaneously. */
 	private final Object lock = new Object();
 
-	/** The job graph needs to run */
+	/** The job graph needs to run. */
 	private final JobGraph jobGraph;
 
-	/** The listener to notify once the job completes - either successfully or unsuccessfully */
+	/** The listener to notify once the job completes - either successfully or unsuccessfully. */
 	private final OnCompletionActions toNotifyOnComplete;
 
-	/** The handler to call in case of fatal (unrecoverable) errors */ 
+	/** The handler to call in case of fatal (unrecoverable) errors. */
 	private final FatalErrorHandler errorHandler;
 
-	/** Used to check whether a job needs to be run */
+	/** Used to check whether a job needs to be run. */
 	private final RunningJobsRegistry runningJobsRegistry;
 
-	/** Leader election for this job */
+	/** Leader election for this job. */
 	private final LeaderElectionService leaderElectionService;
 
 	private final JobManagerServices jobManagerServices;
@@ -87,19 +87,18 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 
 	private final Time timeout;
 
-	/** flag marking the runner as shut down */
+	/** flag marking the runner as shut down. */
 	private volatile boolean shutdown;
 
 	// ------------------------------------------------------------------------
 
 	/**
-	 * 
-	 * <p>Exceptions that occur while creating the JobManager or JobManagerRunner are directly
+	 * Exceptions that occur while creating the JobManager or JobManagerRunner are directly
 	 * thrown and not reported to the given {@code FatalErrorHandler}.
-	 * 
+	 *
 	 * <p>This JobManagerRunner assumes that it owns the given {@code JobManagerServices}.
 	 * It will shut them down on error and on calls to {@link #shutdown()}.
-	 * 
+	 *
 	 * @throws Exception Thrown if the runner cannot be set up, because either one of the
 	 *                   required services could not be started, ot the Job could not be initialized.
 	 */
@@ -231,7 +230,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 	//----------------------------------------------------------------------------------------------
 
 	/**
-	 * Job completion notification triggered by JobManager
+	 * Job completion notification triggered by JobManager.
 	 */
 	@Override
 	public void jobFinished(JobExecutionResult result) {
@@ -247,7 +246,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 	}
 
 	/**
-	 * Job completion notification triggered by JobManager
+	 * Job completion notification triggered by JobManager.
 	 */
 	@Override
 	public void jobFailed(Throwable cause) {
@@ -263,7 +262,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 	}
 
 	/**
-	 * Job completion notification triggered by self
+	 * Job completion notification triggered by self.
 	 */
 	@Override
 	public void jobFinishedByOther() {
@@ -278,7 +277,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 	}
 
 	/**
-	 * Job completion notification triggered by JobManager or self
+	 * Job completion notification triggered by JobManager or self.
 	 */
 	@Override
 	public void onFatalError(Throwable exception) {
@@ -305,7 +304,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 	/**
 	 * Marks this runner's job as not running. Other JobManager will not recover the job
 	 * after this call.
-	 * 
+	 *
 	 * <p>This method never throws an exception.
 	 */
 	private void unregisterJobFromHighAvailability() {
@@ -359,14 +358,14 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F
 			// This will eventually be noticed, but can not be ruled out from the beginning.
 			if (leaderElectionService.hasLeadership()) {
 				try {
-					// Now set the running status is after getting leader ship and 
+					// Now set the running status is after getting leader ship and
 					// set finished status after job in terminated status.
 					// So if finding the job is running, it means someone has already run the job, need recover.
 					if (schedulingStatus == JobSchedulingStatus.PENDING) {
 						runningJobsRegistry.setJobRunning(jobGraph.getJobID());
 					}
 
-					CompletableFuture<Acknowledge> startingFuture = jobManager.start(leaderSessionID, timeout);
+					CompletableFuture<Acknowledge> startingFuture = jobManager.start(new JobMasterId(leaderSessionID), timeout);
 
 					startingFuture.whenCompleteAsync(
 						(Acknowledge ack, Throwable throwable) -> {

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index a05242a..80d6e4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -53,7 +53,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.instance.Slot;
 import org.apache.flink.runtime.instance.SlotPool;
 import org.apache.flink.runtime.instance.SlotPoolGateway;
@@ -83,7 +82,7 @@ import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -125,7 +124,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * given task</li>
  * </ul>
  */
-public class JobMaster extends RpcEndpoint implements JobMasterGateway {
+public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {
 
 	/** Default names for Flink's distributed components */
 	public static final String JOB_MANAGER_NAME = "jobmanager";
@@ -182,8 +181,6 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 
 	private final SlotPoolGateway slotPoolGateway;
 
-	private volatile UUID leaderSessionID;
-
 	// --------- ResourceManager --------
 
 	/** Leader retriever service used to locate ResourceManager's address */
@@ -215,7 +212,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 			FatalErrorHandler errorHandler,
 			ClassLoader userCodeLoader) throws Exception {
 
-		super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME));
+		super(rpcService, AkkaRpcServiceUtils.createRandomName(JobMaster.JOB_MANAGER_NAME), JobMasterId.INITIAL_JOB_MASTER_ID);
 
 		selfGateway = getSelfGateway(JobMasterGateway.class);
 
@@ -308,15 +305,15 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 	/**
 	 * Start the rpc service and begin to run the job.
 	 *
-	 * @param leaderSessionID The necessary leader id for running the job.
+	 * @param newJobMasterId The necessary fencing token to run the job
 	 * @param timeout for the operation
 	 * @return Future acknowledge if the job could be started. Otherwise the future contains an exception
 	 */
-	public CompletableFuture<Acknowledge> start(final UUID leaderSessionID, final Time timeout) throws Exception {
+	public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId, final Time timeout) throws Exception {
 		// make sure we receive RPC and async calls
 		super.start();
 
-		return callAsync(() -> startJobExecution(leaderSessionID), timeout);
+		return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), timeout);
 	}
 
 	/**
@@ -324,7 +321,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 	 * will be disposed.
 	 *
 	 * <p>Mostly job is suspended because of the leadership has been revoked, one can be restart this job by
-	 * calling the {@link #start(UUID, Time)} method once we take the leadership back again.
+	 * calling the {@link #start(JobMasterId, Time)} method once we take the leadership back again.
 	 *
 	 * <p>This method is executed asynchronously
 	 *
@@ -333,7 +330,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 	 * @return Future acknowledge indicating that the job has been suspended. Otherwise the future contains an exception
 	 */
 	public CompletableFuture<Acknowledge> suspend(final Throwable cause, final Time timeout) {
-		CompletableFuture<Acknowledge> suspendFuture = callAsync(() -> suspendExecution(cause), timeout);
+		CompletableFuture<Acknowledge> suspendFuture = callAsyncWithoutFencing(() -> suspendExecution(cause), timeout);
 
 		stop();
 
@@ -366,17 +363,10 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 	 */
 	@Override
 	public CompletableFuture<Acknowledge> updateTaskExecutionState(
-			final UUID leaderSessionID,
 			final TaskExecutionState taskExecutionState)
 	{
 		checkNotNull(taskExecutionState, "taskExecutionState");
 
-		try {
-			validateLeaderSessionId(leaderSessionID);
-		} catch (LeaderIdMismatchException e) {
-			return FutureUtils.completedExceptionally(e);
-		}
-
 		if (executionGraph.updateState(taskExecutionState)) {
 			return CompletableFuture.completedFuture(Acknowledge.get());
 		} else {
@@ -388,16 +378,9 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 
 	@Override
 	public CompletableFuture<SerializedInputSplit> requestNextInputSplit(
-			final UUID leaderSessionID,
 			final JobVertexID vertexID,
 			final ExecutionAttemptID executionAttempt) {
 
-		try {
-			validateLeaderSessionId(leaderSessionID);
-		} catch (LeaderIdMismatchException e) {
-			return FutureUtils.completedExceptionally(e);
-		}
-
 		final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
 		if (execution == null) {
 			// can happen when JobManager had already unregistered this execution upon on task failure,
@@ -444,16 +427,9 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 
 	@Override
 	public CompletableFuture<ExecutionState> requestPartitionState(
-			final UUID leaderSessionID,
 			final IntermediateDataSetID intermediateResultId,
 			final ResultPartitionID resultPartitionId) {
 
-		try {
-			validateLeaderSessionId(leaderSessionID);
-		} catch (LeaderIdMismatchException e) {
-			return FutureUtils.completedExceptionally(e);
-		}
-
 		final Execution execution = executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
 		if (execution != null) {
 			return CompletableFuture.completedFuture(execution.getState());
@@ -483,12 +459,9 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 
 	@Override
 	public CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(
-			final UUID leaderSessionID,
 			final ResultPartitionID partitionID,
 			final Time timeout) {
 		try {
-			validateLeaderSessionId(leaderSessionID);
-
 			executionGraph.scheduleOrUpdateConsumers(partitionID);
 			return CompletableFuture.completedFuture(Acknowledge.get());
 		} catch (Exception e) {
@@ -636,15 +609,8 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 	public CompletableFuture<Collection<SlotOffer>> offerSlots(
 			final ResourceID taskManagerId,
 			final Iterable<SlotOffer> slots,
-			final UUID leaderId,
 			final Time timeout) {
 
-		try {
-			validateLeaderSessionId(leaderId);
-		} catch (LeaderIdMismatchException e) {
-			return FutureUtils.completedExceptionally(e);
-		}
-
 		Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = registeredTaskManagers.get(taskManagerId);
 
 		if (taskManager == null) {
@@ -657,7 +623,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 
 		final ArrayList<Tuple2<AllocatedSlot, SlotOffer>> slotsAndOffers = new ArrayList<>();
 
-		final RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, leaderId);
+		final RpcTaskManagerGateway rpcTaskManagerGateway = new RpcTaskManagerGateway(taskExecutorGateway, getFencingToken());
 
 		for (SlotOffer slotOffer : slots) {
 			final AllocatedSlot slot = new AllocatedSlot(
@@ -678,15 +644,8 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 	public void failSlot(
 			final ResourceID taskManagerId,
 			final AllocationID allocationId,
-			final UUID leaderId,
 			final Exception cause) {
 
-		try {
-			validateLeaderSessionId(leaderSessionID);
-		} catch (LeaderIdMismatchException e) {
-			log.warn("Cannot fail slot " + allocationId + '.', e);
-		}
-
 		if (registeredTaskManagers.containsKey(taskManagerId)) {
 			slotPoolGateway.failAllocation(allocationId, cause);
 		} else {
@@ -699,16 +658,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 	public CompletableFuture<RegistrationResponse> registerTaskManager(
 			final String taskManagerRpcAddress,
 			final TaskManagerLocation taskManagerLocation,
-			final UUID leaderId,
 			final Time timeout) {
-		if (!Objects.equals(leaderSessionID, leaderId)) {
-			log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
-							"leader session ID {} did not equal the received leader session ID {}.",
-					taskManagerLocation.getResourceID(), taskManagerRpcAddress, leaderSessionID, leaderId);
-			return FutureUtils.completedExceptionally(
-				new Exception("Leader id not match, expected: " +
-					leaderSessionID + ", actual: " + leaderId));
-		}
 
 		final ResourceID taskManagerId = taskManagerLocation.getResourceID();
 
@@ -725,13 +675,6 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 							return new RegistrationResponse.Decline(throwable.getMessage());
 						}
 
-						if (!Objects.equals(leaderSessionID, leaderId)) {
-							log.warn("Discard registration from TaskExecutor {} at ({}) because the expected " +
-									"leader session ID {} did not equal the received leader session ID {}.",
-								taskManagerId, taskManagerRpcAddress, leaderSessionID, leaderId);
-							return new RegistrationResponse.Decline("Invalid leader session id");
-						}
-
 						slotPoolGateway.registerTaskManager(taskManagerId);
 						registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, taskExecutorGateway));
 
@@ -756,16 +699,9 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 
 	@Override
 	public void disconnectResourceManager(
-			final UUID jobManagerLeaderId,
 			final ResourceManagerId resourceManagerId,
 			final Exception cause) {
 
-		try {
-			validateLeaderSessionId(jobManagerLeaderId);
-		} catch (LeaderIdMismatchException e) {
-			log.warn("Cannot disconnect resource manager " + resourceManagerId + '.', e);
-		}
-
 		if (resourceManagerConnection != null
 				&& resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId)) {
 			closeResourceManagerConnection(cause);
@@ -788,34 +724,34 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 
 	//-- job starting and stopping  -----------------------------------------------------------------
 
-	private Acknowledge startJobExecution(UUID newLeaderSessionId) throws Exception {
-		Preconditions.checkNotNull(newLeaderSessionId, "The new leader session id must not be null.");
+	private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
+		validateRunsInMainThread();
 
-		if (leaderSessionID == null) {
-			log.info("Start job execution with leader id {}.", newLeaderSessionId);
+		Preconditions.checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");
 
-			leaderSessionID = newLeaderSessionId;
-		} else if (Objects.equals(leaderSessionID, newLeaderSessionId)) {
-			log.info("Already started the job execution with leader id {}.", leaderSessionID);
+		if (Objects.equals(getFencingToken(), newJobMasterId)) {
+			log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
 
 			return Acknowledge.get();
-		} else {
-			log.info("Restarting old job with leader id {}. The new leader id is {}.", leaderSessionID, newLeaderSessionId);
+		}
 
-			// first we have to suspend the current execution
-			suspendExecution(new FlinkException("Old job with leader id " + leaderSessionID +
-				" is restarted with a new leader id " + newLeaderSessionId + '.'));
+		if (!Objects.equals(getFencingToken(), JobMasterId.INITIAL_JOB_MASTER_ID)) {
+			log.info("Restarting old job with JobMasterId {}. The new JobMasterId is {}.", getFencingToken(), newJobMasterId);
 
-			// set new leader id
-			leaderSessionID = newLeaderSessionId;
+			// first we have to suspend the current execution
+			suspendExecution(new FlinkException("Old job with JobMasterId " + getFencingToken() +
+				" is restarted with a new JobMasterId " + newJobMasterId + '.'));
 		}
 
+		// set new leader id
+		setFencingToken(newJobMasterId);
+
 		log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID());
 
 		try {
 			// start the slot pool make sure the slot pool now accepts messages for this leader
 			log.debug("Staring SlotPool component");
-			slotPool.start(leaderSessionID, getAddress());
+			slotPool.start(getFencingToken(), getAddress());
 
 			// job is ready to go, try to establish connection with resource manager
 			//   - activate leader retrieval for the resource manager
@@ -848,18 +784,20 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 	 * will be disposed.
 	 *
 	 * <p>Mostly job is suspended because of the leadership has been revoked, one can be restart this job by
-	 * calling the {@link #start(UUID, Time)} method once we take the leadership back again.
+	 * calling the {@link #start(JobMasterId, Time)} method once we take the leadership back again.
 	 *
 	 * @param cause The reason of why this job been suspended.
 	 */
 	private Acknowledge suspendExecution(final Throwable cause) {
-		if (leaderSessionID == null) {
+		validateRunsInMainThread();
+
+		if (getFencingToken() == null) {
 			log.debug("Job has already been suspended or shutdown.");
 			return Acknowledge.get();
 		}
 
-		// not leader any more - should not accept any leader messages any more
-		leaderSessionID = null;
+		// not leader anymore --> set the JobMasterId to the initial id
+		setFencingToken(JobMasterId.INITIAL_JOB_MASTER_ID);
 
 		try {
 			resourceManagerLeaderRetriever.stop();
@@ -973,7 +911,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 				jobGraph.getJobID(),
 				resourceId,
 				getAddress(),
-				leaderSessionID,
+				getFencingToken(),
 				resourceManagerAddress,
 				resourceManagerId,
 				executor);
@@ -1028,12 +966,6 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 		slotPoolGateway.disconnectResourceManager();
 	}
 
-	private void validateLeaderSessionId(UUID leaderSessionID) throws LeaderIdMismatchException {
-		if (this.leaderSessionID == null || !this.leaderSessionID.equals(leaderSessionID)) {
-			throw new LeaderIdMismatchException(this.leaderSessionID, leaderSessionID);
-		}
-	}
-
 	//----------------------------------------------------------------------------------------------
 	// Utility classes
 	//----------------------------------------------------------------------------------------------
@@ -1065,7 +997,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 
 		private final String jobManagerRpcAddress;
 
-		private final UUID jobManagerLeaderID;
+		private final JobMasterId jobMasterId;
 
 		private ResourceID resourceManagerResourceID;
 
@@ -1074,7 +1006,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 				final JobID jobID,
 				final ResourceID jobManagerResourceID,
 				final String jobManagerRpcAddress,
-				final UUID jobManagerLeaderID,
+				final JobMasterId jobMasterId,
 				final String resourceManagerAddress,
 				final ResourceManagerId resourceManagerId,
 				final Executor executor)
@@ -1083,7 +1015,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 			this.jobID = checkNotNull(jobID);
 			this.jobManagerResourceID = checkNotNull(jobManagerResourceID);
 			this.jobManagerRpcAddress = checkNotNull(jobManagerRpcAddress);
-			this.jobManagerLeaderID = checkNotNull(jobManagerLeaderID);
+			this.jobMasterId = checkNotNull(jobMasterId);
 		}
 
 		@Override
@@ -1099,7 +1031,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 					Time timeout = Time.milliseconds(timeoutMillis);
 
 					return gateway.registerJobManager(
-						jobManagerLeaderID,
+						jobMasterId,
 						jobManagerResourceID,
 						jobManagerRpcAddress,
 						jobID,
@@ -1145,12 +1077,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway {
 				final Throwable error) {
 
 			// run in rpc thread to avoid concurrency
-			runAsync(new Runnable() {
-				@Override
-				public void run() {
-					jobStatusChanged(newJobStatus, timestamp, error);
-				}
-			});
+			runAsync(() -> jobStatusChanged(newJobStatus, timestamp, error));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index b39f419..965d88d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -45,36 +46,31 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.util.Collection;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * {@link JobMaster} rpc gateway interface
  */
-public interface JobMasterGateway extends CheckpointCoordinatorGateway {
+public interface JobMasterGateway extends CheckpointCoordinatorGateway, FencedRpcGateway<JobMasterId> {
 
 	/**
 	 * Updates the task execution state for a given task.
 	 *
-	 * @param leaderSessionID    The leader id of JobManager
 	 * @param taskExecutionState New task execution state for a given task
 	 * @return Future flag of the task execution state update result
 	 */
 	CompletableFuture<Acknowledge> updateTaskExecutionState(
-			final UUID leaderSessionID,
 			final TaskExecutionState taskExecutionState);
 
 	/**
 	 * Requesting next input split for the {@link ExecutionJobVertex}. The next input split is sent back to the sender
 	 * as a {@link SerializedInputSplit} message.
 	 *
-	 * @param leaderSessionID  The leader id of JobManager
 	 * @param vertexID         The job vertex id
 	 * @param executionAttempt The execution attempt id
 	 * @return The future of the input split. If there is no further input split, will return an empty object.
 	 */
 	CompletableFuture<SerializedInputSplit> requestNextInputSplit(
-			final UUID leaderSessionID,
 			final JobVertexID vertexID,
 			final ExecutionAttemptID executionAttempt);
 
@@ -82,13 +78,11 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * Requests the current state of the partition.
 	 * The state of a partition is currently bound to the state of the producing execution.
 	 *
-	 * @param leaderSessionID The leader id of JobManager
 	 * @param intermediateResultId The execution attempt ID of the task requesting the partition state.
 	 * @param partitionId          The partition ID of the partition to request the state of.
 	 * @return The future of the partition state
 	 */
 	CompletableFuture<ExecutionState> requestPartitionState(
-			final UUID leaderSessionID,
 			final IntermediateDataSetID intermediateResultId,
 			final ResultPartitionID partitionId);
 
@@ -101,13 +95,11 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 * <p>
 	 * The JobManager then can decide when to schedule the partition consumers of the given session.
 	 *
-	 * @param leaderSessionID The leader id of JobManager
 	 * @param partitionID     The partition which has already produced data
 	 * @param timeout         before the rpc call fails
 	 * @return Future acknowledge of the schedule or update operation
 	 */
 	CompletableFuture<Acknowledge> scheduleOrUpdateConsumers(
-			final UUID leaderSessionID,
 			final ResultPartitionID partitionID,
 			@RpcTimeout final Time timeout);
 
@@ -123,12 +115,10 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	/**
 	 * Disconnects the resource manager from the job manager because of the given cause.
 	 *
-	 * @param jobManagerLeaderId identifying the job manager leader id
 	 * @param resourceManagerId identifying the resource manager leader id
 	 * @param cause of the disconnect
 	 */
 	void disconnectResourceManager(
-		final UUID jobManagerLeaderId,
 		final ResourceManagerId resourceManagerId,
 		final Exception cause);
 
@@ -174,14 +164,12 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 *
 	 * @param taskManagerId identifying the task manager
 	 * @param slots         to offer to the job manager
-	 * @param leaderId      identifying the job leader
 	 * @param timeout       for the rpc call
 	 * @return Future set of accepted slots.
 	 */
 	CompletableFuture<Collection<SlotOffer>> offerSlots(
 			final ResourceID taskManagerId,
 			final Iterable<SlotOffer> slots,
-			final UUID leaderId,
 			@RpcTimeout final Time timeout);
 
 	/**
@@ -189,12 +177,10 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 *
 	 * @param taskManagerId identifying the task manager
 	 * @param allocationId  identifying the slot to fail
-	 * @param leaderId      identifying the job leader
 	 * @param cause         of the failing
 	 */
 	void failSlot(final ResourceID taskManagerId,
 			final AllocationID allocationId,
-			final UUID leaderId,
 			final Exception cause);
 
 	/**
@@ -202,14 +188,12 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 	 *
 	 * @param taskManagerRpcAddress the rpc address of the task manager
 	 * @param taskManagerLocation   location of the task manager
-	 * @param leaderId              identifying the job leader
 	 * @param timeout               for the rpc call
 	 * @return Future registration response indicating whether the registration was successful or not
 	 */
 	CompletableFuture<RegistrationResponse> registerTaskManager(
 			final String taskManagerRpcAddress,
 			final TaskManagerLocation taskManagerLocation,
-			final UUID leaderId,
 			@RpcTimeout final Time timeout);
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
new file mode 100644
index 0000000..ffd53b3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterId.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.util.AbstractID;
+
+import javax.annotation.Nullable;
+
+import java.util.UUID;
+
+/**
+ * The {@link JobMaster} fencing token.
+ *
+ * <p>A dedicated id type (instead of a plain {@link UUID}) gives type safety when several
+ * fencing tokens are passed around together.
+ */
+public class JobMasterId extends AbstractID {
+
+	private static final long serialVersionUID = -933276753644003754L;
+
+	/** JobMasterId whose lower and upper parts are both zero. */
+	public static final JobMasterId INITIAL_JOB_MASTER_ID = new JobMasterId(0L, 0L);
+
+	/**
+	 * Creates a JobMasterId from its byte representation.
+	 *
+	 * @param bytes the bytes, interpreted as by {@link AbstractID#AbstractID(byte[])}
+	 */
+	public JobMasterId(byte[] bytes) {
+		super(bytes);
+	}
+
+	/**
+	 * Creates a JobMasterId from its lower and upper 64 bits.
+	 *
+	 * @param lowerPart the lower 64 bits of the id
+	 * @param upperPart the upper 64 bits of the id
+	 */
+	public JobMasterId(long lowerPart, long upperPart) {
+		super(lowerPart, upperPart);
+	}
+
+	/**
+	 * Creates a JobMasterId with the same value as the given id.
+	 *
+	 * @param id the id whose value is copied
+	 */
+	public JobMasterId(AbstractID id) {
+		super(id);
+	}
+
+	/**
+	 * Creates a new random JobMasterId via {@link AbstractID#AbstractID()}.
+	 */
+	public JobMasterId() {
+	}
+
+	/**
+	 * Creates a JobMasterId from a leader session {@link UUID}. The UUID's least significant
+	 * bits become the lower part and its most significant bits the upper part, so
+	 * {@link #toUUID()} returns an equal UUID.
+	 *
+	 * @param uuid the UUID to convert; must not be null (see {@link #fromUuidOrNull(UUID)})
+	 */
+	public JobMasterId(UUID uuid) {
+		this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
+	}
+
+	/**
+	 * Converts this id into a {@link UUID}; the inverse of {@link #JobMasterId(UUID)}.
+	 *
+	 * @return UUID with the upper part as most and the lower part as least significant bits
+	 */
+	public UUID toUUID() {
+		return new UUID(getUpperPart(), getLowerPart());
+	}
+
+	/**
+	 * Generates a new random JobMasterId.
+	 *
+	 * @return a newly created JobMasterId
+	 */
+	public static JobMasterId generate() {
+		return new JobMasterId();
+	}
+
+	/**
+	 * Converts a possibly null leader session {@link UUID} into a JobMasterId, for callers
+	 * where a null leader id means that no leader is currently active.
+	 *
+	 * @param uuid the UUID to convert, may be null
+	 * @return the corresponding JobMasterId, or null if {@code uuid} is null
+	 */
+	@Nullable
+	public static JobMasterId fromUuidOrNull(@Nullable UUID uuid) {
+		return uuid == null ? null : new JobMasterId(uuid);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index e93c907..8967aae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.util.Preconditions;
 
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -43,11 +42,11 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
 
 	private final TaskExecutorGateway taskExecutorGateway;
 
-	private final UUID leaderId;
+	private final JobMasterId jobMasterId;
 
-	public RpcTaskManagerGateway(TaskExecutorGateway taskExecutorGateway, UUID leaderId) {
+	public RpcTaskManagerGateway(TaskExecutorGateway taskExecutorGateway, JobMasterId jobMasterId) {
 		this.taskExecutorGateway = Preconditions.checkNotNull(taskExecutorGateway);
-		this.leaderId = Preconditions.checkNotNull(leaderId);
+		this.jobMasterId = Preconditions.checkNotNull(jobMasterId);
 	}
 
 	@Override
@@ -87,7 +86,7 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
 
 	@Override
 	public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
-		return taskExecutorGateway.submitTask(tdd, leaderId, timeout);
+		return taskExecutorGateway.submitTask(tdd, jobMasterId, timeout);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
index 4ca6209..565cd82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdActions.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 
 import java.util.UUID;
 
@@ -31,9 +32,9 @@ public interface JobLeaderIdActions {
 	 * Callback when a monitored job leader lost its leadership.
 	 *
 	 * @param jobId identifying the job whose leader lost leadership
-	 * @param oldJobLeaderId of the job manager which lost leadership
+	 * @param oldJobMasterId of the job manager which lost leadership
 	 */
-	void jobLeaderLostLeadership(JobID jobId, UUID oldJobLeaderId);
+	void jobLeaderLostLeadership(JobID jobId, JobMasterId oldJobMasterId);
 
 	/**
 	 * Notify a job timeout. The job is identified by the given JobID. In order to check

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
index aaa72d9..da0a7fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdService.java
@@ -22,14 +22,17 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -49,17 +52,17 @@ public class JobLeaderIdService {
 
 	private static final Logger LOG = LoggerFactory.getLogger(JobLeaderIdService.class);
 
-	/** High availability services to use by this service */
+	/** High availability services to use by this service. */
 	private final HighAvailabilityServices highAvailabilityServices;
 
 	private final ScheduledExecutor scheduledExecutor;
 
 	private final Time jobTimeout;
 
-	/** Map of currently monitored jobs */
+	/** Map of currently monitored jobs. */
 	private final Map<JobID, JobLeaderIdListener> jobLeaderIdListeners;
 
-	/** Actions to call when the job leader changes */
+	/** Actions to call when the job leader changes. */
 	private JobLeaderIdActions jobLeaderIdActions;
 
 	public JobLeaderIdService(
@@ -178,14 +181,14 @@ public class JobLeaderIdService {
 		return jobLeaderIdListeners.containsKey(jobId);
 	}
 
-	public CompletableFuture<UUID> getLeaderId(JobID jobId) throws Exception {
+	public CompletableFuture<JobMasterId> getLeaderId(JobID jobId) throws Exception {
 		if (!jobLeaderIdListeners.containsKey(jobId)) {
 			addJob(jobId);
 		}
 
 		JobLeaderIdListener listener = jobLeaderIdListeners.get(jobId);
 
-		return listener.getLeaderIdFuture();
+		return listener.getLeaderIdFuture().thenApply((UUID id) -> id != null ? new JobMasterId(id) : null);
 	}
 
 	public boolean isValidTimeout(JobID jobId, UUID timeoutId) {
@@ -216,15 +219,14 @@ public class JobLeaderIdService {
 		private volatile CompletableFuture<UUID> leaderIdFuture;
 		private volatile boolean running = true;
 
-		/** Null if no timeout has been scheduled; otherwise non null */
+		/** Null if no timeout has been scheduled; otherwise non null. */
 		@Nullable
 		private  volatile ScheduledFuture<?> timeoutFuture;
 
-		/** Null if no timeout has been scheduled; otherwise non null */
+		/** Null if no timeout has been scheduled; otherwise non null. */
 		@Nullable
 		private volatile UUID timeoutId;
 
-
 		private JobLeaderIdListener(
 				JobID jobId,
 				JobLeaderIdActions listenerJobLeaderIdActions,
@@ -279,7 +281,7 @@ public class JobLeaderIdService {
 
 				if (previousJobLeaderId != null && !previousJobLeaderId.equals(leaderSessionId)) {
 					// we had a previous job leader, so notify about his lost leadership
-					listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, previousJobLeaderId);
+					listenerJobLeaderIdActions.jobLeaderLostLeadership(jobId, new JobMasterId(previousJobLeaderId));
 
 					if (null == leaderSessionId) {
 						// No current leader active ==> Set a timeout for the job

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 659b3d4..87cf7d1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -51,7 +52,6 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
@@ -75,8 +75,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>It offers the following methods as part of its rpc interface to interact with him remotely:
  * <ul>
- *     <li>{@link #registerJobManager(UUID, ResourceID, String, JobID, Time)} registers a {@link JobMaster} at the resource manager</li>
- *     <li>{@link #requestSlot(UUID, SlotRequest, Time)} requests a slot from the resource manager</li>
+ *     <li>{@link #registerJobManager(JobMasterId, ResourceID, String, JobID, Time)} registers a {@link JobMaster} at the resource manager</li>
+ *     <li>{@link #requestSlot(JobMasterId, SlotRequest, Time)} requests a slot from the resource manager</li>
  * </ul>
  */
 public abstract class ResourceManager<WorkerType extends Serializable>
@@ -240,13 +240,13 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 	@Override
 	public CompletableFuture<RegistrationResponse> registerJobManager(
-			final UUID jobManagerLeaderId,
+			final JobMasterId jobMasterId,
 			final ResourceID jobManagerResourceId,
 			final String jobManagerAddress,
 			final JobID jobId,
 			final Time timeout) {
 
-		checkNotNull(jobManagerLeaderId);
+		checkNotNull(jobMasterId);
 		checkNotNull(jobManagerResourceId);
 		checkNotNull(jobManagerAddress);
 		checkNotNull(jobId);
@@ -265,12 +265,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			}
 		}
 
-		log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId);
+		log.info("Registering job manager {}@{} for job {}.", jobMasterId, jobManagerAddress, jobId);
 
-		CompletableFuture<UUID> jobLeaderIdFuture;
+		CompletableFuture<JobMasterId> jobMasterIdFuture;
 
 		try {
-			jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
+			jobMasterIdFuture = jobLeaderIdService.getLeaderId(jobId);
 		} catch (Exception e) {
 			// we cannot check the job leader id so let's fail
 			// TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id
@@ -283,21 +283,20 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			return FutureUtils.completedExceptionally(exception);
 		}
 
-		CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, JobMasterGateway.class);
+		CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, jobMasterId, JobMasterGateway.class);
 
 		CompletableFuture<RegistrationResponse> registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync(
-			jobLeaderIdFuture,
-			(JobMasterGateway jobMasterGateway, UUID jobLeaderId) -> {
-				if (Objects.equals(jobLeaderId, jobManagerLeaderId)) {
+			jobMasterIdFuture,
+			(JobMasterGateway jobMasterGateway, JobMasterId currentJobMasterId) -> {
+				if (Objects.equals(currentJobMasterId, jobMasterId)) {
 					return registerJobMasterInternal(
 						jobMasterGateway,
-						jobLeaderId,
 						jobId,
 						jobManagerAddress,
 						jobManagerResourceId);
 				} else {
-					log.debug("The job manager leader id {} did not match the job " +
-						"leader id {}.", jobManagerLeaderId, jobLeaderId);
+					log.debug("The current JobMaster leader id {} did not match the received " +
+						"JobMaster id {}.", currentJobMasterId, jobMasterId);
 					return new RegistrationResponse.Decline("Job manager leader id did not match.");
 				}
 			},
@@ -308,9 +307,9 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			(RegistrationResponse registrationResponse, Throwable throwable) -> {
 				if (throwable != null) {
 					if (log.isDebugEnabled()) {
-						log.debug("Registration of job manager {}@{} failed.", jobManagerLeaderId, jobManagerAddress, throwable);
+						log.debug("Registration of job manager {}@{} failed.", jobMasterId, jobManagerAddress, throwable);
 					} else {
-						log.info("Registration of job manager {}@{} failed.", jobManagerLeaderId, jobManagerAddress);
+						log.info("Registration of job manager {}@{} failed.", jobMasterId, jobManagerAddress);
 					}
 
 					return new RegistrationResponse.Decline(throwable.getMessage());
@@ -367,7 +366,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 	@Override
 	public CompletableFuture<Acknowledge> requestSlot(
-			UUID jobMasterLeaderID,
+			JobMasterId jobMasterId,
 			SlotRequest slotRequest,
 			final Time timeout) {
 
@@ -375,7 +374,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
 
 		if (null != jobManagerRegistration) {
-			if (Objects.equals(jobMasterLeaderID, jobManagerRegistration.getLeaderID())) {
+			if (Objects.equals(jobMasterId, jobManagerRegistration.getJobMasterId())) {
 				log.info("Request slot with profile {} for job {} with allocation id {}.",
 					slotRequest.getResourceProfile(),
 					slotRequest.getJobId(),
@@ -389,7 +388,8 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 				return CompletableFuture.completedFuture(Acknowledge.get());
 			} else {
-				return FutureUtils.completedExceptionally(new LeaderSessionIDException(jobMasterLeaderID, jobManagerRegistration.getLeaderID()));
+				return FutureUtils.completedExceptionally(new ResourceManagerException("The job leader's id " +
+					jobManagerRegistration.getJobMasterId() + " does not match the received id " + jobMasterId + '.'));
 			}
 
 		} else {
@@ -488,7 +488,6 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	 * Registers a new JobMaster.
 	 *
 	 * @param jobMasterGateway to communicate with the registering JobMaster
-	 * @param jobLeaderId leader id of the JobMaster
 	 * @param jobId of the job for which the JobMaster is responsible
 	 * @param jobManagerAddress address of the JobMaster
 	 * @param jobManagerResourceId ResourceID of the JobMaster
@@ -496,16 +495,15 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	 */
 	private RegistrationResponse registerJobMasterInternal(
 		final JobMasterGateway jobMasterGateway,
-		UUID jobLeaderId,
 		JobID jobId,
 		String jobManagerAddress,
 		ResourceID jobManagerResourceId) {
 		if (jobManagerRegistrations.containsKey(jobId)) {
 			JobManagerRegistration oldJobManagerRegistration = jobManagerRegistrations.get(jobId);
 
-			if (oldJobManagerRegistration.getLeaderID().equals(jobLeaderId)) {
+			if (Objects.equals(oldJobManagerRegistration.getJobMasterId(), jobMasterGateway.getFencingToken())) {
 				// same registration
-				log.debug("Job manager {}@{} was already registered.", jobLeaderId, jobManagerAddress);
+				log.debug("Job manager {}@{} was already registered.", jobMasterGateway.getFencingToken(), jobManagerAddress);
 			} else {
 				// tell old job manager that he is no longer the job leader
 				disconnectJobManager(
@@ -515,7 +513,6 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 				JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(
 					jobId,
 					jobManagerResourceId,
-					jobLeaderId,
 					jobMasterGateway);
 				jobManagerRegistrations.put(jobId, jobManagerRegistration);
 				jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
@@ -525,13 +522,12 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(
 				jobId,
 				jobManagerResourceId,
-				jobLeaderId,
 				jobMasterGateway);
 			jobManagerRegistrations.put(jobId, jobManagerRegistration);
 			jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
 		}
 
-		log.info("Registered job manager {}@{} for job {}.", jobLeaderId, jobManagerAddress, jobId);
+		log.info("Registered job manager {}@{} for job {}.", jobMasterGateway.getFencingToken(), jobManagerAddress, jobId);
 
 		jobManagerHeartbeatManager.monitorTarget(jobManagerResourceId, new HeartbeatTarget<Void>() {
 			@Override
@@ -633,10 +629,10 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		if (jobManagerRegistration != null) {
 			final ResourceID jobManagerResourceId = jobManagerRegistration.getJobManagerResourceID();
 			final JobMasterGateway jobMasterGateway = jobManagerRegistration.getJobManagerGateway();
-			final UUID jobManagerLeaderId = jobManagerRegistration.getLeaderID();
+			final JobMasterId jobMasterId = jobManagerRegistration.getJobMasterId();
 
 			log.info("Disconnect job manager {}@{} for job {} from the resource manager.",
-				jobManagerLeaderId,
+				jobMasterId,
 				jobMasterGateway.getAddress(),
 				jobId);
 
@@ -645,7 +641,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 			jmResourceIdRegistrations.remove(jobManagerResourceId);
 
 			// tell the job manager about the disconnect
-			jobMasterGateway.disconnectResourceManager(jobManagerLeaderId, getFencingToken(), cause);
+			jobMasterGateway.disconnectResourceManager(getFencingToken(), cause);
 		} else {
 			log.debug("There was no registered job manager for job {}.", jobId);
 		}
@@ -687,17 +683,17 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 		}
 	}
 
-	protected void jobLeaderLostLeadership(JobID jobId, UUID oldJobLeaderId) {
+	protected void jobLeaderLostLeadership(JobID jobId, JobMasterId oldJobMasterId) {
 		if (jobManagerRegistrations.containsKey(jobId)) {
 			JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId);
 
-			if (Objects.equals(jobManagerRegistration.getLeaderID(), oldJobLeaderId)) {
+			if (Objects.equals(jobManagerRegistration.getJobMasterId(), oldJobMasterId)) {
 				disconnectJobManager(jobId, new Exception("Job leader lost leadership."));
 			} else {
 				log.debug("Discarding job leader lost leadership, because a new job leader was found for job {}. ", jobId);
 			}
 		} else {
-			log.debug("Discard job leader lost leadership for outdated leader {} for job {}.", oldJobLeaderId, jobId);
+			log.debug("Discard job leader lost leadership for outdated leader {} for job {}.", oldJobMasterId, jobId);
 		}
 	}
 
@@ -824,6 +820,7 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 
 	/**
 	 * Allocates a resource using the resource profile.
+	 *
 	 * @param resourceProfile The resource description
 	 */
 	@VisibleForTesting
@@ -891,11 +888,11 @@ public abstract class ResourceManager<WorkerType extends Serializable>
 	private class JobLeaderIdActionsImpl implements JobLeaderIdActions {
 
 		@Override
-		public void jobLeaderLostLeadership(final JobID jobId, final UUID oldJobLeaderId) {
+		public void jobLeaderLostLeadership(final JobID jobId, final JobMasterId oldJobMasterId) {
 			runAsync(new Runnable() {
 				@Override
 				public void run() {
-					ResourceManager.this.jobLeaderLostLeadership(jobId, oldJobLeaderId);
+					ResourceManager.this.jobLeaderLostLeadership(jobId, oldJobMasterId);
 				}
 			});
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index ac81048..a957716 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
@@ -33,7 +34,6 @@ import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -44,7 +44,7 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
 	/**
 	 * Register a {@link JobMaster} at the resource manager.
 	 *
-	 * @param jobMasterLeaderId The fencing token for the JobMaster leader
+	 * @param jobMasterId The fencing token for the JobMaster leader
 	 * @param jobMasterResourceId The resource ID of the JobMaster that registers
 	 * @param jobMasterAddress The address of the JobMaster that registers
 	 * @param jobId The Job ID of the JobMaster that registers
@@ -52,7 +52,7 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
 	 * @return Future registration response
 	 */
 	CompletableFuture<RegistrationResponse> registerJobManager(
-		UUID jobMasterLeaderId,
+		JobMasterId jobMasterId,
 		ResourceID jobMasterResourceId,
 		String jobMasterAddress,
 		JobID jobId,
@@ -61,12 +61,12 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager
 	/**
 	 * Requests a slot from the resource manager.
 	 *
-	 * @param jobMasterLeaderID leader if of the JobMaster
+	 * @param jobMasterId id of the JobMaster
 	 * @param slotRequest The slot to request
 	 * @return The confirmation that the slot gets allocated
 	 */
 	CompletableFuture<Acknowledge> requestSlot(
-		UUID jobMasterLeaderID,
+		JobMasterId jobMasterId,
 		SlotRequest slotRequest,
 		@RpcTimeout Time timeout);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java
index df3a39f..dca2db6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobManagerRegistration.java
@@ -21,10 +21,9 @@ package org.apache.flink.runtime.resourcemanager.registration;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.util.Preconditions;
 
-import java.util.UUID;
-
 /**
  * Container for JobManager related registration information, such as the leader id or the job id.
  */
@@ -33,18 +32,14 @@ public class JobManagerRegistration {
 
 	private final ResourceID jobManagerResourceID;
 
-	private final UUID leaderID;
-
 	private final JobMasterGateway jobManagerGateway;
 
 	public JobManagerRegistration(
 			JobID jobID,
 			ResourceID jobManagerResourceID,
-			UUID leaderID,
 			JobMasterGateway jobManagerGateway) {
 		this.jobID = Preconditions.checkNotNull(jobID);
 		this.jobManagerResourceID = Preconditions.checkNotNull(jobManagerResourceID);
-		this.leaderID = Preconditions.checkNotNull(leaderID);
 		this.jobManagerGateway = Preconditions.checkNotNull(jobManagerGateway);
 	}
 
@@ -56,8 +51,8 @@ public class JobManagerRegistration {
 		return jobManagerResourceID;
 	}
 
-	public UUID getLeaderID() {
-		return leaderID;
+	public JobMasterId getJobMasterId() {
+		return jobManagerGateway.getFencingToken();
 	}
 
 	public JobMasterGateway getJobManagerGateway() {

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
deleted file mode 100644
index d3ba9a9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.rpc.exceptions;
-
-import java.util.UUID;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * An exception specifying that the received leader session ID is not the same as expected.
- */
-public class LeaderSessionIDException extends Exception {
-
-	private static final long serialVersionUID = -3276145308053264636L;
-
-	/** expected leader session id */
-	private final UUID expectedLeaderSessionID;
-
-	/** actual leader session id */
-	private final UUID actualLeaderSessionID;
-
-	public LeaderSessionIDException(UUID expectedLeaderSessionID, UUID actualLeaderSessionID) {
-		super("Unmatched leader session ID : expected " + expectedLeaderSessionID + ", actual " + actualLeaderSessionID);
-		this.expectedLeaderSessionID =  checkNotNull(expectedLeaderSessionID);
-		this.actualLeaderSessionID = checkNotNull(actualLeaderSessionID);
-	}
-
-	/**
-	 * Get expected leader session id
-	 *
-	 * @return expect leader session id
-	 */
-	public UUID getExpectedLeaderSessionID() {
-		return expectedLeaderSessionID;
-	}
-
-	/**
-	 * Get actual leader session id
-	 *
-	 * @return actual leader session id
-	 */
-	public UUID getActualLeaderSessionID() {
-		return actualLeaderSessionID;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java
index f02a8c2..65012a0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java
@@ -21,8 +21,7 @@ package org.apache.flink.runtime.taskexecutor;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-
-import java.util.UUID;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 
 /**
  * Listener for the {@link JobLeaderService}. The listener is notified whenever a job manager
@@ -38,18 +37,17 @@ public interface JobLeaderListener {
 	 *
 	 * @param jobId identifying the job for which the job manager has gained leadership
 	 * @param jobManagerGateway to the job leader
-	 * @param jobLeaderId new leader id of the job leader
 	 * @param registrationMessage containing further registration information
 	 */
-	void jobManagerGainedLeadership(JobID jobId, JobMasterGateway jobManagerGateway, UUID jobLeaderId, JMTMRegistrationSuccess registrationMessage);
+	void jobManagerGainedLeadership(JobID jobId, JobMasterGateway jobManagerGateway, JMTMRegistrationSuccess registrationMessage);
 
 	/**
 	 * Callback if the job leader for the job with the given job id lost its leadership.
 	 *
 	 * @param jobId identifying the job whose leader has lost leadership
-	 * @param jobLeaderId old leader id
+	 * @param jobMasterId old JobMasterId
 	 */
-	void jobManagerLostLeadership(JobID jobId, UUID jobLeaderId);
+	void jobManagerLostLeadership(JobID jobId, JobMasterId jobMasterId);
 
 	/**
 	 * Callback for errors which might occur in the {@link JobLeaderService}.

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index f564df4..20dcfa9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
@@ -32,6 +33,7 @@ import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,25 +56,25 @@ public class JobLeaderService {
 
 	private static final Logger LOG = LoggerFactory.getLogger(JobLeaderService.class);
 
-	/** Self's location, used for the job manager connection */
+	/** Self's location, used for the job manager connection. */
 	private final TaskManagerLocation ownLocation;
 
-	/** The leader retrieval service and listener for each registered job */
+	/** The leader retrieval service and listener for each registered job. */
 	private final Map<JobID, Tuple2<LeaderRetrievalService, JobLeaderService.JobManagerLeaderListener>> jobLeaderServices;
 
-	/** Internal state of the service */
+	/** Internal state of the service. */
 	private volatile JobLeaderService.State state;
 
-	/** Address of the owner of this service. This address is used for the job manager connection */
+	/** Address of the owner of this service. This address is used for the job manager connection. */
 	private String ownerAddress;
 
-	/** Rpc service to use for establishing connections */
+	/** Rpc service to use for establishing connections. */
 	private RpcService rpcService;
 
-	/** High availability services to create the leader retrieval services from */
+	/** High availability services to create the leader retrieval services from. */
 	private HighAvailabilityServices highAvailabilityServices;
 
-	/** Job leader listener listening for job leader changes */
+	/** Job leader listener listening for job leader changes. */
 	private JobLeaderListener jobLeaderListener;
 
 	public JobLeaderService(TaskManagerLocation location) {
@@ -207,24 +209,24 @@ public class JobLeaderService {
 	 */
 	private final class JobManagerLeaderListener implements LeaderRetrievalListener {
 
-		/** Job id identifying the job to look for a leader */
+		/** Job id identifying the job to look for a leader. */
 		private final JobID jobId;
 
-		/** Rpc connection to the job leader */
-		private RegisteredRpcConnection<UUID, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
+		/** Rpc connection to the job leader. */
+		private RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
 
-		/** State of the listener */
+		/** State of the listener. */
 		private volatile boolean stopped;
 
-		/** Leader id of the current job leader */
-		private volatile UUID currentLeaderId;
+		/** Leader id of the current job leader. */
+		private volatile JobMasterId currentJobMasterId;
 
 		private JobManagerLeaderListener(JobID jobId) {
 			this.jobId = Preconditions.checkNotNull(jobId);
 
 			stopped = false;
 			rpcConnection = null;
-			currentLeaderId = null;
+			currentJobMasterId = null;
 		}
 
 		public void stop() {
@@ -241,8 +243,10 @@ public class JobLeaderService {
 				LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. " +
 					"However, the service is no longer running.", JobLeaderService.class.getSimpleName(), jobId);
 			} else {
+				final JobMasterId jobMasterId = leaderId != null ? new JobMasterId(leaderId) : null;
+
 				LOG.debug("New leader information for job {}. Address: {}, leader id: {}.",
-					jobId, leaderAddress, leaderId);
+					jobId, leaderAddress, jobMasterId);
 
 				if (leaderAddress == null || leaderAddress.isEmpty()) {
 					// the leader lost leadership but there is no other leader yet.
@@ -250,28 +254,28 @@ public class JobLeaderService {
 						rpcConnection.close();
 					}
 
-					jobLeaderListener.jobManagerLostLeadership(jobId, currentLeaderId);
+					jobLeaderListener.jobManagerLostLeadership(jobId, currentJobMasterId);
 
-					currentLeaderId = leaderId;
+					currentJobMasterId = jobMasterId;
 				} else {
-					currentLeaderId = leaderId;
+					currentJobMasterId = jobMasterId;
 
 					if (rpcConnection != null) {
 						// check if we are already trying to connect to this leader
-						if (!leaderId.equals(rpcConnection.getTargetLeaderId())) {
+						if (!Objects.equals(jobMasterId, rpcConnection.getTargetLeaderId())) {
 							rpcConnection.close();
 
 							rpcConnection = new JobManagerRegisteredRpcConnection(
 								LOG,
 								leaderAddress,
-								leaderId,
+								jobMasterId,
 								rpcService.getExecutor());
 						}
 					} else {
 						rpcConnection = new JobManagerRegisteredRpcConnection(
 							LOG,
 							leaderAddress,
-							leaderId,
+							jobMasterId,
 							rpcService.getExecutor());
 					}
 
@@ -300,18 +304,18 @@ public class JobLeaderService {
 		/**
 		 * Rpc connection for the job manager <--> task manager connection.
 		 */
-		private final class JobManagerRegisteredRpcConnection extends RegisteredRpcConnection<UUID, JobMasterGateway, JMTMRegistrationSuccess> {
+		private final class JobManagerRegisteredRpcConnection extends RegisteredRpcConnection<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> {
 
 			JobManagerRegisteredRpcConnection(
 				Logger log,
 				String targetAddress,
-				UUID targetLeaderId,
+				JobMasterId jobMasterId,
 				Executor executor) {
-				super(log, targetAddress, targetLeaderId, executor);
+				super(log, targetAddress, jobMasterId, executor);
 			}
 
 			@Override
-			protected RetryingRegistration<UUID, JobMasterGateway, JMTMRegistrationSuccess> generateRegistration() {
+			protected RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> generateRegistration() {
 				return new JobLeaderService.JobManagerRetryingRegistration(
 						LOG,
 						rpcService,
@@ -326,10 +330,10 @@ public class JobLeaderService {
 			@Override
 			protected void onRegistrationSuccess(JMTMRegistrationSuccess success) {
 				// filter out old registration attempts
-				if (Objects.equals(getTargetLeaderId(), currentLeaderId)) {
+				if (Objects.equals(getTargetLeaderId(), currentJobMasterId)) {
 					log.info("Successful registration at job manager {} for job {}.", getTargetAddress(), jobId);
 
-					jobLeaderListener.jobManagerGainedLeadership(jobId, getTargetGateway(), getTargetLeaderId(), success);
+					jobLeaderListener.jobManagerGainedLeadership(jobId, getTargetGateway(), success);
 				} else {
 					log.debug("Encountered obsolete JobManager registration success from {} with leader session ID {}.", getTargetAddress(), getTargetLeaderId());
 				}
@@ -338,7 +342,7 @@ public class JobLeaderService {
 			@Override
 			protected void onRegistrationFailure(Throwable failure) {
 				// filter out old registration attempts
-				if (Objects.equals(getTargetLeaderId(), currentLeaderId)) {
+				if (Objects.equals(getTargetLeaderId(), currentJobMasterId)) {
 					log.info("Failed to register at job  manager {} for job {}.", getTargetAddress(), jobId);
 					jobLeaderListener.handleError(failure);
 				} else {
@@ -352,7 +356,7 @@ public class JobLeaderService {
 	 * Retrying registration for the job manager <--> task manager connection.
 	 */
 	private static final class JobManagerRetryingRegistration
-			extends RetryingRegistration<UUID, JobMasterGateway, JMTMRegistrationSuccess>
+			extends RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess>
 	{
 
 		private final String taskManagerRpcAddress;
@@ -365,11 +369,10 @@ public class JobLeaderService {
 				String targetName,
 				Class<JobMasterGateway> targetType,
 				String targetAddress,
-				UUID leaderId,
+				JobMasterId jobMasterId,
 				String taskManagerRpcAddress,
-				TaskManagerLocation taskManagerLocation)
-		{
-			super(log, rpcService, targetName, targetType, targetAddress, leaderId);
+				TaskManagerLocation taskManagerLocation) {
+			super(log, rpcService, targetName, targetType, targetAddress, jobMasterId);
 
 			this.taskManagerRpcAddress = taskManagerRpcAddress;
 			this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
@@ -377,15 +380,15 @@ public class JobLeaderService {
 
 		@Override
 		protected CompletableFuture<RegistrationResponse> invokeRegistration(
-				JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception
-		{
-			return gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation,
-					leaderId, Time.milliseconds(timeoutMillis));
+				JobMasterGateway gateway,
+				JobMasterId jobMasterId,
+				long timeoutMillis) throws Exception {
+			return gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation, Time.milliseconds(timeoutMillis));
 		}
 	}
 
 	/**
-	 * Internal state of the service
+	 * Internal state of the service.
 	 */
 	private enum State {
 		CREATED, STARTED, STOPPED

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
index 363c107..2c05388 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
@@ -25,12 +25,11 @@ import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.util.Preconditions;
 
-import java.util.UUID;
-
 /**
  * Container class for JobManager specific communication utils used by the {@link TaskExecutor}.
  */
@@ -42,9 +41,6 @@ public class JobManagerConnection {
 	// The unique id used for identifying the job manager
 	private final ResourceID resourceID;
 
-	// Job master leader session id
-	private final UUID leaderId;
-
 	// Gateway to the job master
 	private final JobMasterGateway jobMasterGateway;
 
@@ -70,7 +66,6 @@ public class JobManagerConnection {
 				JobID jobID,
 				ResourceID resourceID,
 				JobMasterGateway jobMasterGateway,
-				UUID leaderId,
 				TaskManagerActions taskManagerActions,
 				CheckpointResponder checkpointResponder,
 				BlobCache blobCache, LibraryCacheManager libraryCacheManager,
@@ -78,7 +73,6 @@ public class JobManagerConnection {
 				PartitionProducerStateChecker partitionStateChecker) {
 		this.jobID = Preconditions.checkNotNull(jobID);
 		this.resourceID = Preconditions.checkNotNull(resourceID);
-		this.leaderId = Preconditions.checkNotNull(leaderId);
 		this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
 		this.taskManagerActions = Preconditions.checkNotNull(taskManagerActions);
 		this.checkpointResponder = Preconditions.checkNotNull(checkpointResponder);
@@ -96,8 +90,8 @@ public class JobManagerConnection {
 		return resourceID;
 	}
 
-	public UUID getLeaderId() {
-		return leaderId;
+	public JobMasterId getJobMasterId() {
+		return jobMasterGateway.getFencingToken();
 	}
 
 	public JobMasterGateway getJobManagerGateway() {


Mime
View raw message