flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-3712] Make all dynamic properties available to the CLI frontend
Date Mon, 11 Apr 2016 09:41:51 GMT
Repository: flink
Updated Branches:
  refs/heads/flink3712 [created] b368cb2d5


[FLINK-3712] Make all dynamic properties available to the CLI frontend

This closes #1863


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b368cb2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b368cb2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b368cb2d

Branch: refs/heads/flink3712
Commit: b368cb2d5c3bc67f1fa82f9c2b77b46ce1684962
Parents: a234719
Author: Robert Metzger <rmetzger@apache.org>
Authored: Thu Apr 7 16:44:48 2016 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Mon Apr 11 11:41:12 2016 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/client/CliFrontend.java |  6 +++++-
 .../apache/flink/client/FlinkYarnSessionCli.java  |  2 +-
 .../StandaloneLeaderRetrievalService.java         |  2 ++
 .../runtime/yarn/AbstractFlinkYarnClient.java     |  5 ++++-
 .../org/apache/flink/api/scala/FlinkShell.scala   |  2 +-
 .../flink/yarn/YARNHighAvailabilityITCase.java    |  2 +-
 .../apache/flink/yarn/YARNSessionFIFOITCase.java  |  2 +-
 .../apache/flink/yarn/FlinkYarnClientBase.java    | 18 ++++++++++++++----
 8 files changed, 29 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index b5dfbe5..6d972bc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -1044,6 +1044,9 @@ public class CliFrontend {
 
 			jobManagerAddress = yarnCluster.getJobManagerAddress();
 			writeJobManagerAddressToConfig(jobManagerAddress);
+			
+			// overwrite the yarn client config (because the client parses the dynamic properties)
+			this.config.addAll(flinkYarnClient.getFlinkConfiguration());
 
 			logAndSysout("YARN cluster started");
 			logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL());
@@ -1180,8 +1183,9 @@ public class CliFrontend {
 					catch (Exception e) {
 						return handleError(e);
 					}
+				} else {
+					return run(params);
 				}
-				return run(params);
 			case ACTION_LIST:
 				return list(params);
 			case ACTION_INFO:

http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index 94de5c4..91f8df2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -142,7 +142,7 @@ public class FlinkYarnSessionCli {
 		String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
 		GlobalConfiguration.loadConfiguration(confDirPath);
 		Configuration flinkConfiguration = GlobalConfiguration.getConfiguration();
-		flinkYarnClient.setFlinkConfigurationObject(flinkConfiguration);
+		flinkYarnClient.setFlinkConfiguration(flinkConfiguration);
 		flinkYarnClient.setConfigurationDirectory(confDirPath);
 		File confFile = new File(confDirPath + File.separator + CONFIG_FILE_NAME);
 		if (!confFile.exists()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
index dbab41c..1be879c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderretrieval/StandaloneLeaderRetrievalService.java
@@ -44,6 +44,7 @@ public class StandaloneLeaderRetrievalService implements LeaderRetrievalService
 		this.jobManagerAddress = jobManagerAddress;
 	}
 
+	@Override
 	public void start(LeaderRetrievalListener listener) {
 		Preconditions.checkNotNull(listener, "Listener must not be null.");
 		Preconditions.checkState(leaderListener == null, "StandaloneLeaderRetrievalService can
" +
@@ -55,5 +56,6 @@ public class StandaloneLeaderRetrievalService implements LeaderRetrievalService
 		leaderListener.notifyLeaderAddress(jobManagerAddress, null);
 	}
 
+	@Override
 	public void stop() {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
index 83a976d..c1498c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/yarn/AbstractFlinkYarnClient.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.runtime.yarn;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.Path;
 import java.io.File;
 import java.util.List;
@@ -43,7 +44,9 @@ public abstract class AbstractFlinkYarnClient {
 	/**
 	 * Flink configuration
 	 */
-	public abstract void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration
conf);
+	public abstract void setFlinkConfiguration(org.apache.flink.configuration.Configuration
conf);
+
+	public abstract Configuration getFlinkConfiguration();
 
 	/**
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
----------------------------------------------------------------------
diff --git a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
index 6937e1b..2c2fbb3 100644
--- a/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
+++ b/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala
@@ -223,7 +223,7 @@ object FlinkShell {
     val confFile = new File(confDirPath + File.separator + "flink-conf.yaml")
     val confPath = new Path(confFile.getAbsolutePath)
     GlobalConfiguration.loadConfiguration(confDirPath)
-    yarnClient.setFlinkConfigurationObject(flinkConfiguration)
+    yarnClient.setFlinkConfiguration(flinkConfiguration)
     yarnClient.setConfigurationDirectory(confDirPath)
     yarnClient.setConfigurationFilePath(confPath)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index f68b141..a93abf0 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -111,7 +111,7 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 
 		String fsStateHandlePath = tmp.getRoot().getPath();
 
-		flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
+		flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
 		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum="
+
 			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts
+
 			"@@" + ConfigConstants.STATE_BACKEND + "=FILESYSTEM" +

http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index d713b73..cb402a3 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -227,7 +227,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
 		String confDirPath = System.getenv("FLINK_CONF_DIR");
 		flinkYarnClient.setConfigurationDirectory(confDirPath);
-		flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
+		flinkYarnClient.setFlinkConfiguration(GlobalConfiguration.getConfiguration());
 		flinkYarnClient.setConfigurationFilePath(new Path(confDirPath + File.separator + "flink-conf.yaml"));
 
 		// deploy

http://git-wip-us.apache.org/repos/asf/flink/blob/b368cb2d/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
index ef02be3..6f81d09 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClientBase.java
@@ -124,7 +124,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient
{
 
 	private String dynamicPropertiesEncoded;
 
-	private List<File> shipFiles = new ArrayList<File>();
+	private List<File> shipFiles = new ArrayList<>();
 	private org.apache.flink.configuration.Configuration flinkConfiguration;
 
 	private boolean detached;
@@ -174,11 +174,16 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient
{
 	}
 
 	@Override
-	public void setFlinkConfigurationObject(org.apache.flink.configuration.Configuration conf)
{
+	public void setFlinkConfiguration(org.apache.flink.configuration.Configuration conf) {
 		this.flinkConfiguration = conf;
 	}
 
 	@Override
+	public org.apache.flink.configuration.Configuration getFlinkConfiguration() {
+		return flinkConfiguration;
+	}
+
+	@Override
 	public void setTaskManagerSlots(int slots) {
 		if(slots <= 0) {
 			throw new IllegalArgumentException("Number of TaskManager slots must be positive");
@@ -209,6 +214,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient
{
 		flinkConfigurationPath = confPath;
 	}
 
+	@Override
 	public void setConfigurationDirectory(String configurationDirectory) {
 		this.configurationDirectory = configurationDirectory;
 	}
@@ -247,6 +253,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient
{
 		}
 	}
 
+	@Override
 	public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {
 		this.dynamicPropertiesEncoded = dynamicPropertiesEncoded;
 	}
@@ -303,6 +310,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient
{
 		return detached;
 	}
 
+	@Override
 	public AbstractFlinkYarnCluster deploy() throws Exception {
 
 		UserGroupInformation.setConfiguration(conf);
@@ -542,7 +550,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient
{
 		LocalResource flinkConf = Records.newRecord(LocalResource.class);
 		Path remotePathJar = Utils.setupLocalResource(fs, appId.toString(), flinkJarPath, appMasterJar,
fs.getHomeDirectory());
 		Path remotePathConf = Utils.setupLocalResource(fs, appId.toString(), flinkConfigurationPath,
flinkConf, fs.getHomeDirectory());
-		Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(2);
+		Map<String, LocalResource> localResources = new HashMap<>(2);
 		localResources.put("flink.jar", appMasterJar);
 		localResources.put("flink-conf.yaml", flinkConf);
 
@@ -578,7 +586,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient
{
 		fs.close();
 
 		// Setup CLASSPATH for ApplicationMaster
-		Map<String, String> appMasterEnv = new HashMap<String, String>();
+		Map<String, String> appMasterEnv = new HashMap<>();
 		// set user specified app master environment variables
 		appMasterEnv.putAll(Utils.getEnvironmentVariables(ConfigConstants.YARN_APPLICATION_MASTER_ENV_PREFIX,
flinkConfiguration));
 		// set classpath from YARN configuration
@@ -728,6 +736,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient
{
 		return new ClusterResourceDescription(totalFreeMemory, containerLimit, nodeManagersFree);
 	}
 
+	@Override
 	public String getClusterDescription() throws Exception {
 
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -763,6 +772,7 @@ public abstract class FlinkYarnClientBase extends AbstractFlinkYarnClient
{
 		return baos.toString();
 	}
 
+	@Override
 	public String getSessionFilesDir() {
 		return sessionFilesDir.toString();
 	}


Mime
View raw message