flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From m..@apache.org
Subject [3/3] flink git commit: [FLINK-4144] Yarn properties file: replace hostname/port with Yarn application id
Date Fri, 01 Jul 2016 18:11:42 GMT
[FLINK-4144] Yarn properties file: replace hostname/port with Yarn application id

This closes #2191


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7ab6837f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7ab6837f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7ab6837f

Branch: refs/heads/master
Commit: 7ab6837fde3adb588273ef6bb8f4f7a215fe9c03
Parents: f722b73
Author: Maximilian Michels <mxm@apache.org>
Authored: Fri Jul 1 18:54:44 2016 +0200
Committer: Maximilian Michels <mxm@apache.org>
Committed: Fri Jul 1 20:12:46 2016 +0200

----------------------------------------------------------------------
 ...CliFrontendYarnAddressConfigurationTest.java |  3 +-
 .../yarn/AbstractYarnClusterDescriptor.java     | 38 -----------
 .../flink/yarn/cli/FlinkYarnSessionCli.java     | 70 +++++++++++---------
 3 files changed, 38 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7ab6837f/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index a99c835..c3328a2 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -109,8 +109,7 @@ public class CliFrontendYarnAddressConfigurationTest {
 	private static final ApplicationId TEST_YARN_APPLICATION_ID =
 		ApplicationId.newInstance(System.currentTimeMillis(), 42);
 
-	private static final String validPropertiesFile =
-		"jobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":" + TEST_YARN_JOB_MANAGER_PORT;
+	private static final String validPropertiesFile = "applicationID=" + TEST_YARN_APPLICATION_ID;
 
 
 	private static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33";

http://git-wip-us.apache.org/repos/asf/flink/blob/7ab6837f/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 641182e..5d47b13 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -22,7 +22,6 @@ import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.RecoveryMode;
 
@@ -302,43 +301,6 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		return yarnClient;
 	}
 
-	/**
-	 * Retrieves the Yarn application and cluster from the config
-	 * @param config The config with entries to retrieve the cluster
-	 * @return YarnClusterClient
-	 * @deprecated This should be removed in the future
-	 */
-	public YarnClusterClient retrieveFromConfig(org.apache.flink.configuration.Configuration config)
-			throws UnsupportedOperationException {
-		String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);
-		int jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
-
-		if (jobManagerHost != null && jobManagerPort != -1) {
-
-			YarnClient yarnClient = getYarnClient();
-			final List<ApplicationReport> applicationReports;
-			try {
-				applicationReports = yarnClient.getApplications();
-			} catch (Exception e) {
-				throw new RuntimeException("Couldn't get Yarn application reports", e);
-			}
-			for (ApplicationReport report : applicationReports) {
-				if (report.getHost().equals(jobManagerHost) && report.getRpcPort() == jobManagerPort) {
-					LOG.info("Found application '{}' " +
-						"with JobManager host name '{}' and port '{}' from Yarn properties file.",
-						report.getApplicationId(), jobManagerHost, jobManagerPort);
-					return retrieve(report.getApplicationId().toString());
-				}
-			}
-
-		}
-
-		LOG.warn("Couldn't retrieve Yarn cluster from Flink configuration using JobManager address '{}:{}'",
-			jobManagerHost, jobManagerPort);
-
-		throw new IllegalConfigurationException("Could not resume Yarn cluster from config.");
-	}
-
 	@Override
 	public YarnClusterClient retrieve(String applicationID) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7ab6837f/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 126f0f1..989bee4 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -24,18 +24,18 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.client.CliFrontend;
-import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.yarn.AbstractYarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.YarnClusterClient;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +47,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
-import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -75,7 +74,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 	// YARN-session related constants
 	private static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
-	private static final String YARN_PROPERTIES_JOBMANAGER_KEY = "jobManager";
+	static final String YARN_APPLICATION_ID_KEY = "applicationID";
 	private static final String YARN_PROPERTIES_PARALLELISM = "parallelism";
 	private static final String YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING = "dynamicPropertiesString";
 
@@ -152,24 +151,24 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 
 
 	/**
-	 * Resumes from a Flink Yarn properties file
+	 * Tries to load a Flink Yarn properties file and returns the Yarn application id if successful
 	 * @param cmdLine The command-line parameters
 	 * @param flinkConfiguration The flink configuration
-	 * @return True if the properties were loaded, false otherwise
+	 * @return Yarn application id or null if none could be retrieved
 	 */
-	private boolean resumeFromYarnProperties(CommandLine cmdLine, Configuration flinkConfiguration) {
+	private String loadYarnPropertiesFile(CommandLine cmdLine, Configuration flinkConfiguration) {
 
 		String jobManagerOption = cmdLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
 		if (jobManagerOption != null) {
 			// don't resume from properties file if a JobManager has been specified
-			return false;
+			return null;
 		}
 
 		for (Option option : cmdLine.getOptions()) {
 			if (ALL_OPTIONS.hasOption(option.getOpt())) {
 				if (!option.getOpt().equals(DETACHED.getOpt())) {
 					// don't resume from properties file if yarn options have been specified
-					return false;
+					return null;
 				}
 			}
 		}
@@ -177,7 +176,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		// load the YARN properties
 		File propertiesFile = getYarnPropertiesLocation(flinkConfiguration);
 		if (!propertiesFile.exists()) {
-			return false;
+			return null;
 		}
 
 		logAndSysout("Found YARN properties file " + propertiesFile.getAbsolutePath());
@@ -192,6 +191,24 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			throw new RuntimeException("Cannot read the YARN properties file", e);
 		}
 
+		// get the Yarn application id from the properties file
+		String applicationID = yarnProperties.getProperty(YARN_APPLICATION_ID_KEY);
+		if (applicationID == null) {
+			throw new IllegalConfigurationException("Yarn properties file found but doesn't contain a " +
+				"Yarn applicaiton id. Please delete the file at " + propertiesFile.getAbsolutePath());
+		}
+
+		try {
+			// try converting id to ApplicationId
+			ConverterUtils.toApplicationId(applicationID);
+		}
+		catch (Exception e) {
+			throw new RuntimeException("YARN properties contains an invalid entry for " +
+				"application id: " + applicationID, e);
+		}
+
+		logAndSysout("Using Yarn application id from YARN properties " + applicationID);
+
 		// configure the default parallelism from YARN
 		String propParallelism = yarnProperties.getProperty(YARN_PROPERTIES_PARALLELISM);
 		if (propParallelism != null) { // maybe the property is not set
@@ -207,22 +224,6 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			}
 		}
 
-		// get the JobManager address from the YARN properties
-		String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
-		InetSocketAddress jobManagerAddress;
-		if (address != null) {
-			try {
-				jobManagerAddress = ClientUtils.parseHostPortAddress(address);
-				// store address in config from where it is retrieved by the retrieval service
-				CliFrontend.setJobManagerAddressInConfig(flinkConfiguration, jobManagerAddress);
-			}
-			catch (Exception e) {
-				throw new RuntimeException("YARN properties contain an invalid entry for JobManager address.", e);
-			}
-
-			logAndSysout("Using JobManager address from YARN properties " + jobManagerAddress);
-		}
-
 		// handle the YARN client's dynamic properties
 		String dynamicPropertiesEncoded = yarnProperties.getProperty(YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING);
 		Map<String, String> dynamicProperties = getDynamicProperties(dynamicPropertiesEncoded);
@@ -230,7 +231,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			flinkConfiguration.setString(dynamicProperty.getKey(), dynamicProperty.getValue());
 		}
 
-		return true;
+		return applicationID;
 	}
 
 	public AbstractYarnClusterDescriptor createDescriptor(String defaultApplicationName, CommandLine cmd) {
@@ -449,7 +450,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 		String jobManagerOption = commandLine.getOptionValue(ADDRESS_OPTION.getOpt(), null);
 		boolean yarnJobManager = ID.equals(jobManagerOption);
 		boolean yarnAppId = commandLine.hasOption(APPLICATION_ID.getOpt());
-		return yarnJobManager || yarnAppId || resumeFromYarnProperties(commandLine, configuration);
+		return yarnJobManager || yarnAppId || loadYarnPropertiesFile(commandLine, configuration) != null;
 	}
 
 	@Override
@@ -481,10 +482,13 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			yarnDescriptor.setFlinkConfiguration(config);
 			return yarnDescriptor.retrieve(applicationID);
 		// then try to load from yarn properties
-		} else if (resumeFromYarnProperties(cmdLine, config)) {
-			AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
-			yarnDescriptor.setFlinkConfiguration(config);
-			return yarnDescriptor.retrieveFromConfig(config);
+		} else {
+			String applicationId = loadYarnPropertiesFile(cmdLine, config);
+			if (applicationId != null) {
+				AbstractYarnClusterDescriptor yarnDescriptor = getClusterDescriptor();
+				yarnDescriptor.setFlinkConfiguration(config);
+				return yarnDescriptor.retrieve(applicationId);
+			}
 		}
 
 		throw new UnsupportedOperationException("Could not resume a Yarn cluster.");
@@ -581,7 +585,7 @@ public class FlinkYarnSessionCli implements CustomCommandLine<YarnClusterClient>
 			File yarnPropertiesFile = getYarnPropertiesLocation(yarnCluster.getFlinkConfiguration());
 
 			Properties yarnProps = new Properties();
-			yarnProps.setProperty(YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);
+			yarnProps.setProperty(YARN_APPLICATION_ID_KEY, yarnCluster.getApplicationId().toString());
 			if (yarnDescriptor.getTaskManagerSlots() != -1) {
 				String parallelism =
 						Integer.toString(yarnDescriptor.getTaskManagerSlots() * yarnDescriptor.getTaskManagerCount());


Mime
View raw message