flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tillrohrmann <...@git.apache.org>
Subject [GitHub] flink pull request #4271: [FLINK-7113] Make ClusterDescriptor independent of...
Date Wed, 26 Jul 2017 10:08:52 GMT
Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4271#discussion_r129532134
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---
    @@ -520,72 +439,128 @@ protected YarnClusterClient deployInternal() throws Exception {
     			taskManagerMemoryMb =  yarnMinAllocationMB;
     		}
     
    -		// Create application via yarnClient
    -		final YarnClientApplication yarnApplication = yarnClient.createApplication();
    -		GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();
    -
    -		Resource maxRes = appResponse.getMaximumResourceCapability();
     		final String note = "Please check the 'yarn.scheduler.maximum-allocation-mb' and the
'yarn.nodemanager.resource.memory-mb' configuration values\n";
    -		if (jobManagerMemoryMb > maxRes.getMemory()) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    +		if (jobManagerMemoryMb > maximumResourceCapability.getMemory()) {
     			throw new YarnDeploymentException("The cluster does not have the requested resources
for the JobManager available!\n"
    -				+ "Maximum Memory: " + maxRes.getMemory() + "MB Requested: " + jobManagerMemoryMb
+ "MB. " + note);
    +				+ "Maximum Memory: " + maximumResourceCapability.getMemory() + "MB Requested: " +
jobManagerMemoryMb + "MB. " + note);
     		}
     
    -		if (taskManagerMemoryMb > maxRes.getMemory()) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    +		if (taskManagerMemoryMb > maximumResourceCapability.getMemory()) {
     			throw new YarnDeploymentException("The cluster does not have the requested resources
for the TaskManagers available!\n"
    -				+ "Maximum Memory: " + maxRes.getMemory() + " Requested: " + taskManagerMemoryMb
+ "MB. " + note);
    +				+ "Maximum Memory: " + maximumResourceCapability.getMemory() + " Requested: " + taskManagerMemoryMb
+ "MB. " + note);
     		}
     
     		final String noteRsc = "\nThe Flink YARN client will try to allocate the YARN session,
but maybe not all TaskManagers are " +
     			"connecting from the beginning because the resources are currently not available in
the cluster. " +
     			"The allocation might take more time than usual because the Flink YARN client needs
to wait until " +
     			"the resources become available.";
     		int totalMemoryRequired = jobManagerMemoryMb + taskManagerMemoryMb * taskManagerCount;
    -		ClusterResourceDescription freeClusterMem;
    -		try {
    -			freeClusterMem = getCurrentFreeClusterResources(yarnClient);
    -		} catch (YarnException | IOException e) {
    -			failSessionDuringDeployment(yarnClient, yarnApplication);
    -			throw new YarnDeploymentException("Could not retrieve information about free cluster
resources.", e);
    -		}
     
    -		if (freeClusterMem.totalFreeMemory < totalMemoryRequired) {
    +
    +		if (freeClusterResources.totalFreeMemory < totalMemoryRequired) {
     			LOG.warn("This YARN session requires " + totalMemoryRequired + "MB of memory in the
cluster. "
    -				+ "There are currently only " + freeClusterMem.totalFreeMemory + "MB available."
+ noteRsc);
    +				+ "There are currently only " + freeClusterResources.totalFreeMemory + "MB available."
+ noteRsc);
     
     		}
    -		if (taskManagerMemoryMb > freeClusterMem.containerLimit) {
    +		if (taskManagerMemoryMb > freeClusterResources.containerLimit) {
     			LOG.warn("The requested amount of memory for the TaskManagers (" + taskManagerMemoryMb
+ "MB) is more than "
    -				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
    +				+ "the largest possible YARN container: " + freeClusterResources.containerLimit +
noteRsc);
     		}
    -		if (jobManagerMemoryMb > freeClusterMem.containerLimit) {
    +		if (jobManagerMemoryMb > freeClusterResources.containerLimit) {
     			LOG.warn("The requested amount of memory for the JobManager (" + jobManagerMemoryMb
+ "MB) is more than "
    -				+ "the largest possible YARN container: " + freeClusterMem.containerLimit + noteRsc);
    +				+ "the largest possible YARN container: " + freeClusterResources.containerLimit +
noteRsc);
     		}
     
     		// ----------------- check if the requested containers fit into the cluster.
     
    -		int[] nmFree = Arrays.copyOf(freeClusterMem.nodeManagersFree, freeClusterMem.nodeManagersFree.length);
    +		int[] nmFree = Arrays.copyOf(freeClusterResources.nodeManagersFree, freeClusterResources.nodeManagersFree.length);
     		// first, allocate the jobManager somewhere.
     		if (!allocateResource(nmFree, jobManagerMemoryMb)) {
     			LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master.
" +
     				"The JobManager requires " + jobManagerMemoryMb + "MB. NodeManagers available: "
+
    -				Arrays.toString(freeClusterMem.nodeManagersFree) + noteRsc);
    +				Arrays.toString(freeClusterResources.nodeManagersFree) + noteRsc);
     		}
     		// allocate TaskManagers
     		for (int i = 0; i < taskManagerCount; i++) {
     			if (!allocateResource(nmFree, taskManagerMemoryMb)) {
     				LOG.warn("There is not enough memory available in the YARN cluster. " +
     					"The TaskManager(s) require " + taskManagerMemoryMb + "MB each. " +
    -					"NodeManagers available: " + Arrays.toString(freeClusterMem.nodeManagersFree) +
"\n" +
    +					"NodeManagers available: " + Arrays.toString(freeClusterResources.nodeManagersFree)
+ "\n" +
     					"After allocating the JobManager (" + jobManagerMemoryMb + "MB) and (" + i + "/"
+ taskManagerCount + ") TaskManagers, " +
     					"the following NodeManagers are available: " + Arrays.toString(nmFree)  + noteRsc);
     			}
     		}
     
    -		ApplicationReport report = startAppMaster(null, yarnClient, yarnApplication);
    +		return new ClusterSpecification(
    +			jobManagerMemoryMb,
    +			taskManagerMemoryMb,
    +			clusterSpecification.getNumberTaskManagers(),
    +			clusterSpecification.getSlotsPerTaskManager());
    +
    +	}
    +
    +	protected void logClusterSpecification(ClusterSpecification clusterSpecification) {
    --- End diff --
    
    True. Will change it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact the infrastructure team at infrastructure@apache.org or file a JIRA
ticket with INFRA.
---

Mime
View raw message