Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E82ED200B46 for ; Fri, 1 Jul 2016 20:11:42 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E6D99160A61; Fri, 1 Jul 2016 18:11:42 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E731B160A6C for ; Fri, 1 Jul 2016 20:11:41 +0200 (CEST) Received: (qmail 56790 invoked by uid 500); 1 Jul 2016 18:11:41 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 56721 invoked by uid 99); 1 Jul 2016 18:11:41 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 01 Jul 2016 18:11:41 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id EA0DDE0B66; Fri, 1 Jul 2016 18:11:40 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mxm@apache.org To: commits@flink.apache.org Date: Fri, 01 Jul 2016 18:11:42 -0000 Message-Id: <16ce6488c9e44fe08ad5803c3b60bde0@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/3] flink git commit: [FLINK-4144] Yarn properties file: replace hostname/port with Yarn application id archived-at: Fri, 01 Jul 2016 18:11:43 -0000 [FLINK-4144] Yarn properties file: replace hostname/port with Yarn application id This closes #2191 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ab6837f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ab6837f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ab6837f Branch: refs/heads/master Commit: 7ab6837fde3adb588273ef6bb8f4f7a215fe9c03 Parents: f722b73 Author: Maximilian Michels Authored: Fri Jul 1 18:54:44 2016 +0200 Committer: Maximilian Michels Committed: Fri Jul 1 20:12:46 2016 +0200 ---------------------------------------------------------------------- ...CliFrontendYarnAddressConfigurationTest.java | 3 +- .../yarn/AbstractYarnClusterDescriptor.java | 38 ----------- .../flink/yarn/cli/FlinkYarnSessionCli.java | 70 +++++++++++--------- 3 files changed, 38 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7ab6837f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java index a99c835..c3328a2 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java @@ -109,8 +109,7 @@ public class CliFrontendYarnAddressConfigurationTest { private static final ApplicationId TEST_YARN_APPLICATION_ID = ApplicationId.newInstance(System.currentTimeMillis(), 42); - private static final String validPropertiesFile = - "jobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":" + TEST_YARN_JOB_MANAGER_PORT; + private static final String validPropertiesFile = "applicationID=" + TEST_YARN_APPLICATION_ID; private static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33"; http://git-wip-us.apache.org/repos/asf/flink/blob/7ab6837f/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index 641182e..5d47b13 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -22,7 +22,6 @@ import org.apache.flink.client.CliFrontend; import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.RecoveryMode; @@ -302,43 +301,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor return yarnClient; } - /** - * Retrieves the Yarn application and cluster from the config - * @param config The config with entries to retrieve the cluster - * @return YarnClusterClient - * @deprecated This should be removed in the future - */ - public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config) - throws UnsupportedOperationException { - String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null); - int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1); - - if (jobManagerHost != null && jobManagerPort != -1) { - - YarnClient yarnClient = getYarnClient(); - final List applicationReports; - try { - applicationReports = yarnClient.getApplications(); - } catch (Exception e) { - throw new RuntimeException("Couldn't get Yarn application reports", e); - } - for (ApplicationReport report : applicationReports) { - if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) { - LOG.info("Found application '{}' " + - "with JobManager host name '{}' and port '{}' from Yarn properties file.", - report.getApplicationId(), jobManagerHost, jobManagerPort); - return retrieve(report.getApplicationId().toString()); - } - } - - } - - LOG.warn("Couldn't retrieve Yarn cluster from Flink configuration using JobManager address '{}:{}'", - jobManagerHost, jobManagerPort); - - throw new IllegalConfigurationException("Could not resume Yarn cluster from config."); - } - @Override public YarnClusterClient retrieve(String applicationID) { http://git-wip-us.apache.org/repos/asf/flink/blob/7ab6837f/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java index 126f0f1..989bee4 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java @@ -24,18 +24,18 @@ import org.apache.commons.cli.Option; import org.apache.commons.cli.Options; import org.apache.commons.cli.PosixParser; import org.apache.commons.lang3.StringUtils; -import org.apache.flink.client.CliFrontend; -import org.apache.flink.client.ClientUtils; import org.apache.flink.client.cli.CliFrontendParser; import org.apache.flink.client.cli.CustomCommandLine; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.yarn.AbstractYarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterDescriptor; import org.apache.flink.yarn.YarnClusterClient; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.util.ConverterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,7 +47,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; -import java.net.InetSocketAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -75,7 +74,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine // YARN-session related constants private static final String YARN_PROPERTIES_FILE = ".yarn-properties-"; - private static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager"; + static final String YARN_APPLICATION_ID_KEY = "applicationID"; private static final String YARN_PROPERTIES_PARALLELISM = "parallelism"; private static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString"; @@ -152,24 +151,24 @@ public class FlinkYarnSessionCli implements CustomCommandLine /** - * Resumes from a Flink Yarn properties file + * Tries to load a Flink Yarn properties file and returns the Yarn application id if successful * @param cmdLine The command-line parameters * @param flinkConfiguration The flink configuration - * @return True if the properties were loaded, false otherwise + * @return Yarn application id or null if none could be retrieved */ - private boolean resumeFromYarnProperties(CommandLine cmdLine, Configuration flinkConfiguration) { + private String loadYarnPropertiesFile(CommandLine cmdLine, Configuration flinkConfiguration) { String jobManagerOption = cmdLine.getOptionValue(ADDRESS_OPTION.getOpt(), null); if (jobManagerOption != null) { // don't resume from properties file if a JobManager has been specified - return false; + return null; } for (Option option : cmdLine.getOptions()) { if (ALL_OPTIONS.hasOption(option.getOpt())) { if (!option.getOpt().equals(DETACHED.getOpt())) { // don't resume from properties file if yarn options have been specified - return false; + return null; } } } @@ -177,7 +176,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine // load the YARN properties File propertiesFile = getYarnPropertiesLocation(flinkConfiguration); if (!propertiesFile.exists()) { - return false; + return null; } logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath()); @@ -192,6 +191,24 @@ public class FlinkYarnSessionCli implements CustomCommandLine throw new RuntimeException("Cannot read the YARN properties file", e); } + // get the Yarn application id from the properties file + String applicationID = yarnProperties.getProperty(YARN_APPLICATION_ID_KEY); + if (applicationID == null) { + throw new IllegalConfigurationException("Yarn properties file found but doesn't contain a " + + "Yarn applicaiton id. Please delete the file at " + propertiesFile.getAbsolutePath()); + } + + try { + // try converting id to ApplicationId + ConverterUtils.toApplicationId(applicationID); + } + catch (Exception e) { + throw new RuntimeException("YARN properties contains an invalid entry for " + + "application id: " + applicationID, e); + } + + logAndSysout("Using Yarn application id from YARN properties " + applicationID); + // configure the default parallelism from YARN String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM); if (propParallelism != null) { // maybe the property is not set @@ -207,22 +224,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine } } - // get the JobManager address from the YARN properties - String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY); - InetSocketAddress jobManagerAddress; - if (address != null) { - try { - jobManagerAddress = ClientUtils.parseHostPortAddress(address); - // store address in config from where it is retrieved by the retrieval service - CliFrontend.setJobManagerAddressInConfig(flinkConfiguration, jobManagerAddress); - } - catch (Exception e) { - throw new RuntimeException("YARN properties contain an invalid entry for JobManager address.", e); - } - - logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress); - } - // handle the YARN client's dynamic properties String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING); Map dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded); @@ -230,7 +231,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine flinkConfiguration.setString(dynamicProperty.getKey(), dynamicProperty.getValue()); } - return true; + return applicationID; } public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) { @@ -449,7 +450,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null); boolean yarnJobManager = ID.equals(jobManagerOption); boolean yarnAppId = commandLine.hasOption(APPLICATION_ID.getOpt()); - return yarnJobManager || yarnAppId || resumeFromYarnProperties(commandLine, configuration); + return yarnJobManager || yarnAppId || loadYarnPropertiesFile(commandLine, configuration) != null; } @Override @@ -481,10 +482,13 @@ public class FlinkYarnSessionCli implements CustomCommandLine yarnDescriptor.setFlinkConfiguration(config); return yarnDescriptor.retrieve(applicationID); // then try to load from yarn properties - } else if (resumeFromYarnProperties(cmdLine, config)) { - AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(); - yarnDescriptor.setFlinkConfiguration(config); - return yarnDescriptor.retrieveFromConfig(config); + } else { + String applicationId = loadYarnPropertiesFile(cmdLine, config); + if (applicationId != null) { + AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor(); + yarnDescriptor.setFlinkConfiguration(config); + return yarnDescriptor.retrieve(applicationId); + } } throw new UnsupportedOperationException("Could not resume a Yarn cluster."); @@ -581,7 +585,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine File yarnPropertiesFile = getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration()); Properties yarnProps = new Properties(); - yarnProps.setProperty(YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress); + yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnCluster.getApplicationId().toString()); if (yarnDescriptor.getTaskManagerSlots() != -1) { String parallelism = Integer.toString(yarnDescriptor.getTaskManagerSlots() * yarnDescriptor.getTaskManagerCount());