flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GJL <...@git.apache.org>
Subject [GitHub] flink pull request #5431: [FLINK-8608] [flip6] Implement MiniDispatcher for ...
Date Wed, 14 Feb 2018 10:03:42 GMT
Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5431#discussion_r168126480
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java ---
    @@ -163,29 +210,130 @@ protected void runCluster(Configuration configuration) throws Exception {
     				blobServer,
     				heartbeatServices,
     				metricRegistry);
    +
    +			// TODO: Make shutDownAndTerminate non blocking to not use the global executor
    +			dispatcher.getTerminationFuture().whenCompleteAsync(
    +				(Boolean success, Throwable throwable) -> {
    +					if (throwable != null) {
    +						LOG.info("Could not properly terminate the Dispatcher.", throwable);
    +					}
    +
    +					shutDownAndTerminate(
    +						SUCCESS_RETURN_CODE,
    +						ApplicationStatus.SUCCEEDED,
    +						true);
    +				});
     		}
     	}
     
     	protected void initializeServices(Configuration configuration) throws Exception {
    -		assert(Thread.holdsLock(lock));
     
     		LOG.info("Initializing cluster services.");
     
    -		final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
    -		// TODO: Add support for port ranges
    -		final String portRange = String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
    -
    -		commonRpcService = createRpcService(configuration, bindAddress, portRange);
    -		haServices = createHaServices(configuration, commonRpcService.getExecutor());
    -		blobServer = new BlobServer(configuration, haServices.createBlobStore());
    -		blobServer.start();
    -		heartbeatServices = createHeartbeatServices(configuration);
    -		metricRegistry = createMetricRegistry(configuration);
    -
    -		// TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint
    -		// start the MetricQueryService
    -		final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
    -		metricRegistry.startQueryService(actorSystem, null);
    +		synchronized (lock) {
    +			final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
    +			// TODO: Add support for port ranges
    +			final String portRange = String.valueOf(configuration.getInteger(JobManagerOptions.PORT));
    +
    +			commonRpcService = createRpcService(configuration, bindAddress, portRange);
    +			haServices = createHaServices(configuration, commonRpcService.getExecutor());
    +			blobServer = new BlobServer(configuration, haServices.createBlobStore());
    +			blobServer.start();
    +			heartbeatServices = createHeartbeatServices(configuration);
    +			metricRegistry = createMetricRegistry(configuration);
    +
    +			// TODO: This is a temporary hack until we have ported the MetricQueryService to the new RpcEndpoint
    +			// start the MetricQueryService
    +			final ActorSystem actorSystem = ((AkkaRpcService) commonRpcService).getActorSystem();
    +			metricRegistry.startQueryService(actorSystem, null);
    +		}
    +	}
    +
    +	protected void startClusterComponents(
    +		Configuration configuration,
    --- End diff --
    
    nit: the indentation of the parameter list above looks off.


---

Mime
View raw message