Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 95286200B17 for ; Tue, 21 Jun 2016 14:19:29 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 93A93160A4F; Tue, 21 Jun 2016 12:19:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B2DA5160A36 for ; Tue, 21 Jun 2016 14:19:28 +0200 (CEST) Received: (qmail 76823 invoked by uid 500); 21 Jun 2016 12:19:27 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 76814 invoked by uid 99); 21 Jun 2016 12:19:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 21 Jun 2016 12:19:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C55C9E020A; Tue, 21 Jun 2016 12:19:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mxm@apache.org To: commits@flink.apache.org Message-Id: <996cb19c3765477f9c769c2c3c3a7d6f@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-4079] do not load Yarn properties file for per-job Yarn clusters Date: Tue, 21 Jun 2016 12:19:27 +0000 (UTC) archived-at: Tue, 21 Jun 2016 12:19:29 -0000 Repository: flink Updated Branches: refs/heads/release-1.0 e7d6952f1 -> 3163638a6 [FLINK-4079] do not load Yarn properties file for per-job Yarn clusters Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3163638a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3163638a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3163638a Branch: refs/heads/release-1.0 Commit: 3163638a61f49dd2df55467615096918bec3a890 Parents: e7d6952 Author: Maximilian Michels Authored: Tue Jun 21 11:03:25 2016 +0200 Committer: Maximilian Michels Committed: Tue Jun 21 14:20:20 2016 +0200 ---------------------------------------------------------------------- .../org/apache/flink/client/CliFrontend.java | 126 ++++++++++--------- 1 file changed, 67 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3163638a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java index 508b2b8..5ac8c5d 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java @@ -173,65 +173,6 @@ public class CliFrontend { GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath()); this.config = GlobalConfiguration.getConfiguration(); - // load the YARN properties - String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir"); - String currentUser = System.getProperty("user.name"); - String propertiesFileLocation = config.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation); - - File propertiesFile = new File(propertiesFileLocation, CliFrontend.YARN_PROPERTIES_FILE + currentUser); - if (propertiesFile.exists()) { - - logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath()); - - Properties yarnProperties = new Properties(); - try { - try (InputStream is = new FileInputStream(propertiesFile)) { - yarnProperties.load(is); - } - } - catch (IOException e) { - throw new Exception("Cannot read the YARN properties file", e); - } - - // configure the default parallelism from YARN - String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM); - if (propParallelism != null) { // maybe the property is not set - try { - int parallelism = Integer.parseInt(propParallelism); - this.config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism); - - logAndSysout("YARN properties set default parallelism to " + parallelism); - } - catch (NumberFormatException e) { - throw new Exception("Error while parsing the YARN properties: " + - "Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer."); - } - } - - // get the JobManager address from the YARN properties - String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY); - InetSocketAddress jobManagerAddress; - if (address != null) { - try { - jobManagerAddress = parseHostPortAddress(address); - // store address in config from where it is retrieved by the retrieval service - writeJobManagerAddressToConfig(jobManagerAddress); - } - catch (Exception e) { - throw new Exception("YARN properties contain an invalid entry for JobManager address.", e); - } - - logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress); - } - - // handle the YARN client's dynamic properties - String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING); - Map dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded); - for (Map.Entry dynamicProperty : dynamicProperties.entrySet()) { - this.config.setString(dynamicProperty.getKey(), dynamicProperty.getValue()); - } - } - try { FileSystem.setDefaultScheme(config); } catch (IOException e) { @@ -944,6 +885,13 @@ public class CliFrontend { * @param options Command line options */ protected void updateConfig(CommandLineOptions options) { + + try { + loadYarnProperties(); + } catch (Exception e) { + LOG.error("Couldn't load Yarn properties file", e); + } + if(options.getJobManagerAddress() != null){ InetSocketAddress jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress()); writeJobManagerAddressToConfig(jobManagerAddress); @@ -1082,6 +1030,13 @@ public class CliFrontend { } } else { + + try { + loadYarnProperties(); + } catch (Exception e) { + LOG.error("Couldn't load Yarn properties file", e); + } + if(options.getJobManagerAddress() != null) { jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress()); writeJobManagerAddressToConfig(jobManagerAddress); @@ -1323,4 +1278,57 @@ public class CliFrontend { return Collections.emptyMap(); } } + + private void loadYarnProperties() throws Exception { + + // load the YARN properties + String defaultPropertiesFileLocation = System.getProperty("java.io.tmpdir"); + String currentUser = System.getProperty("user.name"); + String propertiesFileLocation = config.getString(ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesFileLocation); + + File propertiesFile = new File(propertiesFileLocation, CliFrontend.YARN_PROPERTIES_FILE + currentUser); + if (propertiesFile.exists()) { + + logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath()); + + Properties yarnProperties = new Properties(); + try (InputStream is = new FileInputStream(propertiesFile)) { + yarnProperties.load(is); + } + + // configure the default parallelism from YARN + String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM); + if (propParallelism != null) { // maybe the property is not set + try { + int parallelism = Integer.parseInt(propParallelism); + this.config.setInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, parallelism); + + logAndSysout("YARN properties set default parallelism to " + parallelism); + } + catch (NumberFormatException e) { + throw new IOException("Error while parsing the YARN properties: " + + "Property " + YARN_PROPERTIES_PARALLELISM + " is not an integer."); + } + } + + // get the JobManager address from the YARN properties + String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY); + InetSocketAddress jobManagerAddress; + if (address != null) { + jobManagerAddress = parseHostPortAddress(address); + // store address in config from where it is retrieved by the retrieval service + writeJobManagerAddressToConfig(jobManagerAddress); + + logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress); + } + + // handle the YARN client's dynamic properties + String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING); + Map dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded); + for (Map.Entry dynamicProperty : dynamicProperties.entrySet()) { + this.config.setString(dynamicProperty.getKey(), dynamicProperty.getValue()); + } + } + + } }