flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-7504] Fence the ResourceManager
Date Mon, 04 Sep 2017 11:44:26 GMT
Repository: flink
Updated Branches:
  refs/heads/master 84c2a928c -> e70de0eb8


http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 139bfc4..1de3284 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -27,15 +27,20 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
@@ -43,9 +48,11 @@ import org.junit.Test;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.*;
 
 public class ResourceManagerJobMasterTest extends TestLogger {
@@ -71,18 +78,15 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 	public void testRegisterJobMaster() throws Exception {
 		String jobMasterAddress = "/jobMasterAddress1";
 		JobID jobID = mockJobMaster(jobMasterAddress);
-		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
 		UUID jmLeaderID = UUID.randomUUID();
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManager<?> resourceManager = createAndStartResourceManager(mock(LeaderElectionService.class), jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 
 		// test response successful
 		CompletableFuture<RegistrationResponse> successfulFuture = rmGateway.registerJobManager(
-			rmLeaderSessionId,
 			jmLeaderID,
 			jmResourceId,
 			jobMasterAddress,
@@ -103,25 +107,28 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 	public void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exception {
 		String jobMasterAddress = "/jobMasterAddress1";
 		JobID jobID = mockJobMaster(jobMasterAddress);
-		TestingLeaderElectionService resourceManagerLeaderElectionService = new TestingLeaderElectionService();
 		UUID jmLeaderID = UUID.randomUUID();
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 		TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
-		final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+		final ResourceManager<?> resourceManager = createAndStartResourceManager(mock(LeaderElectionService.class), jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
+		final ResourceManagerGateway wronglyFencedGateway = rpcService.connect(resourceManager.getAddress(), ResourceManagerId.generate(), ResourceManagerGateway.class)
+			.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
 		// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
-		UUID differentLeaderSessionID = UUID.randomUUID();
-		CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = rmGateway.registerJobManager(
-			differentLeaderSessionID,
+		CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = wronglyFencedGateway.registerJobManager(
 			jmLeaderID,
 			jmResourceId,
 			jobMasterAddress,
 			jobID,
 			timeout);
-		assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
+
+		try {
+			unMatchedLeaderFuture.get(5L, TimeUnit.SECONDS);
+			fail("Should fail because we are using the wrong fencing token.");
+		} catch (ExecutionException e) {
+			assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
+		}
 
 		if (testingFatalErrorHandler.hasExceptionOccurred()) {
 			testingFatalErrorHandler.rethrowError();
@@ -142,14 +149,11 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
-		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
 		// test throw exception when receive a registration from job master which takes unmatched leaderSessionId
 		UUID differentLeaderSessionID = UUID.randomUUID();
 		CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = rmGateway.registerJobManager(
-			rmLeaderSessionId,
 			differentLeaderSessionID,
 			jmResourceId,
 			jobMasterAddress,
@@ -176,15 +180,12 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
 		final ResourceManager<?> resourceManager = createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler);
 		final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
-		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
 		// test throw exception when receive a registration from job master which takes invalid address
 		String invalidAddress = "/jobMasterAddress2";
 		CompletableFuture<RegistrationResponse> invalidAddressFuture = rmGateway.registerJobManager(
-			rmLeaderSessionId,
-			jmLeaderSessionId,
+			HighAvailabilityServices.DEFAULT_LEADER_ID,
 			jmResourceId,
 			invalidAddress,
 			jobID,
@@ -214,24 +215,22 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 			jobMasterLeaderRetrievalService,
 			testingFatalErrorHandler);
 		final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-		final UUID rmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
-		final UUID jmLeaderSessionId = grantResourceManagerLeadership(resourceManagerLeaderElectionService);
 		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
 
 		JobID unknownJobIDToHAServices = new JobID();
-		// verify return RegistrationResponse.Decline when failed to start a job master Leader retrieval listener
-		CompletableFuture<RegistrationResponse> declineFuture = rmGateway.registerJobManager(
-			rmLeaderSessionId,
-			jmLeaderSessionId,
+
+		// this should fail because we try to register a job leader listener for an unknown job id
+		CompletableFuture<RegistrationResponse> registrationFuture = rmGateway.registerJobManager(
+			HighAvailabilityServices.DEFAULT_LEADER_ID,
 			jmResourceId,
 			jobMasterAddress,
 			unknownJobIDToHAServices,
 			timeout);
-		RegistrationResponse response = declineFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-		assertTrue(response instanceof RegistrationResponse.Decline);
 
-		if (testingFatalErrorHandler.hasExceptionOccurred()) {
-			testingFatalErrorHandler.rethrowError();
+		try {
+			registrationFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		} catch (ExecutionException e) {
+			assertTrue(ExceptionUtils.stripExecutionException(e) instanceof ResourceManagerException);
 		}
 	}
 
@@ -243,9 +242,9 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 	}
 
 	private ResourceManager createAndStartResourceManager(
-			TestingLeaderElectionService resourceManagerLeaderElectionService,
+			LeaderElectionService resourceManagerLeaderElectionService,
 			JobID jobID,
-			TestingLeaderRetrievalService jobMasterLeaderRetrievalService,
+			LeaderRetrievalService jobMasterLeaderRetrievalService,
 			FatalErrorHandler fatalErrorHandler) throws Exception {
 		ResourceID rmResourceId = ResourceID.generate();
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
@@ -283,11 +282,4 @@ public class ResourceManagerJobMasterTest extends TestLogger {
 		resourceManager.start();
 		return resourceManager;
 	}
-
-	private UUID grantResourceManagerLeadership(TestingLeaderElectionService resourceManagerLeaderElectionService) {
-		UUID leaderSessionId = UUID.randomUUID();
-		resourceManagerLeaderElectionService.isLeader(leaderSessionId);
-		return leaderSessionId;
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 616ed5c..8add168 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -23,17 +23,20 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.rpc.exceptions.FencingTokenMismatchException;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.Before;
@@ -41,10 +44,12 @@ import org.junit.Test;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 
 public class ResourceManagerTaskExecutorTest extends TestLogger {
@@ -65,7 +70,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
 	private ResourceManagerGateway rmGateway;
 
-	private UUID leaderSessionId;
+	private ResourceManagerGateway wronglyFencedGateway;
 
 	private TestingFatalErrorHandler testingFatalErrorHandler;
 
@@ -75,11 +80,14 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 
 		taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
 		resourceManagerResourceID = ResourceID.generate();
-		TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
 		testingFatalErrorHandler = new TestingFatalErrorHandler();
+		TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService();
 		resourceManager = createAndStartResourceManager(rmLeaderElectionService, testingFatalErrorHandler);
 		rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class);
-		leaderSessionId = grantLeadership(rmLeaderElectionService);
+		wronglyFencedGateway = rpcService.connect(resourceManager.getAddress(), ResourceManagerId.generate(), ResourceManagerGateway.class)
+			.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+		grantLeadership(rmLeaderElectionService).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 	}
 
 	@After
@@ -95,13 +103,13 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		try {
 			// test response successful
 			CompletableFuture<RegistrationResponse> successfulFuture =
-				rmGateway.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport, timeout);
+				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, timeout);
 			RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 			assertTrue(response instanceof TaskExecutorRegistrationSuccess);
 
 			// test response successful with instanceID not equal to previous when receive duplicate registration from taskExecutor
 			CompletableFuture<RegistrationResponse> duplicateFuture =
-				rmGateway.registerTaskExecutor(leaderSessionId, taskExecutorAddress, taskExecutorResourceID, slotReport, timeout);
+				rmGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, timeout);
 			RegistrationResponse duplicateResponse = duplicateFuture.get();
 			assertTrue(duplicateResponse instanceof TaskExecutorRegistrationSuccess);
 			assertNotEquals(((TaskExecutorRegistrationSuccess) response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) duplicateResponse).getRegistrationId());
@@ -119,10 +127,15 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 	public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() throws Exception {
 		try {
 			// test throw exception when receive a registration from taskExecutor which takes unmatched leaderSessionId
-			UUID differentLeaderSessionID = UUID.randomUUID();
 			CompletableFuture<RegistrationResponse> unMatchedLeaderFuture =
-				rmGateway.registerTaskExecutor(differentLeaderSessionID, taskExecutorAddress, taskExecutorResourceID, slotReport, timeout);
-			assertTrue(unMatchedLeaderFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Decline);
+				wronglyFencedGateway.registerTaskExecutor(taskExecutorAddress, taskExecutorResourceID, slotReport, timeout);
+
+			try {
+				unMatchedLeaderFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+				fail("Should have failed because we are using a wrongly fenced ResourceManagerGateway.");
+			} catch (ExecutionException e) {
+				assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenMismatchException);
+			}
 		} finally {
 			if (testingFatalErrorHandler.hasExceptionOccurred()) {
 				testingFatalErrorHandler.rethrowError();
@@ -139,7 +152,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 			// test throw exception when receive a registration from taskExecutor which takes invalid address
 			String invalidAddress = "/taskExecutor2";
 			CompletableFuture<RegistrationResponse> invalidAddressFuture =
-				rmGateway.registerTaskExecutor(leaderSessionId, invalidAddress, taskExecutorResourceID, slotReport, timeout);
+				rmGateway.registerTaskExecutor(invalidAddress, taskExecutorResourceID, slotReport, timeout);
 			assertTrue(invalidAddressFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS) instanceof RegistrationResponse.Decline);
 		} finally {
 			if (testingFatalErrorHandler.hasExceptionOccurred()) {
@@ -155,7 +168,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 		return taskExecutorResourceID;
 	}
 
-	private StandaloneResourceManager createAndStartResourceManager(TestingLeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
+	private StandaloneResourceManager createAndStartResourceManager(LeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception {
 		TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
 		HeartbeatServices heartbeatServices = new HeartbeatServices(5L, 5L);
 		highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
@@ -187,14 +200,15 @@ public class ResourceManagerTaskExecutorTest extends TestLogger {
 				metricRegistry,
 				jobLeaderIdService,
 				fatalErrorHandler);
+
 		resourceManager.start();
+
 		return resourceManager;
 	}
 
-	private UUID grantLeadership(TestingLeaderElectionService leaderElectionService) {
+	private CompletableFuture<UUID> grantLeadership(TestingLeaderElectionService leaderElectionService) {
 		UUID leaderSessionId = UUID.randomUUID();
-		leaderElectionService.isLeader(leaderSessionId);
-		return leaderSessionId;
+		return leaderElectionService.isLeader(leaderSessionId);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 93e96a7..80b445f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
@@ -43,7 +44,6 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
 import java.util.Arrays;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -74,7 +74,7 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testTaskManagerRegistration() throws Exception {
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
@@ -88,7 +88,7 @@ public class SlotManagerTest extends TestLogger {
 		final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
 		final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
 
-		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
 			assertTrue("The number registered slots does not equal the expected number.",2 == slotManager.getNumberRegisteredSlots());
@@ -103,7 +103,7 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testTaskManagerUnregistration() throws Exception {
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 		final JobID jobId = new JobID();
 
@@ -113,7 +113,7 @@ public class SlotManagerTest extends TestLogger {
 			any(JobID.class),
 			any(AllocationID.class),
 			anyString(),
-			eq(leaderId),
+			eq(resourceManagerId),
 			any(Time.class))).thenReturn(new CompletableFuture<>());
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -134,7 +134,7 @@ public class SlotManagerTest extends TestLogger {
 			resourceProfile,
 			"foobar");
 
-		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
 			assertTrue("The number registered slots does not equal the expected number.",2 == slotManager.getNumberRegisteredSlots());
@@ -166,7 +166,7 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testSlotRequestWithoutFreeSlots() throws Exception {
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
 		final SlotRequest slotRequest = new SlotRequest(
 			new JobID(),
@@ -176,7 +176,7 @@ public class SlotManagerTest extends TestLogger {
 
 		ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 
-		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 
 			slotManager.registerSlotRequest(slotRequest);
 
@@ -189,7 +189,7 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testSlotRequestWithResourceAllocationFailure() throws Exception {
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceProfile resourceProfile = new ResourceProfile(42.0, 1337);
 		final SlotRequest slotRequest = new SlotRequest(
 			new JobID(),
@@ -200,7 +200,7 @@ public class SlotManagerTest extends TestLogger {
 		ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 		doThrow(new ResourceManagerException("Test exception")).when(resourceManagerActions).allocateResource(any(ResourceProfile.class));
 
-		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 
 			slotManager.registerSlotRequest(slotRequest);
 
@@ -216,7 +216,7 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testSlotRequestWithFreeSlot() throws Exception {
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceID resourceID = ResourceID.generate();
 		final JobID jobId = new JobID();
 		final SlotID slotId = new SlotID(resourceID, 0);
@@ -231,7 +231,7 @@ public class SlotManagerTest extends TestLogger {
 
 		ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 
-		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 
 			// accept an incoming slot request
 			final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
@@ -240,7 +240,7 @@ public class SlotManagerTest extends TestLogger {
 				eq(jobId),
 				eq(allocationId),
 				anyString(),
-				eq(leaderId),
+				eq(resourceManagerId),
 				any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 			final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -254,7 +254,7 @@ public class SlotManagerTest extends TestLogger {
 
 			assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest));
 
-			verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(leaderId), any(Time.class));
+			verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), any(Time.class));
 
 			TaskManagerSlot slot = slotManager.getSlot(slotId);
 
@@ -268,7 +268,7 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testUnregisterPendingSlotRequest() throws Exception {
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 		final SlotID slotId = new SlotID(ResourceID.generate(), 0);
 		final AllocationID allocationId = new AllocationID();
@@ -279,7 +279,7 @@ public class SlotManagerTest extends TestLogger {
 			any(JobID.class),
 			any(AllocationID.class),
 			anyString(),
-			eq(leaderId),
+			eq(resourceManagerId),
 			any(Time.class))).thenReturn(new CompletableFuture<>());
 
 		final ResourceProfile resourceProfile = new ResourceProfile(1.0, 1);
@@ -290,7 +290,7 @@ public class SlotManagerTest extends TestLogger {
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
-		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
 			TaskManagerSlot slot = slotManager.getSlot(slotId);
@@ -315,7 +315,7 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testFulfillingPendingSlotRequest() throws Exception {
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceID resourceID = ResourceID.generate();
 		final JobID jobId = new JobID();
 		final SlotID slotId = new SlotID(resourceID, 0);
@@ -337,7 +337,7 @@ public class SlotManagerTest extends TestLogger {
 			eq(jobId),
 			eq(allocationId),
 			anyString(),
-			eq(leaderId),
+			eq(resourceManagerId),
 			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final TaskExecutorConnection taskExecutorConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -345,7 +345,7 @@ public class SlotManagerTest extends TestLogger {
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile);
 		final SlotReport slotReport = new SlotReport(slotStatus);
 
-		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 
 			assertTrue("The slot request should be accepted", slotManager.registerSlotRequest(slotRequest));
 
@@ -355,7 +355,7 @@ public class SlotManagerTest extends TestLogger {
 				taskExecutorConnection,
 				slotReport);
 
-			verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(leaderId), any(Time.class));
+			verify(taskExecutorGateway).requestSlot(eq(slotId), eq(jobId), eq(allocationId), eq(targetAddress), eq(resourceManagerId), any(Time.class));
 
 			TaskManagerSlot slot = slotManager.getSlot(slotId);
 
@@ -368,7 +368,7 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testFreeSlot() throws Exception {
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceID resourceID = ResourceID.generate();
 		final JobID jobId = new JobID();
 		final SlotID slotId = new SlotID(resourceID, 0);
@@ -385,7 +385,7 @@ public class SlotManagerTest extends TestLogger {
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile, jobId, allocationId);
 		final SlotReport slotReport = new SlotReport(slotStatus);
 
-		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 
 			slotManager.registerTaskManager(
 				taskExecutorConnection,
@@ -414,8 +414,7 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testDuplicatePendingSlotRequest() throws Exception {
-
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
@@ -423,7 +422,7 @@ public class SlotManagerTest extends TestLogger {
 		final SlotRequest slotRequest1 = new SlotRequest(new JobID(), allocationId, resourceProfile1, "foobar");
 		final SlotRequest slotRequest2 = new SlotRequest(new JobID(), allocationId, resourceProfile2, "barfoo");
 
-		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			assertTrue(slotManager.registerSlotRequest(slotRequest1));
 			assertFalse(slotManager.registerSlotRequest(slotRequest2));
 		}
@@ -439,7 +438,7 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testDuplicatePendingSlotRequestAfterSlotReport() throws Exception {
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 		final JobID jobId = new JobID();
 		final AllocationID allocationId = new AllocationID();
@@ -454,7 +453,7 @@ public class SlotManagerTest extends TestLogger {
 
 		final SlotRequest slotRequest = new SlotRequest(jobId, allocationId, resourceProfile, "foobar");
 
-		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
 			assertFalse(slotManager.registerSlotRequest(slotRequest));
@@ -467,7 +466,7 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testDuplicatePendingSlotRequestAfterSuccessfulAllocation() throws Exception {
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
@@ -481,7 +480,7 @@ public class SlotManagerTest extends TestLogger {
 			any(JobID.class),
 			any(AllocationID.class),
 			anyString(),
-			eq(leaderId),
+			eq(resourceManagerId),
 			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -490,7 +489,7 @@ public class SlotManagerTest extends TestLogger {
 		final SlotStatus slotStatus = new SlotStatus(slotId, resourceProfile1);
 		final SlotReport slotReport = new SlotReport(slotStatus);
 
-		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 			assertTrue(slotManager.registerSlotRequest(slotRequest1));
 
@@ -512,7 +511,7 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testAcceptingDuplicateSlotRequestAfterAllocationRelease() throws Exception {
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 		final AllocationID allocationId = new AllocationID();
 		final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 2);
@@ -526,7 +525,7 @@ public class SlotManagerTest extends TestLogger {
 			any(JobID.class),
 			any(AllocationID.class),
 			anyString(),
-			eq(leaderId),
+			eq(resourceManagerId),
 			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -535,7 +534,7 @@ public class SlotManagerTest extends TestLogger {
 		final SlotStatus slotStatus = new SlotStatus(slotId, new ResourceProfile(2.0, 2));
 		final SlotReport slotReport = new SlotReport(slotStatus);
 
-		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 			assertTrue(slotManager.registerSlotRequest(slotRequest1));
 
@@ -565,7 +564,7 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testReceivingUnknownSlotReport() throws Exception {
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 
 		final InstanceID unknownInstanceID = new InstanceID();
@@ -574,7 +573,7 @@ public class SlotManagerTest extends TestLogger {
 		final SlotStatus unknownSlotStatus = new SlotStatus(unknownSlotId, unknownResourceProfile);
 		final SlotReport unknownSlotReport = new SlotReport(unknownSlotStatus);
 
-		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			// check that we don't have any slots registered
 			assertTrue(0 == slotManager.getNumberRegisteredSlots());
 
@@ -591,7 +590,7 @@ public class SlotManagerTest extends TestLogger {
 	 */
 	@Test
 	public void testUpdateSlotReport() throws Exception {
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 
 		final JobID jobId = new JobID();
@@ -614,7 +613,7 @@ public class SlotManagerTest extends TestLogger {
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
 
-		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 			// check that we don't have any slots registered
 			assertTrue(0 == slotManager.getNumberRegisteredSlots());
 
@@ -651,7 +650,7 @@ public class SlotManagerTest extends TestLogger {
 		final long tmTimeout = 500L;
 
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 
 		final TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -669,7 +668,7 @@ public class SlotManagerTest extends TestLogger {
 			TestingUtils.infiniteTime(),
 			Time.milliseconds(tmTimeout))) {
 
-			slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
+			slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
 
 			mainThreadExecutor.execute(new Runnable() {
 				@Override
@@ -693,7 +692,7 @@ public class SlotManagerTest extends TestLogger {
 		final long allocationTimeout = 50L;
 
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final JobID jobId = new JobID();
 		final AllocationID allocationId = new AllocationID();
 
@@ -708,7 +707,7 @@ public class SlotManagerTest extends TestLogger {
 			Time.milliseconds(allocationTimeout),
 			TestingUtils.infiniteTime())) {
 
-			slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
+			slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
 
 			final AtomicReference<Exception> atomicException = new AtomicReference<>(null);
 
@@ -740,7 +739,7 @@ public class SlotManagerTest extends TestLogger {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testTaskManagerSlotRequestTimeoutHandling() throws Exception {
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 
 		final JobID jobId = new JobID();
@@ -756,7 +755,7 @@ public class SlotManagerTest extends TestLogger {
 			any(JobID.class),
 			eq(allocationId),
 			anyString(),
-			any(UUID.class),
+			any(ResourceManagerId.class),
 			any(Time.class))).thenReturn(slotRequestFuture1, slotRequestFuture2);
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -768,7 +767,7 @@ public class SlotManagerTest extends TestLogger {
 		final SlotStatus slotStatus2 = new SlotStatus(slotId2, resourceProfile);
 		final SlotReport slotReport = new SlotReport(Arrays.asList(slotStatus1, slotStatus2));
 
-		try (SlotManager slotManager = createSlotManager(leaderId, resourceManagerActions)) {
+		try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) {
 
 			slotManager.registerTaskManager(taskManagerConnection, slotReport);
 
@@ -781,7 +780,7 @@ public class SlotManagerTest extends TestLogger {
 				eq(jobId),
 				eq(allocationId),
 				anyString(),
-				eq(leaderId),
+				eq(resourceManagerId),
 				any(Time.class));
 
 			TaskManagerSlot failedSlot = slotManager.getSlot(slotIdCaptor.getValue());
@@ -794,7 +793,7 @@ public class SlotManagerTest extends TestLogger {
 				eq(jobId),
 				eq(allocationId),
 				anyString(),
-				eq(leaderId),
+				eq(resourceManagerId),
 				any(Time.class));
 
 			// the second attempt succeeds
@@ -819,7 +818,7 @@ public class SlotManagerTest extends TestLogger {
 	@SuppressWarnings("unchecked")
 	public void testSlotReportWhileActiveSlotRequest() throws Exception {
 		final long verifyTimeout = 1000L;
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 
 		final JobID jobId = new JobID();
@@ -834,7 +833,7 @@ public class SlotManagerTest extends TestLogger {
 			any(JobID.class),
 			eq(allocationId),
 			anyString(),
-			any(UUID.class),
+			any(ResourceManagerId.class),
 			any(Time.class))).thenReturn(slotRequestFuture1, CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -854,7 +853,7 @@ public class SlotManagerTest extends TestLogger {
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime())) {
 
-			slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
+			slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
 
 			CompletableFuture<Void> registrationFuture = CompletableFuture.supplyAsync(
 				() -> {
@@ -882,7 +881,7 @@ public class SlotManagerTest extends TestLogger {
 				eq(jobId),
 				eq(allocationId),
 				anyString(),
-				eq(leaderId),
+				eq(resourceManagerId),
 				any(Time.class));
 
 			final SlotID requestedSlotId = slotIdCaptor.getValue();
@@ -908,7 +907,7 @@ public class SlotManagerTest extends TestLogger {
 				eq(jobId),
 				eq(allocationId),
 				anyString(),
-				eq(leaderId),
+				eq(resourceManagerId),
 				any(Time.class));
 
 			final SlotID requestedSlotId2 = slotIdCaptor.getValue();
@@ -935,7 +934,7 @@ public class SlotManagerTest extends TestLogger {
 		final long taskManagerTimeout = 50L;
 		final long verifyTimeout = taskManagerTimeout * 10L;
 
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final ResourceManagerActions resourceManagerActions = mock(ResourceManagerActions.class);
 		final ScheduledExecutor scheduledExecutor = TestingUtils.defaultScheduledExecutor();
 
@@ -952,7 +951,7 @@ public class SlotManagerTest extends TestLogger {
 			eq(jobId),
 			eq(allocationId),
 			anyString(),
-			eq(leaderId),
+			eq(resourceManagerId),
 			any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 		final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway);
@@ -971,7 +970,7 @@ public class SlotManagerTest extends TestLogger {
 			TestingUtils.infiniteTime(),
 			Time.of(taskManagerTimeout, TimeUnit.MILLISECONDS))) {
 
-			slotManager.start(leaderId, mainThreadExecutor, resourceManagerActions);
+			slotManager.start(resourceManagerId, mainThreadExecutor, resourceManagerActions);
 
 			CompletableFuture.supplyAsync(
 				() -> {
@@ -991,7 +990,7 @@ public class SlotManagerTest extends TestLogger {
 				eq(jobId),
 				eq(allocationId),
 				anyString(),
-				eq(leaderId),
+				eq(resourceManagerId),
 				any(Time.class));
 
 			CompletableFuture<Boolean> idleFuture = CompletableFuture.supplyAsync(
@@ -1023,14 +1022,14 @@ public class SlotManagerTest extends TestLogger {
 		}
 	}
 
-	private SlotManager createSlotManager(UUID leaderId, ResourceManagerActions resourceManagerActions) {
+	private SlotManager createSlotManager(ResourceManagerId resourceManagerId, ResourceManagerActions resourceManagerActions) {
 		SlotManager slotManager = new SlotManager(
 			TestingUtils.defaultScheduledExecutor(),
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime(),
 			TestingUtils.infiniteTime());
 
-		slotManager.start(leaderId, Executors.directExecutor(), resourceManagerActions);
+		slotManager.start(resourceManagerId, Executors.directExecutor(), resourceManagerActions);
 
 		return slotManager;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 844e159..6de4d52 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
@@ -39,7 +40,6 @@ import org.junit.Test;
 import org.mockito.Mockito;
 
 import java.util.Collections;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -77,7 +77,7 @@ public class SlotProtocolTest extends TestLogger {
 	public void testSlotsUnavailableRequest() throws Exception {
 		final JobID jobID = new JobID();
 
-		final UUID rmLeaderID = UUID.randomUUID();
+		final ResourceManagerId rmLeaderID = ResourceManagerId.generate();
 
 		try (SlotManager slotManager = new SlotManager(
 			scheduledExecutor,
@@ -103,7 +103,7 @@ public class SlotProtocolTest extends TestLogger {
 			TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 			Mockito.when(
 				taskExecutorGateway
-					.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
+					.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class)))
 				.thenReturn(mock(CompletableFuture.class));
 
 			final ResourceID resourceID = ResourceID.generate();
@@ -118,7 +118,7 @@ public class SlotProtocolTest extends TestLogger {
 
 			// 4) Slot becomes available and TaskExecutor gets a SlotRequest
 			verify(taskExecutorGateway, timeout(5000L))
-				.requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class));
+				.requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class));
 		}
 	}
 
@@ -133,12 +133,12 @@ public class SlotProtocolTest extends TestLogger {
 	public void testSlotAvailableRequest() throws Exception {
 		final JobID jobID = new JobID();
 
-		final UUID rmLeaderID = UUID.randomUUID();
+		final ResourceManagerId rmLeaderID = ResourceManagerId.generate();
 
 		TaskExecutorGateway taskExecutorGateway = mock(TaskExecutorGateway.class);
 		Mockito.when(
 			taskExecutorGateway
-				.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), any(Time.class)))
+				.requestSlot(any(SlotID.class), any(JobID.class), any(AllocationID.class), any(String.class), any(ResourceManagerId.class), any(Time.class)))
 			.thenReturn(mock(CompletableFuture.class));
 
 		try (SlotManager slotManager = new SlotManager(
@@ -171,7 +171,7 @@ public class SlotProtocolTest extends TestLogger {
 
 			// a SlotRequest is routed to the TaskExecutor
 			verify(taskExecutorGateway, timeout(5000))
-				.requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(UUID.class), any(Time.class));
+				.requestSlot(eq(slotID), eq(jobID), eq(allocationID), any(String.class), any(ResourceManagerId.class), any(Time.class));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 90f731d..945cbf3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -193,7 +193,6 @@ public class TaskExecutorITCase extends TestLogger {
 			rmLeaderRetrievalService.notifyListener(rmAddress, rmLeaderId);
 
 			CompletableFuture<RegistrationResponse> registrationResponseFuture = rmGateway.registerJobManager(
-				rmLeaderId,
 				jmLeaderId,
 				jmResourceId,
 				jmAddress,
@@ -204,7 +203,7 @@ public class TaskExecutorITCase extends TestLogger {
 
 			assertTrue(registrationResponse instanceof JobMasterRegistrationSuccess);
 
-			CompletableFuture<Acknowledge> slotAck = rmGateway.requestSlot(jmLeaderId, rmLeaderId, slotRequest, timeout);
+			CompletableFuture<Acknowledge> slotAck = rmGateway.requestSlot(jmLeaderId, slotRequest, timeout);
 
 			slotAck.get();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 6ab52ed..2112c1b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -65,6 +65,7 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
@@ -252,7 +253,7 @@ public class TaskExecutorTest extends TestLogger {
 		// register the mock resource manager gateway
 		ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
 		when(rmGateway.registerTaskExecutor(
-			any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+			anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
 			.thenReturn(
 				CompletableFuture.completedFuture(
 					new TaskExecutorRegistrationSuccess(
@@ -327,7 +328,7 @@ public class TaskExecutorTest extends TestLogger {
 
 			// register resource manager success will trigger monitoring heartbeat target between tm and rm
 			verify(rmGateway, Mockito.timeout(timeout.toMilliseconds()).atLeast(1)).registerTaskExecutor(
-					eq(rmLeaderId), eq(taskManager.getAddress()), eq(tmResourceId), any(SlotReport.class), any(Time.class));
+					eq(taskManager.getAddress()), eq(tmResourceId), any(SlotReport.class), any(Time.class));
 
 			// heartbeat timeout should trigger disconnect TaskManager from ResourceManager
 			verify(rmGateway, timeout(heartbeatTimeout * 50L)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class));
@@ -356,7 +357,7 @@ public class TaskExecutorTest extends TestLogger {
 		// register the mock resource manager gateway
 		ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
 		when(rmGateway.registerTaskExecutor(
-			any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+			anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
 			.thenReturn(
 				CompletableFuture.completedFuture(
 					new TaskExecutorRegistrationSuccess(
@@ -448,7 +449,7 @@ public class TaskExecutorTest extends TestLogger {
 
 			// register resource manager success will trigger monitoring heartbeat target between tm and rm
 			verify(rmGateway, timeout(verificationTimeout).atLeast(1)).registerTaskExecutor(
-				eq(rmLeaderId), eq(taskManager.getAddress()), eq(tmResourceId), eq(slotReport1), any(Time.class));
+				eq(taskManager.getAddress()), eq(tmResourceId), eq(slotReport1), any(Time.class));
 
 			verify(heartbeatManager, timeout(verificationTimeout)).monitorTarget(any(ResourceID.class), any(HeartbeatTarget.class));
 
@@ -489,7 +490,7 @@ public class TaskExecutorTest extends TestLogger {
 		// register a mock resource manager gateway
 		ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
 		when(rmGateway.registerTaskExecutor(
-			any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+					anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
 			.thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
 				new InstanceID(), resourceManagerResourceId, 10L)));
 
@@ -535,7 +536,7 @@ public class TaskExecutorTest extends TestLogger {
 			String taskManagerAddress = taskManager.getAddress();
 
 			verify(rmGateway, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
-					any(UUID.class), eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class));
+					eq(taskManagerAddress), eq(resourceID), eq(slotReport), any(Time.class));
 
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
@@ -562,11 +563,11 @@ public class TaskExecutorTest extends TestLogger {
 		ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class);
 
 		when(rmGateway1.registerTaskExecutor(
-			any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+					anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
 			.thenReturn(CompletableFuture.completedFuture(
 				new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, 10L)));
 		when(rmGateway2.registerTaskExecutor(
-			any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+					anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
 			.thenReturn(CompletableFuture.completedFuture(
 				new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, 10L)));
 
@@ -624,7 +625,7 @@ public class TaskExecutorTest extends TestLogger {
 			testLeaderService.notifyListener(address1, leaderId1);
 
 			verify(rmGateway1, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
-					eq(leaderId1), eq(taskManagerAddress), eq(tmResourceID), any(SlotReport.class), any(Time.class));
+					eq(taskManagerAddress), eq(tmResourceID), any(SlotReport.class), any(Time.class));
 			assertNotNull(taskManager.getResourceManagerConnection());
 
 			// cancel the leader 
@@ -634,7 +635,7 @@ public class TaskExecutorTest extends TestLogger {
 			testLeaderService.notifyListener(address2, leaderId2);
 
 			verify(rmGateway2, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
-					eq(leaderId2), eq(taskManagerAddress), eq(tmResourceID), eq(slotReport), any(Time.class));
+					eq(taskManagerAddress), eq(tmResourceID), eq(slotReport), any(Time.class));
 			assertNotNull(taskManager.getResourceManagerConnection());
 
 			// check if a concurrent error occurred
@@ -810,14 +811,13 @@ public class TaskExecutorTest extends TestLogger {
 		haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService);
 
 		final String resourceManagerAddress = "rm";
-		final UUID resourceManagerLeaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerLeaderId = ResourceManagerId.generate();
 		final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress);
 
 		final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class);
 		final InstanceID registrationId = new InstanceID();
 
 		when(resourceManagerGateway.registerTaskExecutor(
-			eq(resourceManagerLeaderId),
 			any(String.class),
 			eq(resourceId),
 			any(SlotReport.class),
@@ -874,7 +874,7 @@ public class TaskExecutorTest extends TestLogger {
 			final TaskExecutorGateway tmGateway = taskManager.getSelfGateway(TaskExecutorGateway.class);
 
 			// tell the task manager about the rm leader
-			resourceManagerLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerLeaderId);
+			resourceManagerLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerLeaderId.toUUID());
 
 			// request slots from the task manager under the given allocation id
 			CompletableFuture<Acknowledge> slotRequestAck = tmGateway.requestSlot(
@@ -940,7 +940,6 @@ public class TaskExecutorTest extends TestLogger {
 		final InstanceID registrationId = new InstanceID();
 
 		when(resourceManagerGateway.registerTaskExecutor(
-			eq(resourceManagerLeaderId),
 			any(String.class),
 			eq(resourceId),
 			any(SlotReport.class),
@@ -1000,7 +999,6 @@ public class TaskExecutorTest extends TestLogger {
 			jobLeaderService.addJob(jobId, jobManagerAddress);
 
 			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).notifySlotAvailable(
-				eq(resourceManagerLeaderId),
 				eq(registrationId),
 				eq(new SlotID(resourceId, 1)),
 				eq(allocationId2));
@@ -1029,7 +1027,7 @@ public class TaskExecutorTest extends TestLogger {
 		final ResourceID resourceID = ResourceID.generate();
 
 		final String address1 = "/resource/manager/address/one";
-		final UUID leaderId = UUID.randomUUID();
+		final ResourceManagerId resourceManagerId = ResourceManagerId.generate();
 		final JobID jobId = new JobID();
 		final String jobManagerAddress = "foobar";
 
@@ -1055,7 +1053,7 @@ public class TaskExecutorTest extends TestLogger {
 		when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(new SlotReport());
 		when(taskSlotTable.getCurrentAllocation(1)).thenReturn(new AllocationID());
 
-		when(rmGateway1.registerTaskExecutor(any(UUID.class), anyString(), eq(resourceID), any(SlotReport.class), any(Time.class))).thenReturn(
+			when(rmGateway1.registerTaskExecutor(anyString(), eq(resourceID), any(SlotReport.class), any(Time.class))).thenReturn(
 			CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), 1000L)));
 
 		TaskExecutor taskManager = new TaskExecutor(
@@ -1087,15 +1085,15 @@ public class TaskExecutorTest extends TestLogger {
 			assertNull(taskManager.getResourceManagerConnection());
 
 			// define a leader and see that a registration happens
-			testLeaderService.notifyListener(address1, leaderId);
+			testLeaderService.notifyListener(address1, resourceManagerId.toUUID());
 
 			verify(rmGateway1, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
-				eq(leaderId), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
+				eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
 			assertNotNull(taskManager.getResourceManagerConnection());
 
 			// test that allocating a slot works
 			final SlotID slotID = new SlotID(resourceID, 0);
-			tmGateway.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, leaderId, timeout);
+			tmGateway.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, resourceManagerId, timeout);
 
 			// TODO: Figure out the concrete allocation behaviour between RM and TM. Maybe we don't need the SlotID...
 			// test that we can't allocate slots which are blacklisted due to pending confirmation of the RM
@@ -1106,7 +1104,7 @@ public class TaskExecutorTest extends TestLogger {
 				jobId,
 				new AllocationID(),
 				jobManagerAddress,
-				leaderId,
+				resourceManagerId,
 				timeout);
 
 			try {
@@ -1119,8 +1117,8 @@ public class TaskExecutorTest extends TestLogger {
 
 			// re-register
 			verify(rmGateway1, Mockito.timeout(timeout.toMilliseconds())).registerTaskExecutor(
-				eq(leaderId), eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
-			testLeaderService.notifyListener(address1, leaderId);
+				eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
+			testLeaderService.notifyListener(address1, resourceManagerId.toUUID());
 
 			// now we should be successful because the slots status has been synced
 			// test that we can't allocate slots which are blacklisted due to pending confirmation of the RM
@@ -1129,7 +1127,7 @@ public class TaskExecutorTest extends TestLogger {
 				jobId,
 				new AllocationID(),
 				jobManagerAddress,
-				leaderId,
+				resourceManagerId,
 				timeout);
 
 			// check if a concurrent error occurred
@@ -1176,7 +1174,6 @@ public class TaskExecutorTest extends TestLogger {
 		final InstanceID registrationId = new InstanceID();
 
 		when(resourceManagerGateway.registerTaskExecutor(
-			eq(resourceManagerLeaderId),
 			any(String.class),
 			eq(resourceId),
 			any(SlotReport.class),
@@ -1317,7 +1314,6 @@ public class TaskExecutorTest extends TestLogger {
 			offerResultFuture.complete(Collections.singleton(offer1));
 
 			verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).notifySlotAvailable(
-				eq(resourceManagerLeaderId),
 				eq(registrationId),
 				eq(new SlotID(resourceId, 1)),
 				any(AllocationID.class));


Mime
View raw message