flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3937) Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
Date Fri, 10 Jun 2016 15:02:21 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3937?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15324585#comment-15324585 ]

ASF GitHub Bot commented on FLINK-3937:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2085#discussion_r66628410
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -980,110 +845,41 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E
     	}
     
     	/**
    -	 * Retrieves a {@link Client} object from the given command line options and other parameters.
    +	 * Retrieves a {@link ClusterClient} object from the given command line options and other parameters.
     	 *
     	 * @param options Command line options which contain JobManager address
     	 * @param programName Program name
    -	 * @param userParallelism Given user parallelism
     	 * @throws Exception
     	 */
    -	protected Client getClient(
    +	protected ClusterClient getClient(
     			CommandLineOptions options,
    -			String programName,
    -			int userParallelism,
    -			boolean detachedMode)
    -		throws Exception {
    -		InetSocketAddress jobManagerAddress;
    -		int maxSlots = -1;
    +			String programName) throws Exception {
     
    -		if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
    -			logAndSysout("YARN cluster mode detected. Switching Log4j output to console");
    -
    -			// Default yarn application name to use, if nothing is specified on the command line
    -			String applicationName = "Flink Application: " + programName;
    -
    -			// user wants to run Flink in YARN cluster.
    -			CommandLine commandLine = options.getCommandLine();
    -			AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser
    -														.getFlinkYarnSessionCli()
    -														.withDefaultApplicationName(applicationName)
    -														.createFlinkYarnClient(commandLine);
    -
    -			if (flinkYarnClient == null) {
    -				throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
    -			}
    -
    -			// in case the main detached mode wasn't set, we don't wanna overwrite the one loaded
    -			// from yarn options.
    -			if (detachedMode) {
    -				flinkYarnClient.setDetachedMode(true);
    -			}
    -
    -			// the number of slots available from YARN:
    -			int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
    -			if (yarnTmSlots == -1) {
    -				yarnTmSlots = 1;
    -			}
    -			maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
    -			if (userParallelism != -1) {
    -				int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
    -				logAndSysout("The YARN cluster has " + maxSlots + " slots available, " +
    -						"but the user requested a parallelism of " + userParallelism + " on YARN. " +
    -						"Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
    -						"will get "+slotsPerTM+" slots.");
    -				flinkYarnClient.setTaskManagerSlots(slotsPerTM);
    -			}
    -
    -			try {
    -				yarnCluster = flinkYarnClient.deploy();
    -				yarnCluster.connectToCluster();
    -			}
    -			catch (Exception e) {
    -				throw new RuntimeException("Error deploying the YARN cluster", e);
    -			}
    +		// Get the custom command-line (e.g. Standalone/Yarn/Mesos)
    +		CustomCommandLine<?> activeCommandLine =
    +			CliFrontendParser.getActiveCustomCommandLine(options.getJobManagerAddress());
    --- End diff --
    
    Well spotted. I think I'll move this logic to the implementation of the CustomCommandLine.

    I don't quite understand your renaming suggestion. Are you suggesting that we split the
    CustomCommandLine into CustomParser and CustomCLI?


> Make flink cli list, savepoint, cancel and stop work on Flink-on-YARN clusters
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-3937
>                 URL: https://issues.apache.org/jira/browse/FLINK-3937
>             Project: Flink
>          Issue Type: Improvement
>            Reporter: Sebastian Klemke
>            Assignee: Maximilian Michels
>            Priority: Trivial
>         Attachments: improve_flink_cli_yarn_integration.patch
>
>
> Currently, the Flink CLI cannot determine the JobManager RPC location for Flink-on-YARN clusters.
> Therefore, the list, savepoint, cancel and stop subcommands are hard to invoke if you only know
> the YARN application ID. As an improvement, I suggest adding a -yid <yarnApplicationId>
> option to the mentioned subcommands that can be used together with -m yarn-cluster. The Flink
> CLI would then retrieve the JobManager RPC location from the YARN ResourceManager.
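
The lookup proposed in the issue description could be sketched roughly as follows. This is only an illustration under stated assumptions, not the code from the attached patch or from the pull request: `JobManagerLocator`, `ResourceManagerLookup` and `rpcAddressOf` are hypothetical names invented here, and the lookup interface merely stands in for a real query against the YARN ResourceManager.

```java
import java.util.Optional;

/** Hypothetical sketch; none of these types are actual Flink or YARN APIs. */
final class JobManagerLocator {

    /** Stand-in for a YARN ResourceManager query: application ID to "host:port". */
    interface ResourceManagerLookup {
        Optional<String> rpcAddressOf(String yarnApplicationId);
    }

    /** Value of the -m option that selects a Flink-on-YARN cluster. */
    static final String YARN_CLUSTER = "yarn-cluster";

    /**
     * Resolves the JobManager RPC address for list, savepoint, cancel or stop.
     *
     * @param jobManagerOption  value passed with -m (an address or "yarn-cluster")
     * @param yarnApplicationId value passed with the proposed -yid option, may be null
     * @param lookup            assumed way of asking the ResourceManager for the address
     */
    static String resolve(
            String jobManagerOption,
            String yarnApplicationId,
            ResourceManagerLookup lookup) {

        // Anything other than "yarn-cluster" is treated as an explicit address.
        if (!YARN_CLUSTER.equals(jobManagerOption)) {
            return jobManagerOption;
        }
        if (yarnApplicationId == null) {
            throw new IllegalArgumentException(
                "-m yarn-cluster requires -yid <yarnApplicationId>");
        }
        // Ask the (assumed) ResourceManager lookup for the running application.
        return lookup.rpcAddressOf(yarnApplicationId)
            .orElseThrow(() -> new IllegalStateException(
                "No JobManager found for YARN application " + yarnApplicationId));
    }
}
```

With this shape, the subcommands would only need the resolved address and would not have to know whether it came from the command line or from YARN.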



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
