flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4364) Implement TaskManager side of heartbeat from JobManager
Date Fri, 24 Feb 2017 14:07:46 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4364?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882734#comment-15882734 ] 

ASF GitHub Bot commented on FLINK-4364:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3151#discussion_r102945400
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---
    @@ -105,6 +110,105 @@
     	@Rule
     	public TestName name = new TestName();
     
    +	@Test
    +	public void testHeartbeatTimeoutWithJobManager() throws Exception {
    +		final JobID jobId = new JobID();
    +		final Configuration configuration = new Configuration();
    +		final TaskManagerConfiguration tmConfig = TaskManagerConfiguration.fromConfiguration(configuration);
    +		final ResourceID tmResourceId = new ResourceID("tm");
    +		final TaskManagerLocation taskManagerLocation = new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
    +		final TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), mock(TimerService.class));
    +
    +		final TestingSerialRpcService rpc = new TestingSerialRpcService();
    +		final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
    +		final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
    +		final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +		final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService();
    +		haServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService);
    +		haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
    +
    +		final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
    +
    +		final long heartbeatTimeout = 1000L;
    +		final HeartbeatManagerImpl<Object, Object> tmHeartbeatManager = new HeartbeatManagerImpl<>(
    +				heartbeatTimeout,
    +				tmResourceId,
    +				rpc.getExecutor(),
    +				Executors.newSingleThreadScheduledExecutor(),
    +				log);
    +
    +		final String jobMasterAddress = "jm";
    +		final UUID jmLeaderId = UUID.randomUUID();
    +		final ResourceID jmResourceId = new ResourceID(jobMasterAddress);
    +		final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class);
    +		final int blobPort = 42;
    +
    +		when(jobMasterGateway.registerTaskManager(
    +				any(String.class),
    +				eq(taskManagerLocation),
    +				eq(jmLeaderId),
    +				any(Time.class)
    +		)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new JMTMRegistrationSuccess(jmResourceId, blobPort)));
    +		when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
    +
    +		try {
    +			final TaskExecutor taskManager = new TaskExecutor(
    +					tmConfig,
    +					taskManagerLocation,
    +					rpc,
    +					mock(MemoryManager.class),
    +					mock(IOManager.class),
    +					mock(NetworkEnvironment.class),
    +					haServices,
    +					mock(MetricRegistry.class),
    +					tmHeartbeatManager,
    +					mock(TaskManagerMetricGroup.class),
    +					mock(BroadcastVariableManager.class),
    +					mock(FileCache.class),
    +					taskSlotTable,
    +					new JobManagerTable(),
    +					jobLeaderService,
    +					testingFatalErrorHandler);
    +
    +			taskManager.start();
    +
    +			rpc.registerGateway(jobMasterAddress, jobMasterGateway);
    +
    +			// we have to add the job after the TaskExecutor, because otherwise the service has not
    +			// been properly started.
    +			jobLeaderService.addJob(jobId, jobMasterAddress);
    +
    +			// now inform the task manager about the new job leader
    +			jmLeaderRetrievalService.notifyListener(jobMasterAddress, jmLeaderId);
    +
    +			// register task manager success will trigger monitoring heartbeat target between tm and jm
    +			verify(jobMasterGateway).registerTaskManager(
    +					eq(taskManager.getAddress()), eq(taskManagerLocation), eq(jmLeaderId), any(Time.class));
    +
    +			final ConcurrentHashMap<ResourceID, Object> heartbeatTargets = Whitebox.getInternalState(tmHeartbeatManager, "heartbeatTargets");
    +			final JobManagerTable jobManagerTable = Whitebox.getInternalState(taskManager, "jobManagerTable");
    +			final Map<ResourceID, JobManagerConnection> jobManagerConnections = Whitebox.getInternalState(taskManager, "jobManagerConnections");
    +
    +			// before heartbeat timeout
    +			assertTrue(heartbeatTargets.containsKey(jmResourceId));
    +			assertTrue(jobManagerTable.contains(jobId));
    +			assertTrue(jobManagerConnections.containsKey(jmResourceId));
    +
    +			// the job manager will not schedule heartbeat because of mock and the task manager will be notified heartbeat timeout
    +			Thread.sleep(heartbeatTimeout);
    --- End diff --
    
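    `sleep` is not good: it makes the test slow and timing-dependent, because nothing guarantees that the heartbeat timeout has already been processed after exactly `heartbeatTimeout` milliseconds.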
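One common way to address this review point, sketched here in self-contained Java rather than with Flink's classes (the names `HeartbeatMonitor`, `heartbeatFrom` and `timeoutObserved` are hypothetical, and this is not necessarily how the pull request was eventually changed): let the timeout listener count down a `CountDownLatch`, and have the test await that latch with a generous upper bound instead of sleeping for exactly `heartbeatTimeout`. The test then returns as soon as the timeout has actually fired, and fails clearly, instead of racing the scheduler, if it never fires.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch, not Flink's HeartbeatManagerImpl. */
public class HeartbeatTimeoutSketch {

    /** Minimal monitor: calls onTimeout if no heartbeat arrives within timeoutMillis. */
    static final class HeartbeatMonitor {
        private final long timeoutMillis;
        private final ScheduledExecutorService scheduler;
        private final Consumer<String> onTimeout;
        private final Map<String, ScheduledFuture<?>> pending = new ConcurrentHashMap<>();

        HeartbeatMonitor(long timeoutMillis, ScheduledExecutorService scheduler, Consumer<String> onTimeout) {
            this.timeoutMillis = timeoutMillis;
            this.scheduler = scheduler;
            this.onTimeout = onTimeout;
        }

        /** Arms (or re-arms) the timeout for targetId; call on registration and on every heartbeat. */
        void heartbeatFrom(String targetId) {
            ScheduledFuture<?> next = scheduler.schedule(
                    () -> onTimeout.accept(targetId), timeoutMillis, TimeUnit.MILLISECONDS);
            ScheduledFuture<?> previous = pending.put(targetId, next);
            if (previous != null) {
                previous.cancel(false); // a fresh heartbeat supersedes the old deadline
            }
        }
    }

    /**
     * Returns true if the timeout listener fired within maxWaitMillis.
     * Instead of Thread.sleep(timeout), this blocks on a latch that the
     * listener counts down, bounded by a generous upper limit.
     */
    static boolean timeoutObserved(long heartbeatTimeoutMillis, long maxWaitMillis) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            CountDownLatch timedOut = new CountDownLatch(1);
            HeartbeatMonitor monitor = new HeartbeatMonitor(
                    heartbeatTimeoutMillis, scheduler, targetId -> timedOut.countDown());
            monitor.heartbeatFrom("jm"); // registration arms the timeout; no further heartbeats follow
            return timedOut.await(maxWaitMillis, TimeUnit.MILLISECONDS);
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(timeoutObserved(100L, 10_000L)); // prints true
    }
}
```

The upper bound only affects how long a failing run takes; a passing run finishes shortly after the heartbeat timeout fires.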


> Implement TaskManager side of heartbeat from JobManager
> -------------------------------------------------------
>
>                 Key: FLINK-4364
>                 URL: https://issues.apache.org/jira/browse/FLINK-4364
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: zhijiang
>            Assignee: zhijiang
>
> The {{JobManager}} initiates heartbeat messages via (JobID, JmLeaderID), and the {{TaskManager}} will report its metrics with each heartbeat.
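The interaction described in the issue can be illustrated with a minimal Java sketch. All names here (`HeartbeatRequest`, `HeartbeatResponse`, `TaskManagerSide`, the metric key) are hypothetical and are not Flink's API; the sketch also assumes that the TaskManager ignores heartbeat requests whose leader ID does not match the JobManager leader it registered with. Each request is identified by (JobID, JmLeaderID), and the TaskManager answers each valid request with a snapshot of its metrics.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the TaskManager answering JobManager heartbeats; not Flink's API. */
public class HeartbeatPayloadSketch {

    /** Heartbeat request identified by (JobID, JmLeaderID), as in the issue description. */
    static final class HeartbeatRequest {
        final String jobId;
        final UUID jmLeaderId;

        HeartbeatRequest(String jobId, UUID jmLeaderId) {
            this.jobId = jobId;
            this.jmLeaderId = jmLeaderId;
        }
    }

    /** Response carrying a snapshot of the TaskManager's metrics. */
    static final class HeartbeatResponse {
        final String taskManagerId;
        final Map<String, Long> metrics;

        HeartbeatResponse(String taskManagerId, Map<String, Long> metrics) {
            this.taskManagerId = taskManagerId;
            this.metrics = metrics;
        }
    }

    static final class TaskManagerSide {
        private final String taskManagerId;
        // job id -> leader id of the JobManager this TaskManager registered with
        private final Map<String, UUID> registeredLeaders = new ConcurrentHashMap<>();

        TaskManagerSide(String taskManagerId) {
            this.taskManagerId = taskManagerId;
        }

        void registerWithJobManager(String jobId, UUID jmLeaderId) {
            registeredLeaders.put(jobId, jmLeaderId);
        }

        /** Answers with a metrics snapshot only if the request comes from the registered leader. */
        Optional<HeartbeatResponse> onHeartbeatRequest(HeartbeatRequest request, Map<String, Long> currentMetrics) {
            UUID expectedLeader = registeredLeaders.get(request.jobId);
            if (expectedLeader == null || !expectedLeader.equals(request.jmLeaderId)) {
                return Optional.empty(); // unknown job or stale leader (assumed policy): ignore the request
            }
            Map<String, Long> snapshot = Collections.unmodifiableMap(new HashMap<>(currentMetrics));
            return Optional.of(new HeartbeatResponse(taskManagerId, snapshot));
        }
    }

    public static void main(String[] args) {
        TaskManagerSide tm = new TaskManagerSide("tm");
        UUID leader = UUID.randomUUID();
        tm.registerWithJobManager("job-1", leader);

        Map<String, Long> metrics = new HashMap<>();
        metrics.put("heapUsedBytes", 1024L); // hypothetical metric name

        System.out.println(tm.onHeartbeatRequest(new HeartbeatRequest("job-1", leader), metrics).isPresent());            // prints true
        System.out.println(tm.onHeartbeatRequest(new HeartbeatRequest("job-1", UUID.randomUUID()), metrics).isPresent()); // prints false
    }
}
```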



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
