Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 28B82175D5 for ; Fri, 20 Mar 2015 17:41:28 +0000 (UTC) Received: (qmail 58898 invoked by uid 500); 20 Mar 2015 17:41:28 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 58865 invoked by uid 500); 20 Mar 2015 17:41:27 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 58856 invoked by uid 99); 20 Mar 2015 17:41:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 20 Mar 2015 17:41:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id AA8E3E1102; Fri, 20 Mar 2015 17:41:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: hsaputra@apache.org To: commits@flink.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: Small changes to make code more consistent. Date: Fri, 20 Mar 2015 17:41:27 +0000 (UTC) Repository: flink Updated Branches: refs/heads/master 79000c85b -> 3838dd19c Small changes to make code more consistent. Change System.err to System.out calls for regular flow messages in FlinkYarnSessionCli. Code style add spaces between if-else and open parentheses and curly braces. Wrap very long lines in some classes. Remove unnecessary return statement at the end of a method. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3838dd19 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3838dd19 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3838dd19 Branch: refs/heads/master Commit: 3838dd19c5c59efe8d19ee177e0180edb2995a8b Parents: 79000c8 Author: Henry Saputra Authored: Fri Mar 20 10:37:10 2015 -0700 Committer: Henry Saputra Committed: Fri Mar 20 10:37:10 2015 -0700 ---------------------------------------------------------------------- .../flink/client/FlinkYarnSessionCli.java | 92 +++++++++++--------- .../runtime/operators/RegularPactTask.java | 3 +- .../flink/yarn/ApplicationMasterActor.scala | 4 +- 3 files changed, 53 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3838dd19/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java index 8e632f1..340b878 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java +++ b/flink-clients/src/main/java/org/apache/flink/client/FlinkYarnSessionCli.java @@ -100,11 +100,11 @@ public class FlinkYarnSessionCli { public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) { AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient(); - if(flinkYarnClient == null) { + if (flinkYarnClient == null) { return null; } - if(!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option! + if (!cmd.hasOption(CONTAINER.getOpt())) { // number of containers is required option! LOG.error("Missing required argument " + CONTAINER.getOpt()); printUsage(); return null; @@ -113,7 +113,7 @@ public class FlinkYarnSessionCli { // Jar Path Path localJarPath; - if(cmd.hasOption(FLINK_JAR.getOpt())) { + if (cmd.hasOption(FLINK_JAR.getOpt())) { String userPath = cmd.getOptionValue(FLINK_JAR.getOpt()); if(!userPath.startsWith("file://")) { userPath = "file://" + userPath; @@ -139,7 +139,7 @@ public class FlinkYarnSessionCli { flinkYarnClient.setFlinkConfigurationObject(flinkConfiguration); flinkYarnClient.setConfigurationDirectory(confDirPath); File confFile = new File(confDirPath + File.separator + CONFIG_FILE_NAME); - if(!confFile.exists()) { + if (!confFile.exists()) { LOG.error("Unable to locate configuration file in "+confFile); return null; } @@ -149,10 +149,10 @@ public class FlinkYarnSessionCli { List shipFiles = new ArrayList(); // path to directory to ship - if(cmd.hasOption(SHIP_PATH.getOpt())) { + if (cmd.hasOption(SHIP_PATH.getOpt())) { String shipPath = cmd.getOptionValue(SHIP_PATH.getOpt()); File shipDir = new File(shipPath); - if(shipDir.isDirectory()) { + if (shipDir.isDirectory()) { shipFiles = new ArrayList(Arrays.asList(shipDir.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { @@ -165,19 +165,19 @@ public class FlinkYarnSessionCli { } //check if there is a logback or log4j file - if(confDirPath.length() > 0) { + if (confDirPath.length() > 0) { File logback = new File(confDirPath + File.pathSeparator + CONFIG_FILE_LOGBACK_NAME); - if(logback.exists()) { + if (logback.exists()) { shipFiles.add(logback); flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(logback.toURI())); } File log4j = new File(confDirPath + File.pathSeparator + CONFIG_FILE_LOG4J_NAME); - if(log4j.exists()) { + if (log4j.exists()) { shipFiles.add(log4j); - if(flinkYarnClient.getFlinkLoggingConfigurationPath() != null) { + if (flinkYarnClient.getFlinkLoggingConfigurationPath() != null) { // this means there is already a logback configuration file --> fail - LOG.warn("The configuration directory ('" + confDirPath + "') contains both LOG4J and Logback configuration files." + - "Please delete or rename one of them."); + LOG.warn("The configuration directory ('" + confDirPath + "') contains both LOG4J and " + + "Logback configuration files. Please delete or rename one of them."); } // else flinkYarnClient.setFlinkLoggingConfigurationPath(new Path(log4j.toURI())); } @@ -186,36 +186,37 @@ public class FlinkYarnSessionCli { flinkYarnClient.setShipFiles(shipFiles); // queue - if(cmd.hasOption(QUEUE.getOpt())) { + if (cmd.hasOption(QUEUE.getOpt())) { flinkYarnClient.setQueue(cmd.getOptionValue(QUEUE.getOpt())); } // JobManager Memory - if(cmd.hasOption(JM_MEMORY.getOpt())) { + if (cmd.hasOption(JM_MEMORY.getOpt())) { int jmMemory = Integer.valueOf(cmd.getOptionValue(JM_MEMORY.getOpt())); flinkYarnClient.setJobManagerMemory(jmMemory); } // Task Managers memory - if(cmd.hasOption(TM_MEMORY.getOpt())) { + if (cmd.hasOption(TM_MEMORY.getOpt())) { int tmMemory = Integer.valueOf(cmd.getOptionValue(TM_MEMORY.getOpt())); flinkYarnClient.setTaskManagerMemory(tmMemory); } - if(cmd.hasOption(SLOTS.getOpt())) { + if (cmd.hasOption(SLOTS.getOpt())) { int slots = Integer.valueOf(cmd.getOptionValue(SLOTS.getOpt())); flinkYarnClient.setTaskManagerSlots(slots); } String[] dynamicProperties = null; - if(cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) { + if (cmd.hasOption(DYNAMIC_PROPERTIES.getOpt())) { dynamicProperties = cmd.getOptionValues(DYNAMIC_PROPERTIES.getOpt()); } - String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR); + String dynamicPropertiesEncoded = StringUtils.join(dynamicProperties, + CliFrontend.YARN_DYNAMIC_PROPERTIES_SEPARATOR); flinkYarnClient.setDynamicPropertiesEncoded(dynamicPropertiesEncoded); - if(cmd.hasOption(DETACHED.getOpt())) { + if (cmd.hasOption(DETACHED.getOpt())) { detachedMode = true; flinkYarnClient.setDetachedMode(detachedMode); } @@ -248,11 +249,14 @@ public class FlinkYarnSessionCli { public static AbstractFlinkYarnClient getFlinkYarnClient() { AbstractFlinkYarnClient yarnClient = null; try { - Class yarnClientClass = Class.forName("org.apache.flink.yarn.FlinkYarnClient").asSubclass(AbstractFlinkYarnClient.class); + Class yarnClientClass = + Class.forName("org.apache.flink.yarn.FlinkYarnClient").asSubclass(AbstractFlinkYarnClient.class); yarnClient = InstantiationUtil.instantiate(yarnClientClass, AbstractFlinkYarnClient.class); } catch (ClassNotFoundException e) { - System.err.println("Unable to locate the Flink YARN Client. Please ensure that you are using a Flink build with Hadoop2/YARN support. Message: "+e.getMessage()); + System.err.println("Unable to locate the Flink YARN Client. " + + "Please ensure that you are using a Flink build with Hadoop2/YARN support. Message: " + + e.getMessage()); e.printStackTrace(System.err); return null; // make it obvious } @@ -281,21 +285,21 @@ public class FlinkYarnSessionCli { // ------------------ check if there are updates by the cluster ----------- FlinkYarnClusterStatus status = yarnCluster.getClusterStatus(); - if(status != null && numTaskmanagers != status.getNumberOfTaskManagers()) { - System.err.println("Number of connected TaskManagers changed to "+status.getNumberOfTaskManagers()+". " - + "Slots available: "+status.getNumberOfSlots()); + if (status != null && numTaskmanagers != status.getNumberOfTaskManagers()) { + System.err.println("Number of connected TaskManagers changed to " + + status.getNumberOfTaskManagers() + ". Slots available: " + status.getNumberOfSlots()); numTaskmanagers = status.getNumberOfTaskManagers(); } List messages = yarnCluster.getNewMessages(); - if(messages != null && messages.size() > 0) { + if (messages != null && messages.size() > 0) { System.err.println("New messages from the YARN cluster: "); for(String msg : messages) { System.err.println(msg); } } - if(yarnCluster.hasFailed()) { + if (yarnCluster.hasFailed()) { System.err.println("The YARN cluster has failed"); yarnCluster.shutdown(); } @@ -310,22 +314,21 @@ public class FlinkYarnSessionCli { if (in.ready()) { String command = in.readLine(); - if(command.equals("quit") || command.equals("stop")) { + if (command.equals("quit") || command.equals("stop")) { break; // leave loop, cli will stop cluster. - } else if(command.equals("help")) { + } else if (command.equals("help")) { System.err.println(HELP); } else { System.err.println("Unknown command '"+command+"'. Showing help: \n"+HELP); } } - if(yarnCluster.hasBeenStopped()) { + if (yarnCluster.hasBeenStopped()) { LOG.info("Stopping interactive command line interface, YARN cluster has been stopped."); break; } } } catch(Exception e) { LOG.warn("Exception while running the interactive command line interface", e); - return; } } @@ -366,7 +369,7 @@ public class FlinkYarnSessionCli { } // Query cluster for metrics - if(cmd.hasOption(QUERY.getOpt())) { + if (cmd.hasOption(QUERY.getOpt())) { AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient(); String description = null; try { @@ -381,7 +384,7 @@ public class FlinkYarnSessionCli { } else { AbstractFlinkYarnClient flinkYarnClient = createFlinkYarnClient(cmd); - if(flinkYarnClient == null) { + if (flinkYarnClient == null) { System.err.println("Error while starting the YARN Client. Please check log output!"); return 1; } @@ -395,32 +398,36 @@ public class FlinkYarnSessionCli { return 1; } //------------------ Cluster deployed, handle connection details - String jobManagerAddress = yarnCluster.getJobManagerAddress().getHostName() + ":" +yarnCluster.getJobManagerAddress().getPort(); - System.err.println("Flink JobManager is now running on " + jobManagerAddress); - System.err.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL()); + String jobManagerAddress = yarnCluster.getJobManagerAddress().getHostName() + ":" + yarnCluster.getJobManagerAddress().getPort(); + System.out.println("Flink JobManager is now running on " + jobManagerAddress); + System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL()); // file that we write into the conf/ dir containing the jobManager address and the dop. String confDirPath = CliFrontend.getConfigurationDirectoryFromEnv(); File yarnPropertiesFile = new File(confDirPath + File.separator + CliFrontend.YARN_PROPERTIES_FILE); Properties yarnProps = new Properties(); yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_JOBMANAGER_KEY, jobManagerAddress); - if(flinkYarnClient.getTaskManagerSlots() != -1) { - yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DOP, Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount()) ); + if (flinkYarnClient.getTaskManagerSlots() != -1) { + String degreeOfParallelism = + Integer.toString(flinkYarnClient.getTaskManagerSlots() * flinkYarnClient.getTaskManagerCount()); + yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DOP, degreeOfParallelism); } // add dynamic properties - if(flinkYarnClient.getDynamicPropertiesEncoded() != null) { - yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, flinkYarnClient.getDynamicPropertiesEncoded()); + if (flinkYarnClient.getDynamicPropertiesEncoded() != null) { + yarnProps.setProperty(CliFrontend.YARN_PROPERTIES_DYNAMIC_PROPERTIES_STRING, + flinkYarnClient.getDynamicPropertiesEncoded()); } writeYarnProperties(yarnProps, yarnPropertiesFile); //------------------ Cluster running, let user control it ------------ - if(detachedMode) { + if (detachedMode) { // print info and quit: LOG.info("The Flink YARN client has been started in detached mode. In order to stop" + "Flink on YARN, use the following command or a YARN web interface to stop it:\n" + "yarn application -kill "+yarnCluster.getApplicationId()+"\n" + - "Please also note that the temporary files of the YARN session in {} will not be removed.", flinkYarnClient.getSessionFilesDir()); + "Please also note that the temporary files of the YARN session in {} will not be removed.", + flinkYarnClient.getSessionFilesDir()); } else { runInteractiveCli(yarnCluster); @@ -443,10 +450,9 @@ public class FlinkYarnSessionCli { * Utility method for tests. */ public void stop() { - if(yarnCluster != null) { + if (yarnCluster != null) { LOG.info("Command line interface is shutting down the yarnCluster"); yarnCluster.shutdown(); } } } - http://git-wip-us.apache.org/repos/asf/flink/blob/3838dd19/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java index 465acb8..081d498 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RegularPactTask.java @@ -513,7 +513,8 @@ public class RegularPactTask extends AbstractInvokable i if (this.stub != null) { // collect the counters from the stub if (FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext) != null) { - Map> accumulators = FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators(); + Map> accumulators = + FunctionUtils.getFunctionRuntimeContext(this.stub, this.runtimeUdfContext).getAllAccumulators(); RegularPactTask.reportAndClearAccumulators(getEnvironment(), accumulators, this.chainedTasks); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3838dd19/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala index 6a6a6e4..f88685c 100644 --- a/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala +++ b/flink-yarn/src/main/scala/org/apache/flink/yarn/ApplicationMasterActor.scala @@ -311,10 +311,10 @@ trait ApplicationMasterActor extends ActorLogMessages { } private def runningContainerIds(): mutable.MutableList[ContainerId] = { - return runningContainersList map { runningCont => runningCont.getId} + runningContainersList map { runningCont => runningCont.getId} } private def allocatedContainerIds(): mutable.MutableList[ContainerId] = { - return allocatedContainersList map { runningCont => runningCont.getId} + allocatedContainersList map { runningCont => runningCont.getId} } private def startYarnSession(conf: Configuration,