flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-8332) Move dispose savepoint into ClusterClient
Date Wed, 10 Jan 2018 14:51:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320363#comment-16320363 ]

ASF GitHub Bot commented on FLINK-8332:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5219#discussion_r160697901
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -659,128 +655,107 @@ protected int savepoint(String[] args) {
     			return 0;
     		}
     
    -		if (options.isDispose()) {
    -			// Discard
    -			return disposeSavepoint(options);
    -		} else {
    -			// Trigger
    -			String[] cleanedArgs = options.getArgs();
    -			JobID jobId;
    +		CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(options.getCommandLine());
     
    -			if (cleanedArgs.length >= 1) {
    -				String jobIdString = cleanedArgs[0];
    -				try {
    -					jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
    -				} catch (Exception e) {
    -					return handleArgException(new IllegalArgumentException(
    -							"Error: The value for the Job ID is not a valid ID."));
    -				}
    +		ClusterClient clusterClient = customCommandLine.retrieveCluster(options.getCommandLine(),
config, configurationDirectory);
    +
    +		try {
    +			if (options.isDispose()) {
    +				// Discard
    +				return disposeSavepoint(clusterClient, options.getSavepointPath());
     			} else {
    -				return handleArgException(new IllegalArgumentException(
    +				// Trigger
    +				String[] cleanedArgs = options.getArgs();
    +				JobID jobId;
    +
    +				if (cleanedArgs.length >= 1) {
    +					String jobIdString = cleanedArgs[0];
    +					try {
    +						jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
    +					} catch (Exception e) {
    +						return handleArgException(new IllegalArgumentException(
    +							"Error: The value for the Job ID is not a valid ID."));
    +					}
    +				} else {
    +					return handleArgException(new IllegalArgumentException(
     						"Error: The value for the Job ID is not a valid ID. " +
    -								"Specify a Job ID to trigger a savepoint."));
    -			}
    +							"Specify a Job ID to trigger a savepoint."));
    +				}
     
    -			String savepointDirectory = null;
    -			if (cleanedArgs.length >= 2) {
    -				savepointDirectory = cleanedArgs[1];
    -			}
    +				String savepointDirectory = null;
    +				if (cleanedArgs.length >= 2) {
    +					savepointDirectory = cleanedArgs[1];
    +				}
     
    -			// Print superfluous arguments
    -			if (cleanedArgs.length >= 3) {
    -				logAndSysout("Provided more arguments than required. Ignoring not needed arguments.");
    -			}
    +				// Print superfluous arguments
    +				if (cleanedArgs.length >= 3) {
    +					logAndSysout("Provided more arguments than required. Ignoring not needed arguments.");
    +				}
     
    -			return triggerSavepoint(options, jobId, savepointDirectory);
    +				return triggerSavepoint(clusterClient, jobId, savepointDirectory);
    +			}
    +		} catch (Exception e) {
    +			return handleError(e);
    +		} finally {
    +			try {
    +				clusterClient.shutdown();
    +			} catch (Exception e) {
    +				LOG.info("Could not shutdown the cluster client.", e);
    +			}
     		}
     	}
     
     	/**
     	 * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint}
     	 * message to the job manager.
     	 */
    -	private int triggerSavepoint(SavepointOptions options, JobID jobId, String savepointDirectory)
{
    -		try {
    -			CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
    -			ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(),
config, configurationDirectory);
    -			try {
    -				logAndSysout("Triggering savepoint for job " + jobId + ".");
    -				CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobId,
savepointDirectory);
    -
    -				String savepointPath;
    -				try {
    -					logAndSysout("Waiting for response...");
    -					savepointPath = savepointPathFuture.get();
    -				}
    -				catch (ExecutionException ee) {
    -					Throwable cause = ExceptionUtils.stripExecutionException(ee);
    -					throw new FlinkException("Triggering a savepoint for the job " + jobId + " failed.",
cause);
    -				}
    -
    -				logAndSysout("Savepoint completed. Path: " + savepointPath);
    -				logAndSysout("You can resume your program from this savepoint with the run command.");
    +	private int triggerSavepoint(ClusterClient clusterClient, JobID jobId, String savepointDirectory)
throws Exception {
    +		logAndSysout("Triggering savepoint for job " + jobId + ".");
    +		CompletableFuture<String> savepointPathFuture = clusterClient.triggerSavepoint(jobId,
savepointDirectory);
     
    -				return 0;
    -			}
    -			finally {
    -				client.shutdown();
    -			}
    +		String savepointPath;
    +		try {
    +			logAndSysout("Waiting for response...");
    +			savepointPath = savepointPathFuture.get();
     		}
    -		catch (Throwable t) {
    -			return handleError(t);
    +		catch (ExecutionException ee) {
    +			Throwable cause = ExceptionUtils.stripExecutionException(ee);
    +			throw new FlinkException("Triggering a savepoint for the job " + jobId + " failed.",
cause);
     		}
    +
    +		logAndSysout("Savepoint completed. Path: " + savepointPath);
    +		logAndSysout("You can resume your program from this savepoint with the run command.");
    +
    +		return 0;
     	}
     
     	/**
     	 * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
     	 * message to the job manager.
     	 */
    -	private int disposeSavepoint(SavepointOptions options) {
    -		try {
    -			String savepointPath = options.getSavepointPath();
    -			if (savepointPath == null) {
    -				throw new IllegalArgumentException("Missing required argument: savepoint path. "
+
    -						"Usage: bin/flink savepoint -d <savepoint-path>");
    -			}
    -
    -			ActorGateway jobManager = getJobManagerGateway(options);
    +	private int disposeSavepoint(ClusterClient clusterClient, String savepointPath) {
    +		Preconditions.checkNotNull(savepointPath, "Missing required argument: savepoint path.
" +
    +			"Usage: bin/flink savepoint -d <savepoint-path>");
     
    -			logAndSysout("Disposing savepoint '" + savepointPath + "'.");
    +		logAndSysout("Disposing savepoint '" + savepointPath + "'.");
     
    -			Object msg = new DisposeSavepoint(savepointPath);
    -			Future<Object> response = jobManager.ask(msg, clientTimeout);
    +		CompletableFuture<Acknowledge> disposeFuture = null;
    --- End diff --
    
    You're right. Will change it.


> Move dispose savepoint into ClusterClient
> -----------------------------------------
>
>                 Key: FLINK-8332
>                 URL: https://issues.apache.org/jira/browse/FLINK-8332
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> Currently, the {{CliFrontend}} sends the command for disposing a savepoint. In order
> to better abstract this functionality, we should move it to the {{ClusterClient}}. That way
> we can have different implementations of the {{ClusterClient}} (Flip-6 and old code) which
> are used by the same {{CliFrontend}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Mime
View raw message