flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-7526] [TaskExecutor] Filter out duplicate JobManager gained leadership messages
Date Mon, 04 Sep 2017 06:41:00 GMT
Repository: flink
Updated Branches:
  refs/heads/master a1578710a -> 64e8de97d


[FLINK-7526] [TaskExecutor] Filter out duplicate JobManager gained leadership messages

This commit filters out duplicate JobManager gained leadership messages coming from
the JobLeaderService. This avoids opening multiple connections to the JobManager,
which consumes resources. Moreover, this commit properly closes all
JobManagerConnections in case of a shutdown.

This closes #4595.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/64e8de97
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/64e8de97
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/64e8de97

Branch: refs/heads/master
Commit: 64e8de97d02a747da52a3e69472147b64fd2dd91
Parents: a157871
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Sat Aug 26 12:40:22 2017 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Mon Sep 4 08:38:15 2017 +0200

----------------------------------------------------------------------
 .../runtime/taskexecutor/TaskExecutor.java      |  29 +-
 .../runtime/taskexecutor/slot/TaskSlot.java     |   6 +
 .../taskexecutor/slot/TaskSlotTable.java        |  12 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 696 +++++++++++--------
 4 files changed, 429 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/64e8de97/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 21bdeec..ef47ad4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -85,6 +85,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -248,7 +249,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway
{
 	public void postStop() throws Exception {
 		log.info("Stopping TaskManager {}.", getAddress());
 
-		Exception exception = null;
+		Throwable throwable = null;
 
 		taskSlotTable.stop();
 
@@ -256,6 +257,14 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway
{
 			resourceManagerConnection.close();
 		}
 
+		for (JobManagerConnection jobManagerConnection : jobManagerConnections.values()) {
+			try {
+				disassociateFromJobManager(jobManagerConnection, new FlinkException("The TaskExecutor
is shutting down."));
+			} catch (Throwable t) {
+				throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
+			}
+		}
+
 		jobManagerHeartbeatManager.stop();
 
 		resourceManagerHeartbeatManager.stop();
@@ -270,12 +279,12 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway
{
 
 		try {
 			super.postStop();
-		} catch (Exception e) {
-			exception = ExceptionUtils.firstOrSuppressed(e, exception);
+		} catch (Throwable e) {
+			throwable = ExceptionUtils.firstOrSuppressed(e, throwable);
 		}
 
-		if (exception != null) {
-			ExceptionUtils.rethrowException(exception, "Error while shutting the TaskExecutor down.");
+		if (throwable != null) {
+			ExceptionUtils.rethrowException(throwable, "Error while shutting the TaskExecutor down.");
 		}
 
 		log.info("Stopped TaskManager {}.", getAddress());
@@ -841,15 +850,21 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway
{
 	}
 
 	private void establishJobManagerConnection(JobID jobId, final JobMasterGateway jobMasterGateway,
UUID jobManagerLeaderId, JMTMRegistrationSuccess registrationSuccess) {
-		log.info("Establish JobManager connection for job {}.", jobId);
 
 		if (jobManagerTable.contains(jobId)) {
 			JobManagerConnection oldJobManagerConnection = jobManagerTable.get(jobId);
-			if (!oldJobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) {
+
+			if (Objects.equals(oldJobManagerConnection.getLeaderId(), jobManagerLeaderId)) {
+				// we already are connected to the given job manager
+				log.debug("Ignore JobManager gained leadership message for {} because we are already
connected to it.", jobManagerLeaderId);
+				return;
+			} else {
 				closeJobManagerConnection(jobId, new Exception("Found new job leader for job id " + jobId
+ '.'));
 			}
 		}
 
+		log.info("Establish JobManager connection for job {}.", jobId);
+
 		ResourceID jobManagerResourceID = registrationSuccess.getResourceID();
 		JobManagerConnection newJobManagerConnection = associateWithJobManager(
 				jobId,

http://git-wip-us.apache.org/repos/asf/flink/blob/64e8de97/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
index e12c15b..6f5230c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -299,4 +299,10 @@ public class TaskSlot {
 
 		return new SlotOffer(allocationId, index, resourceProfile);
 	}
+
+	@Override
+	public String toString() {
+		return "TaskSlot(index:" + index + ", state:" + state + ", resource profile: " + resourceProfile
+
+			", allocationId: " + (allocationId != null ? allocationId.toString() : "none") + ", jobId:
" + (jobId != null ? jobId.toString() : "none") + ')';
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/64e8de97/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 3634df0..799f639 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -282,15 +282,15 @@ public class TaskSlotTable implements TimeoutListener<AllocationID>
{
 	public int freeSlot(AllocationID allocationId, Throwable cause) throws SlotNotFoundException
{
 		checkInit();
 
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Free slot {}.", allocationId, cause);
-		} else {
-			LOG.info("Free slot {}.", allocationId);
-		}
-
 		TaskSlot taskSlot = getTaskSlot(allocationId);
 
 		if (taskSlot != null) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Free slot {}.", taskSlot, cause);
+			} else {
+				LOG.info("Free slot {}.", taskSlot);
+			}
+
 			final JobID jobId = taskSlot.getJobId();
 
 			if (taskSlot.markFree()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/64e8de97/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 41e5629..6ab52ed 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -65,6 +65,7 @@ import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
@@ -72,12 +73,16 @@ import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
+
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -97,6 +102,7 @@ import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.Matchers.contains;
@@ -112,6 +118,21 @@ public class TaskExecutorTest extends TestLogger {
 
 	private final Time timeout = Time.milliseconds(10000L);
 
+	private TestingRpcService rpc;
+
+	@Before
+	public void setup() {
+		rpc = new TestingRpcService();
+	}
+
+	@After
+	public void teardown() {
+		if (rpc != null) {
+			rpc.stopService();
+			rpc = null;
+		}
+	}
+
 	@Rule
 	public TestName name = new TestName();
 
@@ -124,7 +145,6 @@ public class TaskExecutorTest extends TestLogger {
 		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(),
1234);
 		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)),
mock(TimerService.class));
 
-		final TestingRpcService rpc = new TestingRpcService();
 		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
 		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
 		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(
@@ -175,25 +195,25 @@ public class TaskExecutorTest extends TestLogger {
 		when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
 		when(jobMasterGateway.getHostname()).thenReturn("localhost");
 
-		try {
-			final TaskExecutor taskManager = new TaskExecutor(
-				rpc,
-				tmConfig,
-				taskManagerLocation,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				mock(NetworkEnvironment.class),
-				haServices,
-				heartbeatServices,
-				mock(MetricRegistry.class),
-				mock(TaskManagerMetricGroup.class),
-				mock(BroadcastVariableManager.class),
-				mock(FileCache.class),
-				taskSlotTable,
-				new JobManagerTable(),
-				jobLeaderService,
-				testingFatalErrorHandler);
+		final TaskExecutor taskManager = new TaskExecutor(
+			rpc,
+			tmConfig,
+			taskManagerLocation,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			mock(NetworkEnvironment.class),
+			haServices,
+			heartbeatServices,
+			mock(MetricRegistry.class),
+			mock(TaskManagerMetricGroup.class),
+			mock(BroadcastVariableManager.class),
+			mock(FileCache.class),
+			taskSlotTable,
+			new JobManagerTable(),
+			jobLeaderService,
+			testingFatalErrorHandler);
 
+		try {
 			taskManager.start();
 
 			rpc.registerGateway(jobMasterAddress, jobMasterGateway);
@@ -216,7 +236,8 @@ public class TaskExecutorTest extends TestLogger {
 			testingFatalErrorHandler.rethrowError();
 
 		} finally {
-			rpc.stopService();
+			taskManager.shutDown();
+			taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		}
 	}
 
@@ -239,7 +260,6 @@ public class TaskExecutorTest extends TestLogger {
 						rmResourceId,
 						10L)));
 
-		final TestingRpcService rpc = new TestingRpcService();
 		rpc.registerGateway(rmAddress, rmGateway);
 
 		final TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(
@@ -281,25 +301,25 @@ public class TaskExecutorTest extends TestLogger {
 				}
 		);
 
-		try {
-			final TaskExecutor taskManager = new TaskExecutor(
-				rpc,
-				taskManagerConfiguration,
-				taskManagerLocation,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				mock(NetworkEnvironment.class),
-				haServices,
-				heartbeatServices,
-				mock(MetricRegistry.class),
-				mock(TaskManagerMetricGroup.class),
-				mock(BroadcastVariableManager.class),
-				mock(FileCache.class),
-				taskSlotTable,
-				mock(JobManagerTable.class),
-				mock(JobLeaderService.class),
-				testingFatalErrorHandler);
+		final TaskExecutor taskManager = new TaskExecutor(
+			rpc,
+			taskManagerConfiguration,
+			taskManagerLocation,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			mock(NetworkEnvironment.class),
+			haServices,
+			heartbeatServices,
+			mock(MetricRegistry.class),
+			mock(TaskManagerMetricGroup.class),
+			mock(BroadcastVariableManager.class),
+			mock(FileCache.class),
+			taskSlotTable,
+			mock(JobManagerTable.class),
+			mock(JobLeaderService.class),
+			testingFatalErrorHandler);
 
+		try {
 			taskManager.start();
 
 			// define a leader and see that a registration happens
@@ -316,7 +336,8 @@ public class TaskExecutorTest extends TestLogger {
 			testingFatalErrorHandler.rethrowError();
 
 		} finally {
-			rpc.stopService();
+			taskManager.shutDown();
+			taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		}
 	}
 
@@ -325,7 +346,7 @@ public class TaskExecutorTest extends TestLogger {
 	 */
 	@Test
 	public void testHeartbeatSlotReporting() throws Exception {
-		final long timeout = 1000L;
+		final long verificationTimeout = 1000L;
 		final String rmAddress = "rm";
 		final String tmAddress = "tm";
 		final ResourceID rmResourceId = new ResourceID(rmAddress);
@@ -343,7 +364,6 @@ public class TaskExecutorTest extends TestLogger {
 						rmResourceId,
 						10L)));
 
-		final TestingRpcService rpc = new TestingRpcService();
 		rpc.registerGateway(rmAddress, rmGateway);
 
 		final TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(
@@ -399,25 +419,25 @@ public class TaskExecutorTest extends TestLogger {
 			}
 		);
 
-		try {
-			final TaskExecutor taskManager = new TaskExecutor(
-				rpc,
-				taskManagerConfiguration,
-				taskManagerLocation,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				mock(NetworkEnvironment.class),
-				haServices,
-				heartbeatServices,
-				mock(MetricRegistry.class),
-				mock(TaskManagerMetricGroup.class),
-				mock(BroadcastVariableManager.class),
-				mock(FileCache.class),
-				taskSlotTable,
-				mock(JobManagerTable.class),
-				mock(JobLeaderService.class),
-				testingFatalErrorHandler);
+		final TaskExecutor taskManager = new TaskExecutor(
+			rpc,
+			taskManagerConfiguration,
+			taskManagerLocation,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			mock(NetworkEnvironment.class),
+			haServices,
+			heartbeatServices,
+			mock(MetricRegistry.class),
+			mock(TaskManagerMetricGroup.class),
+			mock(BroadcastVariableManager.class),
+			mock(FileCache.class),
+			taskSlotTable,
+			mock(JobManagerTable.class),
+			mock(JobLeaderService.class),
+			testingFatalErrorHandler);
 
+		try {
 			taskManager.start();
 
 			// wait for spied heartbeat manager instance
@@ -427,10 +447,10 @@ public class TaskExecutorTest extends TestLogger {
 			testLeaderService.notifyListener(rmAddress, rmLeaderId);
 
 			// register resource manager success will trigger monitoring heartbeat target between
tm and rm
-			verify(rmGateway, timeout(timeout).atLeast(1)).registerTaskExecutor(
+			verify(rmGateway, timeout(verificationTimeout).atLeast(1)).registerTaskExecutor(
 				eq(rmLeaderId), eq(taskManager.getAddress()), eq(tmResourceId), eq(slotReport1), any(Time.class));
 
-			verify(heartbeatManager, timeout(timeout)).monitorTarget(any(ResourceID.class), any(HeartbeatTarget.class));
+			verify(heartbeatManager, timeout(verificationTimeout)).monitorTarget(any(ResourceID.class),
any(HeartbeatTarget.class));
 
 			TaskExecutorGateway taskExecutorGateway = taskManager.getSelfGateway(TaskExecutorGateway.class);
 
@@ -440,7 +460,7 @@ public class TaskExecutorTest extends TestLogger {
 			ArgumentCaptor<SlotReport> slotReportArgumentCaptor = ArgumentCaptor.forClass(SlotReport.class);
 
 			// wait for heartbeat response
-			verify(rmGateway, timeout(timeout)).heartbeatFromTaskManager(
+			verify(rmGateway, timeout(verificationTimeout)).heartbeatFromTaskManager(
 				eq(taskManagerLocation.getResourceID()),
 				slotReportArgumentCaptor.capture());
 
@@ -453,7 +473,8 @@ public class TaskExecutorTest extends TestLogger {
 			testingFatalErrorHandler.rethrowError();
 
 		} finally {
-			rpc.stopService();
+			taskManager.shutDown();
+			taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		}
 	}
 
@@ -465,52 +486,51 @@ public class TaskExecutorTest extends TestLogger {
 		final String dispatcherAddress = "localhost";
 		final String jobManagerAddress = "localhost";
 
-		final TestingRpcService rpc = new TestingRpcService();
-		try {
-			// register a mock resource manager gateway
-			ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
-			when(rmGateway.registerTaskExecutor(
-					any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
-				.thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
-					new InstanceID(), resourceManagerResourceId, 10L)));
+		// register a mock resource manager gateway
+		ResourceManagerGateway rmGateway = mock(ResourceManagerGateway.class);
+		when(rmGateway.registerTaskExecutor(
+			any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+			.thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(
+				new InstanceID(), resourceManagerResourceId, 10L)));
 
-			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
-			when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+		TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
+		when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
 
-			rpc.registerGateway(resourceManagerAddress, rmGateway);
+		rpc.registerGateway(resourceManagerAddress, rmGateway);
 
-			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
-			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+		TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
+		when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
 
-			StandaloneHaServices haServices = new StandaloneHaServices(
-				resourceManagerAddress,
+		StandaloneHaServices haServices = new StandaloneHaServices(
+			resourceManagerAddress,
 				dispatcherAddress,
-				jobManagerAddress);
-
-			final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
-			final SlotReport slotReport = new SlotReport();
-			when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
-
-			final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-
-			TaskExecutor taskManager = new TaskExecutor(
-				rpc,
-				taskManagerServicesConfiguration,
-				taskManagerLocation,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				mock(NetworkEnvironment.class),
-				haServices,
-				mock(HeartbeatServices.class, RETURNS_MOCKS),
-				mock(MetricRegistry.class),
-				mock(TaskManagerMetricGroup.class),
-				mock(BroadcastVariableManager.class),
-				mock(FileCache.class),
-				taskSlotTable,
-				mock(JobManagerTable.class),
-				mock(JobLeaderService.class),
-				testingFatalErrorHandler);
+			jobManagerAddress);
+
+		final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
+		final SlotReport slotReport = new SlotReport();
+		when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
 
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		TaskExecutor taskManager = new TaskExecutor(
+			rpc,
+			taskManagerServicesConfiguration,
+			taskManagerLocation,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			mock(NetworkEnvironment.class),
+			haServices,
+			mock(HeartbeatServices.class, RETURNS_MOCKS),
+			mock(MetricRegistry.class),
+			mock(TaskManagerMetricGroup.class),
+			mock(BroadcastVariableManager.class),
+			mock(FileCache.class),
+			taskSlotTable,
+			mock(JobManagerTable.class),
+			mock(JobLeaderService.class),
+			testingFatalErrorHandler);
+
+		try {
 			taskManager.start();
 			String taskManagerAddress = taskManager.getAddress();
 
@@ -521,7 +541,8 @@ public class TaskExecutorTest extends TestLogger {
 			testingFatalErrorHandler.rethrowError();
 		}
 		finally {
-			rpc.stopService();
+			taskManager.shutDown();
+			taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		}
 	}
 
@@ -536,64 +557,63 @@ public class TaskExecutorTest extends TestLogger {
 		final ResourceID rmResourceId1 = new ResourceID(address1);
 		final ResourceID rmResourceId2 = new ResourceID(address2);
 
-		final TestingRpcService rpc = new TestingRpcService();
-		try {
-			// register the mock resource manager gateways
-			ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
-			ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class);
-
-			when(rmGateway1.registerTaskExecutor(
-					any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
-					.thenReturn(CompletableFuture.completedFuture(
-						new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, 10L)));
-			when(rmGateway2.registerTaskExecutor(
-					any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
-					.thenReturn(CompletableFuture.completedFuture(
-						new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, 10L)));
-
-			rpc.registerGateway(address1, rmGateway1);
-			rpc.registerGateway(address2, rmGateway2);
-
-			TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(
-				null,
-				null);
-
-			TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
-			haServices.setResourceManagerLeaderRetriever(testLeaderService);
-
-			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
-			when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
-			when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
-			when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new String[1]);
-
-			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
-			when(taskManagerLocation.getResourceID()).thenReturn(tmResourceID);
-			when(taskManagerLocation.getHostname()).thenReturn("foobar");
-
-			final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
-			final SlotReport slotReport = new SlotReport();
-			when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
-
-			final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-
-			TaskExecutor taskManager = new TaskExecutor(
-				rpc,
-				taskManagerServicesConfiguration,
-				taskManagerLocation,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				mock(NetworkEnvironment.class),
-				haServices,
-				mock(HeartbeatServices.class, RETURNS_MOCKS),
-				mock(MetricRegistry.class),
-				mock(TaskManagerMetricGroup.class),
-				mock(BroadcastVariableManager.class),
-				mock(FileCache.class),
-				taskSlotTable,
-				mock(JobManagerTable.class),
-				mock(JobLeaderService.class),
-				testingFatalErrorHandler);
+		// register the mock resource manager gateways
+		ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
+		ResourceManagerGateway rmGateway2 = mock(ResourceManagerGateway.class);
+
+		when(rmGateway1.registerTaskExecutor(
+			any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+			.thenReturn(CompletableFuture.completedFuture(
+				new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, 10L)));
+		when(rmGateway2.registerTaskExecutor(
+			any(UUID.class), anyString(), any(ResourceID.class), any(SlotReport.class), any(Time.class)))
+			.thenReturn(CompletableFuture.completedFuture(
+				new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, 10L)));
 
+		rpc.registerGateway(address1, rmGateway1);
+		rpc.registerGateway(address2, rmGateway2);
+
+		TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(
+			null,
+			null);
+
+		TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		haServices.setResourceManagerLeaderRetriever(testLeaderService);
+
+		TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
+		when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+		when(taskManagerServicesConfiguration.getConfiguration()).thenReturn(new Configuration());
+		when(taskManagerServicesConfiguration.getTmpDirectories()).thenReturn(new String[1]);
+
+		TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
+		when(taskManagerLocation.getResourceID()).thenReturn(tmResourceID);
+		when(taskManagerLocation.getHostname()).thenReturn("foobar");
+
+		final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
+		final SlotReport slotReport = new SlotReport();
+		when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(slotReport);
+
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		TaskExecutor taskManager = new TaskExecutor(
+			rpc,
+			taskManagerServicesConfiguration,
+			taskManagerLocation,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			mock(NetworkEnvironment.class),
+			haServices,
+			mock(HeartbeatServices.class, RETURNS_MOCKS),
+			mock(MetricRegistry.class),
+			mock(TaskManagerMetricGroup.class),
+			mock(BroadcastVariableManager.class),
+			mock(FileCache.class),
+			taskSlotTable,
+			mock(JobManagerTable.class),
+			mock(JobLeaderService.class),
+			testingFatalErrorHandler);
+
+		try {
 			taskManager.start();
 			String taskManagerAddress = taskManager.getAddress();
 
@@ -621,7 +641,8 @@ public class TaskExecutorTest extends TestLogger {
 			testingFatalErrorHandler.rethrowError();
 		}
 		finally {
-			rpc.stopService();
+			taskManager.shutDown();
+			taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		}
 	}
 
@@ -632,7 +653,6 @@ public class TaskExecutorTest extends TestLogger {
 	public void testTaskSubmission() throws Exception {
 		final Configuration configuration = new Configuration();
 
-		final TestingRpcService rpc = new TestingRpcService();
 		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
 		final JobID jobId = new JobID();
 		final AllocationID allocationId = new AllocationID();
@@ -709,27 +729,27 @@ public class TaskExecutorTest extends TestLogger {
 		final HighAvailabilityServices haServices = mock(HighAvailabilityServices.class);
 		when(haServices.getResourceManagerLeaderRetriever()).thenReturn(mock(LeaderRetrievalService.class));
 
-		try {
-			final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-
-			TaskExecutor taskManager = new TaskExecutor(
-				rpc,
-				taskManagerConfiguration,
-				mock(TaskManagerLocation.class),
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				networkEnvironment,
-				haServices,
-				mock(HeartbeatServices.class, RETURNS_MOCKS),
-				mock(MetricRegistry.class),
-				taskManagerMetricGroup,
-				mock(BroadcastVariableManager.class),
-				mock(FileCache.class),
-				taskSlotTable,
-				jobManagerTable,
-				mock(JobLeaderService.class),
-				testingFatalErrorHandler);
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+
+		TaskExecutor taskManager = new TaskExecutor(
+			rpc,
+			taskManagerConfiguration,
+			mock(TaskManagerLocation.class),
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			networkEnvironment,
+			haServices,
+			mock(HeartbeatServices.class, RETURNS_MOCKS),
+			mock(MetricRegistry.class),
+			taskManagerMetricGroup,
+			mock(BroadcastVariableManager.class),
+			mock(FileCache.class),
+			taskSlotTable,
+			jobManagerTable,
+			mock(JobLeaderService.class),
+			testingFatalErrorHandler);
 
+		try {
 			taskManager.start();
 
 			final TaskExecutorGateway tmGateway = taskManager.getSelfGateway(TaskExecutorGateway.class);
@@ -743,7 +763,8 @@ public class TaskExecutorTest extends TestLogger {
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
 		} finally {
-			rpc.stopService();
+			taskManager.shutDown();
+			taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		}
 	}
 
@@ -768,7 +789,6 @@ public class TaskExecutorTest extends TestLogger {
 	public void testJobLeaderDetection() throws Exception {
 		final JobID jobId = new JobID();
 
-		final TestingRpcService rpc = new TestingRpcService();
 		final Configuration configuration = new Configuration();
 		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
 		final ResourceID resourceId = new ResourceID("foobar");
@@ -830,25 +850,25 @@ public class TaskExecutorTest extends TestLogger {
 		final SlotID slotId = new SlotID(resourceId, 0);
 		final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);
 
-		try {
-			TaskExecutor taskManager = new TaskExecutor(
-				rpc,
-				taskManagerConfiguration,
-				taskManagerLocation,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				mock(NetworkEnvironment.class),
-				haServices,
-				mock(HeartbeatServices.class, RETURNS_MOCKS),
-				mock(MetricRegistry.class),
-				mock(TaskManagerMetricGroup.class),
-				mock(BroadcastVariableManager.class),
-				mock(FileCache.class),
-				taskSlotTable,
-				jobManagerTable,
-				jobLeaderService,
-				testingFatalErrorHandler);
+		TaskExecutor taskManager = new TaskExecutor(
+			rpc,
+			taskManagerConfiguration,
+			taskManagerLocation,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			mock(NetworkEnvironment.class),
+			haServices,
+			mock(HeartbeatServices.class, RETURNS_MOCKS),
+			mock(MetricRegistry.class),
+			mock(TaskManagerMetricGroup.class),
+			mock(BroadcastVariableManager.class),
+			mock(FileCache.class),
+			taskSlotTable,
+			jobManagerTable,
+			jobLeaderService,
+			testingFatalErrorHandler);
 
+		try {
 			taskManager.start();
 
 			final TaskExecutorGateway tmGateway = taskManager.getSelfGateway(TaskExecutorGateway.class);
@@ -880,7 +900,8 @@ public class TaskExecutorTest extends TestLogger {
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
 		} finally {
-			rpc.stopService();
+			taskManager.shutDown();
+			taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		}
 	}
 
@@ -892,7 +913,6 @@ public class TaskExecutorTest extends TestLogger {
 	public void testSlotAcceptance() throws Exception {
 		final JobID jobId = new JobID();
 
-		final TestingRpcService rpc = new TestingRpcService();
 		final Configuration configuration = new Configuration();
 		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
 		final ResourceID resourceId = new ResourceID("foobar");
@@ -951,25 +971,25 @@ public class TaskExecutorTest extends TestLogger {
 		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
 		rpc.registerGateway(jobManagerAddress, jobMasterGateway);
 
-		try {
-			TaskExecutor taskManager = new TaskExecutor(
-				rpc,
-				taskManagerConfiguration,
-				taskManagerLocation,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				mock(NetworkEnvironment.class),
-				haServices,
-				mock(HeartbeatServices.class, RETURNS_MOCKS),
-				mock(MetricRegistry.class),
-				mock(TaskManagerMetricGroup.class),
-				mock(BroadcastVariableManager.class),
-				mock(FileCache.class),
-				taskSlotTable,
-				jobManagerTable,
-				jobLeaderService,
-				testingFatalErrorHandler);
+		TaskExecutor taskManager = new TaskExecutor(
+			rpc,
+			taskManagerConfiguration,
+			taskManagerLocation,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			mock(NetworkEnvironment.class),
+			haServices,
+			mock(HeartbeatServices.class, RETURNS_MOCKS),
+			mock(MetricRegistry.class),
+			mock(TaskManagerMetricGroup.class),
+			mock(BroadcastVariableManager.class),
+			mock(FileCache.class),
+			taskSlotTable,
+			jobManagerTable,
+			jobLeaderService,
+			testingFatalErrorHandler);
 
+		try {
 			taskManager.start();
 
 			taskSlotTable.allocateSlot(0, jobId, allocationId1, Time.milliseconds(10000L));
@@ -992,7 +1012,8 @@ public class TaskExecutorTest extends TestLogger {
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
 		} finally {
-			rpc.stopService();
+			taskManager.shutDown();
+			taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		}
 	}
 
@@ -1012,51 +1033,50 @@ public class TaskExecutorTest extends TestLogger {
 		final JobID jobId = new JobID();
 		final String jobManagerAddress = "foobar";
 
-		final TestingRpcService rpc = new TestingRpcService();
-		try {
-			// register the mock resource manager gateways
-			ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
-			rpc.registerGateway(address1, rmGateway1);
-
-			TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(
-				address1,
-				HighAvailabilityServices.DEFAULT_LEADER_ID);
-
-			TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
-			haServices.setResourceManagerLeaderRetriever(testLeaderService);
-
-			TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
-			when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
-
-			TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
-			when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
-
-			final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
-			final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
-			when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(new SlotReport());
-			when(taskSlotTable.getCurrentAllocation(1)).thenReturn(new AllocationID());
-
-			when(rmGateway1.registerTaskExecutor(any(UUID.class), anyString(), eq(resourceID), any(SlotReport.class),
any(Time.class))).thenReturn(
-				CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(),
ResourceID.generate(), 1000L)));
-
-			TaskExecutor taskManager = new TaskExecutor(
-				rpc,
-				taskManagerServicesConfiguration,
-				taskManagerLocation,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				mock(NetworkEnvironment.class),
-				haServices,
-				mock(HeartbeatServices.class, RETURNS_MOCKS),
-				mock(MetricRegistry.class),
-				mock(TaskManagerMetricGroup.class),
-				mock(BroadcastVariableManager.class),
-				mock(FileCache.class),
-				taskSlotTable,
-				mock(JobManagerTable.class),
-				mock(JobLeaderService.class),
-				testingFatalErrorHandler);
+		// register the mock resource manager gateways
+		ResourceManagerGateway rmGateway1 = mock(ResourceManagerGateway.class);
+		rpc.registerGateway(address1, rmGateway1);
 
+		TestingLeaderRetrievalService testLeaderService = new TestingLeaderRetrievalService(
+			address1,
+			HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+		TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
+		haServices.setResourceManagerLeaderRetriever(testLeaderService);
+
+		TaskManagerConfiguration taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
+		when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+
+		TaskManagerLocation taskManagerLocation = mock(TaskManagerLocation.class);
+		when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+		final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
+		when(taskSlotTable.createSlotReport(any(ResourceID.class))).thenReturn(new SlotReport());
+		when(taskSlotTable.getCurrentAllocation(1)).thenReturn(new AllocationID());
+
+		when(rmGateway1.registerTaskExecutor(any(UUID.class), anyString(), eq(resourceID), any(SlotReport.class),
any(Time.class))).thenReturn(
+			CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(),
ResourceID.generate(), 1000L)));
+
+		TaskExecutor taskManager = new TaskExecutor(
+			rpc,
+			taskManagerServicesConfiguration,
+			taskManagerLocation,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			mock(NetworkEnvironment.class),
+			haServices,
+			mock(HeartbeatServices.class, RETURNS_MOCKS),
+			mock(MetricRegistry.class),
+			mock(TaskManagerMetricGroup.class),
+			mock(BroadcastVariableManager.class),
+			mock(FileCache.class),
+			taskSlotTable,
+			mock(JobManagerTable.class),
+			mock(JobLeaderService.class),
+			testingFatalErrorHandler);
+
+		try {
 			taskManager.start();
 
 			final TaskExecutorGateway tmGateway = taskManager.getSelfGateway(TaskExecutorGateway.class);
@@ -1116,7 +1136,8 @@ public class TaskExecutorTest extends TestLogger {
 			testingFatalErrorHandler.rethrowError();
 		}
 		finally {
-			rpc.stopService();
+			taskManager.shutDown();
+			taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		}
 
 	}
@@ -1128,7 +1149,6 @@ public class TaskExecutorTest extends TestLogger {
 	public void testSubmitTaskBeforeAcceptSlot() throws Exception {
 		final JobID jobId = new JobID();
 
-		final TestingRpcService rpc = new TestingRpcService();
 		final Configuration configuration = new Configuration();
 		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
 		final ResourceID resourceId = new ResourceID("foobar");
@@ -1180,6 +1200,9 @@ public class TaskExecutorTest extends TestLogger {
 			any(Time.class)
 		)).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId,
blobPort)));
 		when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
+		when(jobMasterGateway.updateTaskExecutionState(
+			any(UUID.class),
+			any(TaskExecutionState.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 
 		rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);
@@ -1200,37 +1223,36 @@ public class TaskExecutorTest extends TestLogger {
 			mock(ResultPartitionConsumableNotifier.class),
 			mock(PartitionProducerStateChecker.class));
 
-		jobManagerTable.put(jobId, jobManagerConnection);
-
-		try {
-			final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class);
-			TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
-			when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
+		final TaskManagerMetricGroup taskManagerMetricGroup = mock(TaskManagerMetricGroup.class);
+		TaskMetricGroup taskMetricGroup = mock(TaskMetricGroup.class);
+		when(taskMetricGroup.getIOMetricGroup()).thenReturn(mock(TaskIOMetricGroup.class));
 
-			when(taskManagerMetricGroup.addTaskForJob(
-				any(JobID.class), anyString(), any(JobVertexID.class), any(ExecutionAttemptID.class),
-				anyString(), anyInt(), anyInt())
-			).thenReturn(taskMetricGroup);
+		when(taskManagerMetricGroup.addTaskForJob(
+			any(JobID.class), anyString(), any(JobVertexID.class), any(ExecutionAttemptID.class),
+			anyString(), anyInt(), anyInt())
+		).thenReturn(taskMetricGroup);
+
+		final NetworkEnvironment networkMock = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
+
+		final TaskExecutor taskManager = new TaskExecutor(
+			rpc,
+			taskManagerConfiguration,
+			taskManagerLocation,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			networkMock,
+			haServices,
+			mock(HeartbeatServices.class, RETURNS_MOCKS),
+			mock(MetricRegistry.class),
+			taskManagerMetricGroup,
+			mock(BroadcastVariableManager.class),
+			mock(FileCache.class),
+			taskSlotTable,
+			jobManagerTable,
+			jobLeaderService,
+			testingFatalErrorHandler);
 
-			final NetworkEnvironment networkMock = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
-
-			final TaskExecutor taskManager = new TaskExecutor(
-				rpc,
-				taskManagerConfiguration,
-				taskManagerLocation,
-				mock(MemoryManager.class),
-				mock(IOManager.class),
-				networkMock,
-				haServices,
-				mock(HeartbeatServices.class, RETURNS_MOCKS),
-				mock(MetricRegistry.class),
-				taskManagerMetricGroup,
-				mock(BroadcastVariableManager.class),
-				mock(FileCache.class),
-				taskSlotTable,
-				jobManagerTable,
-				jobLeaderService,
-				testingFatalErrorHandler);
+		try {
 			taskManager.start();
 
 			final TaskExecutorGateway tmGateway = taskManager.getSelfGateway(TaskExecutorGateway.class);
@@ -1307,8 +1329,80 @@ public class TaskExecutorTest extends TestLogger {
 			// check if a concurrent error occurred
 			testingFatalErrorHandler.rethrowError();
 		} finally {
-			rpc.stopService();
+			taskManager.shutDown();
+			taskManager.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 		}
+	}
+
+	/**
+	 * This test makes sure that duplicate JobMaster gained leadership messages are filtered out
+	 * by the TaskExecutor.
+	 *
+	 * See FLINK-7526
+	 */
+	@Test
+	public void testFilterOutDuplicateJobMasterRegistrations() throws Exception {
+		final long verificationTimeout = 500L;
+		final Configuration configuration = new Configuration();
+		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
+		final JobLeaderService jobLeaderService = mock(JobLeaderService.class);
+		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration);
+		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(ResourceID.generate(),
InetAddress.getLocalHost(), 1234);
+
+		final HighAvailabilityServices haServicesMock = mock(HighAvailabilityServices.class, Mockito.RETURNS_MOCKS);
+		final HeartbeatServices heartbeatServicesMock = mock(HeartbeatServices.class, Mockito.RETURNS_MOCKS);
+
+		final JobID jobId = new JobID();
+		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
+		when(jobMasterGateway.getHostname()).thenReturn("localhost");
+		final UUID jobLeaderId = UUID.randomUUID();
+		final JMTMRegistrationSuccess registrationMessage = new JMTMRegistrationSuccess(ResourceID.generate(),
1);
+		final JobManagerTable jobManagerTableMock = spy(new JobManagerTable());
+
+		final TaskExecutor taskExecutor = new TaskExecutor(
+			rpc,
+			taskManagerConfiguration,
+			taskManagerLocation,
+			mock(MemoryManager.class),
+			mock(IOManager.class),
+			mock(NetworkEnvironment.class),
+			haServicesMock,
+			heartbeatServicesMock,
+			mock(MetricRegistry.class),
+			mock(TaskManagerMetricGroup.class),
+			mock(BroadcastVariableManager.class),
+			mock(FileCache.class),
+			mock(TaskSlotTable.class),
+			jobManagerTableMock,
+			jobLeaderService,
+			testingFatalErrorHandler);
 
+		try {
+			taskExecutor.start();
+
+			ArgumentCaptor<JobLeaderListener> jobLeaderListenerArgumentCaptor = ArgumentCaptor.forClass(JobLeaderListener.class);
+
+			verify(jobLeaderService).start(anyString(), any(RpcService.class), any(HighAvailabilityServices.class),
jobLeaderListenerArgumentCaptor.capture());
+
+			JobLeaderListener taskExecutorListener = jobLeaderListenerArgumentCaptor.getValue();
+
+			taskExecutorListener.jobManagerGainedLeadership(jobId, jobMasterGateway, jobLeaderId,
registrationMessage);
+
+			// duplicate job manager gained leadership message
+			taskExecutorListener.jobManagerGainedLeadership(jobId, jobMasterGateway, jobLeaderId,
registrationMessage);
+
+			ArgumentCaptor<JobManagerConnection> jobManagerConnectionArgumentCaptor = ArgumentCaptor.forClass(JobManagerConnection.class);
+
+			verify(jobManagerTableMock, Mockito.timeout(verificationTimeout).times(1)).put(eq(jobId),
jobManagerConnectionArgumentCaptor.capture());
+
+			JobManagerConnection jobManagerConnection = jobManagerConnectionArgumentCaptor.getValue();
+
+			assertEquals(jobMasterGateway, jobManagerConnection.getJobManagerGateway());
+
+			testingFatalErrorHandler.rethrowError();
+		} finally {
+			taskExecutor.shutDown();
+			taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+		}
 	}
 }


Mime
View raw message