flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hsapu...@apache.org
Subject flink git commit: Small changes to make code more consistent.
Date Fri, 20 Mar 2015 17:41:27 GMT
Repository: flink
Updated Branches:
  refs/heads/master 79000c85b -> 3838dd19c


Small changes to make code more consistent.

Change System.err to System.out calls for regular flow messages in FlinkYarnSessionCli.
Code style: add spaces between if/else keywords and the opening parentheses and curly braces.
Wrap very long lines in some classes.
Remove unnecessary return statement at the end of a method.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3838dd19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3838dd19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3838dd19

Branch: refs/heads/master
Commit: 3838dd19c5c59efe8d19ee177e0180edb2995a8b
Parents: 79000c8
Author: Henry Saputra <henry.saputra@gmail.com>
Authored: Fri Mar 20 10:37:10 2015 -0700
Committer: Henry Saputra <henry.saputra@gmail.com>
Committed: Fri Mar 20 10:37:10 2015 -0700

----------------------------------------------------------------------
 .../flink/client/FlinkYarnSessionCli.java       | 92 +++++++++++---------
 .../runtime/operators/RegularPactTask.java      |  3 +-
 .../flink/yarn/ApplicationMasterActor.scala     |  4 +-
 3 files changed, 53 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3838dd19/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
index 8e632f1..340b878 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java
@@ -100,11 +100,11 @@ public class FlinkYarnSessionCli {
 	public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
 
 		AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
-		if(flinkYarnClient == null) {
+		if (flinkYarnClient == null) {
 			return null;
 		}
 
-		if(!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option!
+		if (!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option!
 			LOG.error("Missing required argument " + CONTAINER.getOpt());
 			printUsage();
 			return null;
@@ -113,7 +113,7 @@ public class FlinkYarnSessionCli {
 
 		// Jar Path
 		Path localJarPath;
-		if(cmd.hasOption(FLINK_JAR.getOpt())) {
+		if (cmd.hasOption(FLINK_JAR.getOpt())) {
 			String userPath = cmd.getOptionValue(FLINK_JAR.getOpt());
 			if(!userPath.startsWith("file://")) {
 				userPath = "file://" + userPath;
@@ -139,7 +139,7 @@ public class FlinkYarnSessionCli {
 		flinkYarnClient.setFlinkConfigurationObject(flinkConfiguration);
 		flinkYarnClient.setConfigurationDirectory(confDirPath);
 		File confFile = new File(confDirPath + File.separator + CONFIG_FILE_NAME);
-		if(!confFile.exists()) {
+		if (!confFile.exists()) {
 			LOG.error("Unable to locate configuration file in "+confFile);
 			return null;
 		}
@@ -149,10 +149,10 @@ public class FlinkYarnSessionCli {
 
 		List<File> shipFiles = new ArrayList<File>();
 		// path to directory to ship
-		if(cmd.hasOption(SHIP_PATH.getOpt())) {
+		if (cmd.hasOption(SHIP_PATH.getOpt())) {
 			String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt());
 			File shipDir = new File(shipPath);
-			if(shipDir.isDirectory()) {
+			if (shipDir.isDirectory()) {
 				shipFiles = new ArrayList<File>(Arrays.asList(shipDir.listFiles(new FilenameFilter()
{
 					@Override
 					public boolean accept(File dir, String name) {
@@ -165,19 +165,19 @@ public class FlinkYarnSessionCli {
 		}
 
 		//check if there is a logback or log4j file
-		if(confDirPath.length() > 0) {
+		if (confDirPath.length() > 0) {
 			File logback = new File(confDirPath + File.pathSeparator + CONFIG_FILE_LOGBACK_NAME);
-			if(logback.exists()) {
+			if (logback.exists()) {
 				shipFiles.add(logback);
 				flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(logback.toURI()));
 			}
 			File log4j = new File(confDirPath + File.pathSeparator + CONFIG_FILE_LOG4J_NAME);
-			if(log4j.exists()) {
+			if (log4j.exists()) {
 				shipFiles.add(log4j);
-				if(flinkYarnClient.getFlinkLoggingConfigurationPath() != null) {
+				if (flinkYarnClient.getFlinkLoggingConfigurationPath() != null) {
 					// this means there is already a logback configuration file --> fail
-					LOG.warn("The configuration directory ('" + confDirPath + "') contains both LOG4J and
Logback configuration files." +
-							"Please delete or rename one of them.");
+					LOG.warn("The configuration directory ('" + confDirPath + "') contains both LOG4J and
" +
+							"Logback configuration files. Please delete or rename one of them.");
 				} // else
 				flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(log4j.toURI()));
 			}
@@ -186,36 +186,37 @@ public class FlinkYarnSessionCli {
 		flinkYarnClient.setShipFiles(shipFiles);
 
 		// queue
-		if(cmd.hasOption(QUEUE.getOpt())) {
+		if (cmd.hasOption(QUEUE.getOpt())) {
 			flinkYarnClient.setQueue(cmd.getOptionValue(QUEUE.getOpt()));
 		}
 
 		// JobManager Memory
-		if(cmd.hasOption(JM_MEMORY.getOpt())) {
+		if (cmd.hasOption(JM_MEMORY.getOpt())) {
 			int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt()));
 			flinkYarnClient.setJobManagerMemory(jmMemory);
 		}
 
 		// Task Managers memory
-		if(cmd.hasOption(TM_MEMORY.getOpt())) {
+		if (cmd.hasOption(TM_MEMORY.getOpt())) {
 			int tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt()));
 			flinkYarnClient.setTaskManagerMemory(tmMemory);
 		}
 
-		if(cmd.hasOption(SLOTS.getOpt())) {
+		if (cmd.hasOption(SLOTS.getOpt())) {
 			int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt()));
 			flinkYarnClient.setTaskManagerSlots(slots);
 		}
 
 		String[] dynamicProperties = null;
-		if(cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
+		if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) {
 			dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt());
 		}
-		String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
+		String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties,
+				CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR);
 
 		flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded);
 
-		if(cmd.hasOption(DETACHED.getOpt())) {
+		if (cmd.hasOption(DETACHED.getOpt())) {
 			detachedMode = true;
 			flinkYarnClient.setDetachedMode(detachedMode);
 		}
@@ -248,11 +249,14 @@ public class FlinkYarnSessionCli {
 	public static AbstractFlinkYarnClient getFlinkYarnClient() {
 		AbstractFlinkYarnClient yarnClient = null;
 		try {
-			Class<? extends AbstractFlinkYarnClient> yarnClientClass = Class.forName("org.apache.flink.yarn.FlinkYarnClient").asSubclass(AbstractFlinkYarnClient.class);
+			Class<? extends AbstractFlinkYarnClient> yarnClientClass =
+					Class.forName("org.apache.flink.yarn.FlinkYarnClient").asSubclass(AbstractFlinkYarnClient.class);
 			yarnClient = InstantiationUtil.instantiate(yarnClientClass, AbstractFlinkYarnClient.class);
 		}
 		catch (ClassNotFoundException e) {
-			System.err.println("Unable to locate the Flink YARN Client. Please ensure that you are
using a Flink build with Hadoop2/YARN support. Message: "+e.getMessage());
+			System.err.println("Unable to locate the Flink YARN Client. " +
+					"Please ensure that you are using a Flink build with Hadoop2/YARN support. Message:
" +
+					e.getMessage());
 			e.printStackTrace(System.err);
 			return null; // make it obvious
 		}
@@ -281,21 +285,21 @@ public class FlinkYarnSessionCli {
 				// ------------------ check if there are updates by the cluster -----------
 
 				FlinkYarnClusterStatus status = yarnCluster.getClusterStatus();
-				if(status != null && numTaskmanagers != status.getNumberOfTaskManagers()) {
-					System.err.println("Number of connected TaskManagers changed to "+status.getNumberOfTaskManagers()+".
"
-							+ "Slots available: "+status.getNumberOfSlots());
+				if (status != null && numTaskmanagers != status.getNumberOfTaskManagers()) {
+					System.err.println("Number of connected TaskManagers changed to " +
+							status.getNumberOfTaskManagers() + ". Slots available: " +  status.getNumberOfSlots());
 					numTaskmanagers = status.getNumberOfTaskManagers();
 				}
 
 				List<String> messages = yarnCluster.getNewMessages();
-				if(messages != null && messages.size() > 0) {
+				if (messages != null && messages.size() > 0) {
 					System.err.println("New messages from the YARN cluster: ");
 					for(String msg : messages) {
 						System.err.println(msg);
 					}
 				}
 
-				if(yarnCluster.hasFailed()) {
+				if (yarnCluster.hasFailed()) {
 					System.err.println("The YARN cluster has failed");
 					yarnCluster.shutdown();
 				}
@@ -310,22 +314,21 @@ public class FlinkYarnSessionCli {
 
 				if (in.ready()) {
 					String command = in.readLine();
-					if(command.equals("quit") || command.equals("stop")) {
+					if (command.equals("quit") || command.equals("stop")) {
 						break; // leave loop, cli will stop cluster.
-					} else if(command.equals("help"))  {
+					} else if (command.equals("help"))  {
 						System.err.println(HELP);
 					} else {
 						System.err.println("Unknown command '"+command+"'. Showing help: \n"+HELP);
 					}
 				}
-				if(yarnCluster.hasBeenStopped()) {
+				if (yarnCluster.hasBeenStopped()) {
 					LOG.info("Stopping interactive command line interface, YARN cluster has been stopped.");
 					break;
 				}
 			}
 		} catch(Exception e) {
 			LOG.warn("Exception while running the interactive command line interface", e);
-			return;
 		}
 	}
 
@@ -366,7 +369,7 @@ public class FlinkYarnSessionCli {
 		}
 
 		// Query cluster for metrics
-		if(cmd.hasOption(QUERY.getOpt())) {
+		if (cmd.hasOption(QUERY.getOpt())) {
 			AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
 			String description = null;
 			try {
@@ -381,7 +384,7 @@ public class FlinkYarnSessionCli {
 		} else {
 			AbstractFlinkYarnClient flinkYarnClient = createFlinkYarnClient(cmd);
 
-			if(flinkYarnClient == null) {
+			if (flinkYarnClient == null) {
 				System.err.println("Error while starting the YARN Client. Please check log output!");
 				return 1;
 			}
@@ -395,32 +398,36 @@ public class FlinkYarnSessionCli {
 				return 1;
 			}
 			//------------------ Cluster deployed, handle connection details
-			String jobManagerAddress = yarnCluster.getJobManagerAddress().getHostName() + ":" +yarnCluster.getJobManagerAddress().getPort();
-			System.err.println("Flink JobManager is now running on " + jobManagerAddress);
-			System.err.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
+			String jobManagerAddress = yarnCluster.getJobManagerAddress().getHostName() + ":" + yarnCluster.getJobManagerAddress().getPort();
+			System.out.println("Flink JobManager is now running on " + jobManagerAddress);
+			System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
 			// file that we write into the conf/ dir containing the jobManager address and the dop.
 			String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv();
 			File yarnPropertiesFile = new File(confDirPath + File.separator + CliFrontend.YARN_PROPERTIES_FILE);
 
 			Properties yarnProps = new Properties();
 			yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress);
-			if(flinkYarnClient.getTaskManagerSlots() != -1) {
-				yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DOP, Integer.toString(flinkYarnClient.getTaskManagerSlots()
* flinkYarnClient.getTaskManagerCount()) );
+			if (flinkYarnClient.getTaskManagerSlots() != -1) {
+				String degreeOfParallelism =
+						Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount());
+				yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DOP, degreeOfParallelism);
 			}
 			// add dynamic properties
-			if(flinkYarnClient.getDynamicPropertiesEncoded() != null) {
-				yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, flinkYarnClient.getDynamicPropertiesEncoded());
+			if (flinkYarnClient.getDynamicPropertiesEncoded() != null) {
+				yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING,
+						flinkYarnClient.getDynamicPropertiesEncoded());
 			}
 			writeYarnProperties(yarnProps, yarnPropertiesFile);
 
 			//------------------ Cluster running, let user control it ------------
 
-			if(detachedMode) {
+			if (detachedMode) {
 				// print info and quit:
 				LOG.info("The Flink YARN client has been started in detached mode. In order to stop"
+
 						"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
 						"yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
-						"Please also note that the temporary files of the YARN session in {} will not be removed.",
flinkYarnClient.getSessionFilesDir());
+						"Please also note that the temporary files of the YARN session in {} will not be removed.",
+						flinkYarnClient.getSessionFilesDir());
 			} else {
 				runInteractiveCli(yarnCluster);
 
@@ -443,10 +450,9 @@ public class FlinkYarnSessionCli {
 	 * Utility method for tests.
 	 */
 	public void stop() {
-		if(yarnCluster != null) {
+		if (yarnCluster != null) {
 			LOG.info("Command line interface is shutting down the yarnCluster");
 			yarnCluster.shutdown();
 		}
 	}
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/3838dd19/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
index 465acb8..081d498 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java
@@ -513,7 +513,8 @@ public class RegularPactTask<S extends Function, OT> extends AbstractInvokable
i
 			if (this.stub != null) {
 				// collect the counters from the stub
 				if (FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != null)
{
-					Map<String, Accumulator<?, ?>> accumulators = FunctionUtils.getFunctionRuntimeContext(this.stub,
this.runtimeUdfContext).getAllAccumulators();
+					Map<String, Accumulator<?, ?>> accumulators =
+							FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators();
 					RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks);
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/3838dd19/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
index 6a6a6e4..f88685c 100644
--- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
+++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala
@@ -311,10 +311,10 @@ trait ApplicationMasterActor extends ActorLogMessages {
   }
 
   private def runningContainerIds(): mutable.MutableList[ContainerId] = {
-    return runningContainersList map { runningCont => runningCont.getId}
+    runningContainersList map { runningCont => runningCont.getId}
   }
   private def allocatedContainerIds(): mutable.MutableList[ContainerId] = {
-    return allocatedContainersList map { runningCont => runningCont.getId}
+    allocatedContainersList map { runningCont => runningCont.getId}
   }
 
   private def startYarnSession(conf: Configuration,


Mime
View raw message