flink-issues mailing list archives

From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #5801: [FLINK-9121] [flip6] Remove Flip6 prefixes and oth...
Date Wed, 18 Jul 2018 12:08:30 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5801#discussion_r203352693
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ---
    @@ -49,30 +54,45 @@ public YarnClusterDescriptor(
     
     	@Override
     	protected String getYarnSessionClusterEntrypoint() {
    -		return YarnApplicationMasterRunner.class.getName();
    +		return YarnSessionClusterEntrypoint.class.getName();
     	}
     
     	@Override
     	protected String getYarnJobClusterEntrypoint() {
    -		throw new UnsupportedOperationException("The old Yarn descriptor does not support proper per-job mode.");
    +		return YarnJobClusterEntrypoint.class.getName();
     	}
     
     	@Override
    -	public YarnClusterClient deployJobCluster(
    -			ClusterSpecification clusterSpecification,
    -			JobGraph jobGraph,
    -			boolean detached) {
    -		throw new UnsupportedOperationException("Cannot deploy a per-job yarn cluster yet.");
    +	public ClusterClient<ApplicationId> deployJobCluster(
    +		ClusterSpecification clusterSpecification,
    +		JobGraph jobGraph,
    +		boolean detached) throws ClusterDeploymentException {
    +
    +		// this is required because the slots are allocated lazily
    +		jobGraph.setAllowQueuedScheduling(true);
    +
    +		try {
    +			return deployInternal(
    +				clusterSpecification,
    +				"Flink per-job cluster",
    +				getYarnJobClusterEntrypoint(),
    +				jobGraph,
    +				detached);
    +		} catch (Exception e) {
    +			throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
    +		}
     	}
     
     	@Override
    -	protected ClusterClient<ApplicationId> createYarnClusterClient(AbstractYarnClusterDescriptor descriptor, int numberTaskManagers, int slotsPerTaskManager, ApplicationReport report, Configuration flinkConfiguration, boolean perJobCluster) throws Exception {
    -		return new YarnClusterClient(
    -			descriptor,
    -			numberTaskManagers,
    -			slotsPerTaskManager,
    -			report,
    +	protected ClusterClient<ApplicationId> createYarnClusterClient(
    +			AbstractYarnClusterDescriptor descriptor,
    +			int numberTaskManagers,
    +			int slotsPerTaskManager,
    +			ApplicationReport report,
    +			Configuration flinkConfiguration,
    +			boolean perJobCluster) throws Exception {
    +		return new RestClusterClient<>(
    --- End diff --
    
    A `RestClusterClient` is returned here because the new Flink architecture uses REST calls for client-server communication, unlike the legacy (pre-Flip-6) architecture.
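
To illustrate what REST-based client-server communication means in practice, here is a minimal sketch of a client addressing the cluster over plain HTTP. It is not Flink's `RestClusterClient`: the class name `RestClientSketch`, the helper `jobsOverviewRequest`, and the host and port (`localhost:8081`) are assumptions made for illustration, and the `/jobs/overview` path is assumed to be served by the cluster's REST endpoint. The request is only built, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class RestClientSketch {

    // Builds (but does not send) a GET request against an assumed
    // REST endpoint of the cluster. Host, port and path are illustrative.
    static HttpRequest jobsOverviewRequest(String host, int port) {
        return HttpRequest.newBuilder()
            .uri(URI.create("http://" + host + ":" + port + "/jobs/overview"))
            .header("Accept", "application/json")
            .GET()
            .build();
    }

    public static void main(String[] args) {
        HttpRequest request = jobsOverviewRequest("localhost", 8081);
        // Print the method and target URI of the request that would be sent.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Because the client only needs an HTTP address, it does not have to join the cluster's internal RPC system to talk to it.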


---
