flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4853) Clean up JobManager registration at the ResourceManager
Date Mon, 24 Oct 2016 13:40:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15602007#comment-15602007 ]

ASF GitHub Bot commented on FLINK-4853:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2657#discussion_r84689078
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
---
    @@ -202,101 +205,125 @@ public void shutDown() throws Exception {
     	//  RPC methods
     	// ------------------------------------------------------------------------
     
    -	/**
    -	 * Register a {@link JobMaster} at the resource manager.
    -	 *
    -	 * @param resourceManagerLeaderId The fencing token for the ResourceManager leader
    -	 * @param jobMasterAddress        The address of the JobMaster that registers
    -	 * @param jobID                   The Job ID of the JobMaster that registers
    -	 * @return Future registration response
    -	 */
     	@RpcMethod
    -	public Future<RegistrationResponse> registerJobMaster(
    -		final UUID resourceManagerLeaderId, final UUID jobMasterLeaderId,
    -		final String jobMasterAddress, final JobID jobID) {
    +	public Future<RegistrationResponse> registerJobManager(
    +			final UUID resourceManagerLeaderId,
    +			final UUID jobManagerLeaderId,
    +			final String jobManagerAddress,
    +			final JobID jobId) {
    +
    +		checkNotNull(resourceManagerLeaderId);
    +		checkNotNull(jobManagerLeaderId);
    +		checkNotNull(jobManagerAddress);
    +		checkNotNull(jobId);
    +
    +		if (isValid(resourceManagerLeaderId)) {
    +			if (!jobLeaderIdService.containsJob(jobId)) {
    +				try {
    +					jobLeaderIdService.addJob(jobId);
    +				} catch (Exception e) {
    +					// This should actually never happen because, it should always be possible to add a new job
    +					ResourceManagerException exception = new ResourceManagerException("Could not add the job " +
    +						jobId + " to the job id leader service. This should never happen.", e);
    +
    +					onFatalErrorAsync(exception);
    +
    +					log.debug("Could not add job {} to job leader id service.", jobId, e);
    +					return FlinkCompletableFuture.completedExceptionally(exception);
    +				}
    +			}
     
    -		checkNotNull(jobMasterAddress);
    -		checkNotNull(jobID);
    +			log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId);
    +
    +			Future<UUID> jobLeaderIdFuture;
     
    -		// create a leader retriever in case it doesn't exist
    -		final JobIdLeaderListener jobIdLeaderListener;
    -		if (leaderListeners.containsKey(jobID)) {
    -			jobIdLeaderListener = leaderListeners.get(jobID);
    -		} else {
     			try {
    -				LeaderRetrievalService jobMasterLeaderRetriever =
    -					highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
    -				jobIdLeaderListener = new JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
    +				jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId);
     			} catch (Exception e) {
    -				log.warn("Failed to start JobMasterLeaderRetriever for job id {}", jobID, e);
    +				// we cannot check the job leader id so let's fail
    +				// TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id
    +				ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " +
    +					"job leader id future to verify the correct job leader.", e);
    +
    +				onFatalErrorAsync(exception);
     
    -				return FlinkCompletableFuture.<RegistrationResponse>completed(
    -					new RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
    +				log.debug("Could not obtain the job leader id future to verify the correct job leader.");
    +				return FlinkCompletableFuture.completedExceptionally(exception);
     			}
     
    -			leaderListeners.put(jobID, jobIdLeaderListener);
    -		}
    +			Future<JobMasterGateway> jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, JobMasterGateway.class);
     
    -		return getRpcService()
    -			.execute(new Callable<JobMasterGateway>() {
    +			Future<RegistrationResponse> registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync(jobLeaderIdFuture, new BiFunction<JobMasterGateway, UUID, RegistrationResponse>() {
     				@Override
    -				public JobMasterGateway call() throws Exception {
    +				public RegistrationResponse apply(JobMasterGateway jobMasterGateway, UUID jobLeaderId) {
    +					if (isValid(resourceManagerLeaderId)) {
    +						if (jobLeaderId.equals(jobManagerLeaderId)) {
    +							if (jobManagerRegistrations.containsKey(jobId)) {
    +								JobManagerRegistration oldJobManagerRegistration = jobManagerRegistrations.get(jobId);
    +
    +								if (oldJobManagerRegistration.getLeaderID().equals(jobLeaderId)) {
    +									// same registration
    +									log.debug("Job manager {}@{} was already registered.", jobManagerLeaderId, jobManagerAddress);
    +								} else {
    +									// tell old job manager that he is no longer the job leader
    +									disconnectJobManager(
    +										oldJobManagerRegistration.getJobID(),
    +										new Exception("New job leader for job " + jobId + " found."));
    --- End diff --
    
    I see. That seems like the only way to resolve the leader id in a non-blocking fashion while still ensuring that the registration is eventually correct.
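The pattern discussed above, combining the connection future with the leader-id future and validating the registration only once both values are available, can be sketched with the JDK's `CompletableFuture.thenCombineAsync`. This is a simplified, hypothetical illustration, not Flink's code: the class `NonBlockingRegistrationSketch`, the `Response` enum, the registration map, and the use of a plain `String` address in place of a `JobMasterGateway` are all assumptions, and disconnecting a replaced leader is only indicated by a comment.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch; not Flink's ResourceManager API.
public class NonBlockingRegistrationSketch {

    // Hypothetical stand-in for Flink's RegistrationResponse.
    public enum Response { SUCCESS, ALREADY_REGISTERED, DECLINE }

    // Job id -> leader id of the currently registered job manager (assumed bookkeeping).
    private final Map<String, UUID> registrations = new ConcurrentHashMap<>();

    /**
     * Blocks on neither future: the check is scheduled on {@code executor} once the
     * gateway (here just its address) and the current job leader id are both known.
     */
    public CompletableFuture<Response> register(
            String jobId,
            UUID claimedLeaderId,
            CompletableFuture<String> gatewayFuture,
            CompletableFuture<UUID> jobLeaderIdFuture,
            Executor executor) {
        return gatewayFuture.thenCombineAsync(jobLeaderIdFuture, (gatewayAddress, actualLeaderId) -> {
            if (!actualLeaderId.equals(claimedLeaderId)) {
                // The registering job manager is not the current leader for this job.
                return Response.DECLINE;
            }
            UUID previous = registrations.put(jobId, actualLeaderId);
            if (actualLeaderId.equals(previous)) {
                // Same leader registered twice: nothing changes.
                return Response.ALREADY_REGISTERED;
            }
            // If 'previous' is non-null, an old leader was replaced; real code would
            // disconnect that job manager here.
            return Response.SUCCESS;
        }, executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        NonBlockingRegistrationSketch rm = new NonBlockingRegistrationSketch();
        UUID leader = UUID.randomUUID();

        CompletableFuture<String> gateway = new CompletableFuture<>();
        CompletableFuture<UUID> leaderId = new CompletableFuture<>();
        CompletableFuture<Response> result = rm.register("job-1", leader, gateway, leaderId, executor);
        System.out.println(result.isDone()); // false: no thread is waiting on the inputs

        gateway.complete("jm-address");
        leaderId.complete(leader);
        System.out.println(result.join()); // SUCCESS

        // A job manager claiming a stale leader id is declined.
        System.out.println(rm.register("job-1", UUID.randomUUID(),
                CompletableFuture.completedFuture("other-address"),
                CompletableFuture.completedFuture(leader), executor).join()); // DECLINE

        executor.shutdown();
    }
}
```

Because `thenCombineAsync` only schedules the check, no thread from the RPC pool is parked on a future while the connection or the leader lookup is pending.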


> Clean up JobManager registration at the ResourceManager
> -------------------------------------------------------
>
>                 Key: FLINK-4853
>                 URL: https://issues.apache.org/jira/browse/FLINK-4853
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>
> The current {{JobManager}} registration at the {{ResourceManager}} blocks threads in the {{RpcService.execute}} pool. This is not ideal and can be avoided by not waiting on a {{Future}} in this call.
> I propose to encapsulate the leader id retrieval operation in a distinct service so that it can be separated from the {{ResourceManager}}. This will reduce the complexity of the {{ResourceManager}} and make the individual components easier to test.
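The proposed separation can be illustrated with a minimal, hypothetical leader-id service that keeps one future per job. The method names `containsJob`, `addJob` and `getLeaderId` mirror the calls made on `jobLeaderIdService` in the diff; `removeJob`, `notifyLeaderChanged`, the `String` job ids and the class itself are assumptions, and this is not Flink's actual `JobLeaderIdService`. In particular, the leader retrieval through the high-availability services is left out and represented only by the `notifyLeaderChanged` callback.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified sketch; not Flink's JobLeaderIdService.
public class JobLeaderIdServiceSketch {

    // Job id -> future holding the job's current leader id.
    private final Map<String, CompletableFuture<UUID>> leaderIds = new ConcurrentHashMap<>();

    public boolean containsJob(String jobId) {
        return leaderIds.containsKey(jobId);
    }

    public void addJob(String jobId) {
        // A real service would also start leader retrieval for the job here (assumption).
        leaderIds.putIfAbsent(jobId, new CompletableFuture<>());
    }

    public void removeJob(String jobId) {
        CompletableFuture<UUID> future = leaderIds.remove(jobId);
        if (future != null) {
            // Fail anyone still waiting for the leader of a job that is gone.
            future.completeExceptionally(new IllegalStateException("Job " + jobId + " was removed."));
        }
    }

    public CompletableFuture<UUID> getLeaderId(String jobId) {
        CompletableFuture<UUID> future = leaderIds.get(jobId);
        if (future == null) {
            throw new IllegalStateException("Job " + jobId + " is not tracked.");
        }
        return future;
    }

    /** Called by a (hypothetical) leader retrieval listener when the job's leader changes. */
    public void notifyLeaderChanged(String jobId, UUID newLeaderId) {
        CompletableFuture<UUID> current = leaderIds.get(jobId);
        if (current == null) {
            return; // job is no longer tracked
        }
        if (!current.complete(newLeaderId)) {
            // Already completed with an earlier leader id: publish the new one in a fresh future.
            leaderIds.replace(jobId, current, CompletableFuture.completedFuture(newLeaderId));
        }
    }
}
```

A caller such as the registration method only chains on `getLeaderId(jobId)` and never waits on it; the pending future is completed as soon as the first leader is known, which keeps the leader bookkeeping testable in isolation from the `ResourceManager`.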



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
