flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #5318: [FLINK-8462] [flip6] Filter invalid heartbeat time...
Date Mon, 22 Jan 2018 09:59:20 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5318#discussion_r162890000
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
---
    @@ -1425,4 +1440,137 @@ public void testFilterOutDuplicateJobMasterRegistrations() throws
Exception {
     			taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
     		}
     	}
    +
    +	/**
    +	 * Tests that the heartbeat is stopped once the TaskExecutor detects that the RM is
no longer leader.
    +	 *
    +	 * <p>See FLINK-8462
    +	 */
    +	@Test
    +	public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception {
    +		final long heartbeatInterval = 1L;
    +		final long heartbeatTimeout = 1000L;
    +		final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(new
Configuration());
    +		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
    +		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
    +		final RecordingHeartbeatServices heartbeatServices = new RecordingHeartbeatServices(heartbeatInterval,
heartbeatTimeout);
    +		final ResourceID rmResourceID = ResourceID.generate();
    +
    +		final TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN),
timerService);
    +
    +		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    +		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
    +
    +		final String rmAddress = "rm";
    +		final TestingResourceManagerGateway rmGateway = new TestingResourceManagerGateway(
    +			ResourceManagerId.generate(),
    +			rmResourceID,
    +			heartbeatInterval,
    +			rmAddress,
    +			rmAddress);
    +
    +		final CompletableFuture<ResourceID> registeredTaskManagerFuture = new CompletableFuture<>();
    +
    +		rmGateway.setRegisterTaskExecutorFunction(
    +			info -> {
    +				registeredTaskManagerFuture.complete(info.f1);
    +				return CompletableFuture.completedFuture(
    +					new TaskExecutorRegistrationSuccess(
    +						new InstanceID(),
    +						rmResourceID,
    +						heartbeatInterval));
    +			});
    +
    +		rpc.registerGateway(rmAddress, rmGateway);
    +
    +		final TaskExecutor taskExecutor = new TaskExecutor(
    +			rpc,
    +			taskManagerConfiguration,
    +			taskManagerLocation,
    +			mock(MemoryManager.class),
    +			mock(IOManager.class),
    +			mock(NetworkEnvironment.class),
    +			haServices,
    +			heartbeatServices,
    +			mock(TaskManagerMetricGroup.class),
    +			mock(BroadcastVariableManager.class),
    +			mock(FileCache.class),
    +			taskSlotTable,
    +			mock(JobManagerTable.class),
    +			mock(JobLeaderService.class),
    +			testingFatalErrorHandler);
    +
    +		try {
    +			taskExecutor.start();
    +
    +			rmLeaderRetrievalService.notifyListener(rmAddress, rmGateway.getFencingToken().toUUID());
    +
    +			// wait for TM registration
    +			assertThat(
    +				registeredTaskManagerFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS),
    +				org.hamcrest.Matchers.equalTo(taskManagerLocation.getResourceID()));
    +
    +			final BlockingQueue<ResourceID> unmonitoredTargets = heartbeatServices.getUnmonitoredTargets();
    +
    +			// let RM lose leadership
    +			rmLeaderRetrievalService.notifyListener(null, null);
    +
    +			// the timeout should not have triggered since it is much higher
    +			assertThat(unmonitoredTargets.poll(100L, TimeUnit.MILLISECONDS), org.hamcrest.Matchers.equalTo(rmResourceID));
    --- End diff --
    
    Will change it.


---

Mime
View raw message