flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetzger <...@git.apache.org>
Subject [GitHub] flink pull request #2085: [FLINK-3937] programmatic resuming of clusters
Date Mon, 13 Jun 2016 13:58:35 GMT
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2085#discussion_r66795439
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
---
    @@ -108,24 +131,83 @@ public FlinkYarnSessionCli(String shortPrefix, String longPrefix,
boolean accept
     		NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name
for the application on YARN");
     	}
     
    +
     	/**
    -	 * Creates a new Yarn Client.
    -	 * @param cmd the command line to parse options from
    -	 * @return an instance of the client or null if there was an error
    +	 * Resumes from a Flink Yarn properties file
    +	 * @param flinkConfiguration The flink configuration
    +	 * @return True if the properties were loaded, false otherwise
     	 */
    -	public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
    +	private boolean resumeFromYarnProperties(Configuration flinkConfiguration) {
    +		// load the YARN properties
    +		File propertiesFile = new File(getYarnPropertiesLocation(flinkConfiguration));
    +		if (!propertiesFile.exists()) {
    +			return false;
    +		}
    +
    +		logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());
    +
    +		Properties yarnProperties = new Properties();
    +		try {
    +			try (InputStream is = new FileInputStream(propertiesFile)) {
    +				yarnProperties.load(is);
    +			}
    +		}
    +		catch (IOException e) {
    +			throw new RuntimeException("Cannot read the YARN properties file", e);
    +		}
    +
    +		// configure the default parallelism from YARN
    +		String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
    +		if (propParallelism != null) { // maybe the property is not set
    +			try {
    +				int parallelism = Integer.parseInt(propParallelism);
    +				flinkConfiguration.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism);
    +
    +				logAndSysout("YARN properties set default parallelism to " + parallelism);
    +			}
    +			catch (NumberFormatException e) {
    +				throw new RuntimeException("Error while parsing the YARN properties: " +
    +					"Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer.");
    +			}
    +		}
    +
    +		// get the JobManager address from the YARN properties
    +		String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
    +		InetSocketAddress jobManagerAddress;
    +		if (address != null) {
    +			try {
    +				jobManagerAddress = ClientUtils.parseHostPortAddress(address);
    +				// store address in config from where it is retrieved by the retrieval service
    +				CliFrontend.writeJobManagerAddressToConfig(flinkConfiguration, jobManagerAddress);
    +			}
    +			catch (Exception e) {
    +				throw new RuntimeException("YARN properties contain an invalid entry for JobManager
address.", e);
    +			}
    +
    +			logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress);
    +		}
     
    -		AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
    -		if (flinkYarnClient == null) {
    -			return null;
    +		// handle the YARN client's dynamic properties
    +		String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
    +		Map<String, String> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
    +		for (Map.Entry<String, String> dynamicProperty : dynamicProperties.entrySet())
{
    +			flinkConfiguration.setString(dynamicProperty.getKey(), dynamicProperty.getValue());
     		}
     
    +		return true;
    +	}
    +
    +	public YarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine
cmd) {
    +
    +
    +		YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor();
    --- End diff --
    
    A bit too many blank lines ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message