From: tillrohrmann
To: issues@flink.incubator.apache.org
Reply-To: issues@flink.incubator.apache.org
Subject: [GitHub] flink pull request: [FLINK-2935] [scala-shell] Allow Scala shell t...
Content-Type: text/plain
Message-Id: <20160118125244.7E7DDE00FF@git1-us-west.apache.org>
Date: Mon, 18 Jan 2016 12:52:44 +0000 (UTC)

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1500#discussion_r49994979

    --- Diff: flink-staging/flink-scala-shell/src/main/scala/org/apache/flink/api/scala/FlinkShell.scala ---
    @@ -61,99 +73,217 @@ object FlinkShell {
           )
           cmd("remote") action { (_, c) =>
    -        c.copy(flinkShellExecutionMode = ExecutionMode.REMOTE)
    +        c.copy(executionMode = ExecutionMode.REMOTE)
           } text("starts Flink scala shell connecting to a remote cluster\n") children(
             arg[String]("") action { (h, c) =>
    -          c.copy(host = h) }
    +          c.copy(host = Some(h)) }
               text("remote host name as string"),
             arg[Int]("") action { (p, c) =>
    -          c.copy(port = p) }
    +          c.copy(port = Some(p)) }
               text("remote port as integer\n"),
             opt[(String)]("addclasspath") abbr("a") valueName("") action {
               case (x, c) =>
                 val xArray = x.split(":")
                 c.copy(externalJars = Option(xArray))
    -          } text("specifies additional jars to be used in Flink")
    +        } text ("specifies additional jars to be used in Flink")
           )
    -      help("help") abbr("h") text("prints this usage text\n")
    +
    +      cmd("yarn") action {
    +        (_, c) => c.copy(executionMode = ExecutionMode.YARN, yarnConfig = None)
    +      } text ("starts Flink scala shell connecting to a yarn cluster\n") children(
    +        opt[Int]("container") abbr ("n") valueName ("arg") action {
    +          (x, c) =>
    +            c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(containers = Some(x))))
    +        } text ("Number of YARN container to allocate (= Number of Task Managers)"),
    +        opt[Int]("jobManagerMemory") abbr ("jm") valueName ("arg") action {
    +          (x, c) =>
    +            c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(jobManagerMemory = Some(x))))
    +        } text ("Memory for JobManager Container [in MB]"),
    +        opt[String]("name") abbr ("nm") action {
    +          (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(name = Some(x))))
    +        } text ("Set a custom name for the application on YARN"),
    +        opt[String]("queue") abbr ("qu") valueName ("") action {
    +          (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(queue = Some(x))))
    +        } text ("Specify YARN queue"),
    +        opt[Int]("slots") abbr ("s") valueName ("") action {
    +          (x, c) => c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(slots = Some(x))))
    +        } text ("Number of slots per TaskManager"),
    +        opt[Int]("taskManagerMemory") abbr ("tm") valueName ("") action {
    +          (x, c) =>
    +            c.copy(yarnConfig = Some(ensureYarnConfig(c).copy(taskManagerMemory = Some(x))))
    +        } text ("Memory per TaskManager Container [in MB]"),
    +        opt[(String)] ("addclasspath") abbr("a") valueName("") action {
    +          case (x, c) =>
    +            val xArray = x.split(":")
    +            c.copy(externalJars = Option(xArray))
    +        } text("specifies additional jars to be used in Flink\n")
    +      )
    +
    +      help("help") abbr ("h") text ("prints this usage text\n")
         }
         // parse arguments
    -    parser.parse (args, Config()) match {
    -      case Some(config) =>
    -        startShell(config.host,
    -          config.port,
    -          config.flinkShellExecutionMode,
    -          config.externalJars)
    -
    -      case _ => System.out.println("Could not parse program arguments")
    +    parser.parse(args, Config()) match {
    +      case Some(config) => startShell(config)
    +      case _ => println("Could not parse program arguments")
         }
       }
    +  def fetchConnectionInfo(
    +    config: Config
    +  ): (String, Int, Option[Either[FlinkMiniCluster, AbstractFlinkYarnCluster]]) = {
    +    config.executionMode match {
    +      case ExecutionMode.LOCAL => // Local mode
    +        val config = new Configuration()
    +        config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
    -  def startShell(
    -      userHost: String,
    -      userPort: Int,
    -      executionMode: ExecutionMode.Value,
    -      externalJars: Option[Array[String]] = None): Unit ={
    -
    -    System.out.println("Starting Flink Shell:")
    -
    -    // either port or userhost not specified by user, create new minicluster
    -    val (host: String, port: Int, cluster: Option[LocalFlinkMiniCluster]) =
    -      executionMode match {
    -        case ExecutionMode.LOCAL =>
    -          val config = new Configuration()
    -          config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 0)
    -          val miniCluster = new LocalFlinkMiniCluster(config, false)
    -          miniCluster.start()
    -          val port = miniCluster.getLeaderRPCPort
    -          System.out.println(s"\nStarting local Flink cluster (host: localhost, port: $port).\n")
    -          ("localhost", port, Some(miniCluster))
    -
    -        case ExecutionMode.REMOTE =>
    -          if (userHost == "none" || userPort == -1) {
    -            System.out.println("Error: or not specified!")
    -            return
    -          } else {
    -            System.out.println(
    -              s"\nConnecting to Flink cluster (host: $userHost, port: $userPort).\n")
    -            (userHost, userPort, None)
    -          }
    -
    -        case ExecutionMode.UNDEFINED =>
    -          System.out.println("Error: please specify execution mode:")
    -          System.out.println("[local | remote ]")
    -          return
    -      }
    +        val miniCluster = new LocalFlinkMiniCluster(config, false)
    +        miniCluster.start()
    -    var repl: Option[FlinkILoop] = None
    +        println("\nStarting local Flink cluster (host: localhost, " +
    +          s"port: ${miniCluster.getLeaderRPCPort}).\n")
    +        ("localhost", miniCluster.getLeaderRPCPort, Some(Left(miniCluster)))
    -    try {
    -      // custom shell
    -      repl = Some(
    -        bufferedReader match {
    +      case ExecutionMode.REMOTE => // Remote mode
    +        if (config.host.isEmpty || config.port.isEmpty) {
    +          throw new IllegalArgumentException(" or is not specified!")
    +        }
    +        (config.host.get, config.port.get, None)
    -          case Some(br) =>
    -            val out = new StringWriter()
    -            new FlinkILoop(host, port, externalJars, bufferedReader, new JPrintWriter(out))
    +      case ExecutionMode.YARN => // YARN mode
    +        config.yarnConfig match {
    +          case Some(yarnConfig) => // if there is information for new cluster
    +            deployNewYarnCluster(yarnConfig)
    +          case None => // there is no information for new cluster. Then we use yarn properties.
    +            fetchDeployedYarnClusterInfo()
    +        }
    -          case None =>
    -            new FlinkILoop(host, port, externalJars)
    -        })
    +      case ExecutionMode.UNDEFINED => // Wrong input
    +        throw new IllegalArgumentException("please specify execution mode:\n" +
    +          "[local | remote | yarn]")
    +    }
    +  }
    -      val settings = new Settings()
    +  def startShell(config: Config): Unit = {
    +    println("Starting Flink Shell:")
    -      settings.usejavacp.value = true
    -      settings.Yreplsync.value = true
    +    val (repl, cluster) = try {
    +      val (host, port, cluster) = fetchConnectionInfo(config)
    +      println(s"\nConnecting to Flink cluster (host: $host, port: $port).\n")
    +      val repl: Option[FlinkILoop] = bufferedReader match {
    +        case Some(reader) =>
    +          val out = new StringWriter()
    +          Some(new FlinkILoop(host, port, config.externalJars, reader, new JPrintWriter(out)))
    +        case None =>
    +          Some(new FlinkILoop(host, port, config.externalJars))
    +      }
    +
    +      (repl, cluster)
    +    } catch {
    +      case e: IllegalArgumentException =>
    +        println(s"Error: ${e.getMessage}")
    +        sys.exit()
    +    }
    -      // start scala interpreter shell
    +    val settings = new Settings()
    +    settings.usejavacp.value = true
    +    settings.Yreplsync.value = true
    +
    +    try {
           repl.foreach(_.process(settings))
         } finally {
           repl.foreach(_.closeInterpreter())
    -      cluster.foreach(_.stop())
    +      cluster match {
    +        case Some(Left(miniCluster)) => miniCluster.stop()
    +        case Some(Right(yarnCluster)) => yarnCluster.shutdown(false)
    +        case _ =>
    +      }
    +    }
    +
    +    println(" good bye ..")
    +  }
    +
    +  def deployNewYarnCluster(yarnConfig: YarnConfig) = {
    +    val yarnClient = FlinkYarnSessionCli.getFlinkYarnClient
    +
    +    // use flink-dist.jar for scala shell
    +    val jarPath = new Path("file://" +
    +      s"${yarnClient.getClass.getProtectionDomain.getCodeSource.getLocation.getPath}")
    +    yarnClient.setLocalJarPath(jarPath)
    +
    +    // load configuration
    +    val confDirPath = CliFrontend.getConfigurationDirectoryFromEnv
    +    val flinkConfiguration = GlobalConfiguration.getConfiguration
    +    val confFile = new File(confDirPath + File.separator + "flink-conf.yaml")
    +    val confPath = new Path(confFile.getAbsolutePath)
    +    GlobalConfiguration.loadConfiguration(confDirPath)
    +    yarnClient.setFlinkConfigurationObject(flinkConfiguration)
    +    yarnClient.setConfigurationDirectory(confDirPath)
    +    yarnClient.setConfigurationFilePath(confPath)
    +
    +    // number of task managers is required.
    +    yarnConfig.containers match {
    +      case Some(containers) => yarnClient.setTaskManagerCount(containers)
    +      case None =>
    +        throw new IllegalArgumentException("Number of taskmanagers must be specified.")
    +    }
    +
    +    // set configuration from user input
    +    yarnConfig.jobManagerMemory.foreach(yarnClient.setJobManagerMemory)
    +    yarnConfig.name.foreach(yarnClient.setName)
    +    yarnConfig.queue.foreach(yarnClient.setQueue)
    +    yarnConfig.slots.foreach(yarnClient.setTaskManagerSlots)
    +    yarnConfig.taskManagerMemory.foreach(yarnClient.setTaskManagerMemory)
    +
    +    // deploy
    +    val cluster = yarnClient.deploy()
    +    val address = cluster.getJobManagerAddress.getAddress.getHostAddress
    +    val port = cluster.getJobManagerAddress.getPort
    +    cluster.connectToCluster()
    +
    +    (address, port, Some(Right(cluster)))
    +  }
    +
    +  def fetchDeployedYarnClusterInfo() = {
    +    // load configuration
    +    val globalConfig = GlobalConfiguration.getConfiguration
    +    val defaultPropertiesLocation = System.getProperty("java.io.tmpdir")
    +    val currentUser = System.getProperty("user.name")
    +    val propertiesLocation = globalConfig.getString(
    +      ConfigConstants.YARN_PROPERTIES_FILE_LOCATION, defaultPropertiesLocation)
    +    val propertiesName = CliFrontend.YARN_PROPERTIES_FILE + currentUser
    +    val propertiesFile = new File(propertiesLocation, propertiesName)
    --- End diff --

    Maybe we could unify the `propertiesName` generation somewhere. It is also implemented in `ExecutionEnvironment`. If it changes in one place, one has to remember to change it here as well. This will most definitely be forgotten.
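
One way to read the suggestion above is a single shared helper that both the shell and `ExecutionEnvironment` could call. The sketch below is only an illustration: the object `YarnPropertiesFile`, its methods `nameFor` and `locate`, and the `FilePrefix` value are hypothetical and not part of Flink. `FilePrefix` merely stands in for `CliFrontend.YARN_PROPERTIES_FILE`, and the `Option` parameter stands in for the `ConfigConstants.YARN_PROPERTIES_FILE_LOCATION` lookup so that the example runs without Flink on the classpath.

```scala
import java.io.File

// Hypothetical helper: the object name, method names, and prefix value are
// assumptions made for illustration, not existing Flink API.
object YarnPropertiesFile {

  // Stand-in for CliFrontend.YARN_PROPERTIES_FILE; the real value may differ.
  val FilePrefix: String = ".yarn-properties-"

  /** Builds the file name the same way as the diff: prefix followed by the user name. */
  def nameFor(user: String): String = FilePrefix + user

  /**
   * Resolves the properties file. `configuredLocation` plays the role of the
   * configured properties location; when it is absent, java.io.tmpdir is used,
   * mirroring the default in fetchDeployedYarnClusterInfo().
   */
  def locate(
      configuredLocation: Option[String],
      user: String = System.getProperty("user.name")): File = {
    val dir = configuredLocation.getOrElse(System.getProperty("java.io.tmpdir"))
    new File(dir, nameFor(user))
  }
}
```

With something like this in place, each call site would reduce to a single call such as `YarnPropertiesFile.locate(Some(propertiesLocation))`, so a change to the naming scheme would only need to happen in one spot.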