flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [10/11] flink git commit: [FLINK-2976] [clients] Add savepoint commands to CliFrontend
Date Mon, 11 Jan 2016 15:31:31 GMT
[FLINK-2976] [clients] Add savepoint commands to CliFrontend

[comments] Use handleError(Throwable)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3607575a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3607575a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3607575a

Branch: refs/heads/master
Commit: 3607575a234f65b5ea50d987e9bd9f01c7f4a605
Parents: d739ee2
Author: Ufuk Celebi <uce@apache.org>
Authored: Tue Dec 1 18:49:14 2015 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Mon Jan 11 16:31:03 2016 +0100

----------------------------------------------------------------------
 .../org/apache/flink/client/CliFrontend.java    | 140 +++++++-
 .../flink/client/cli/CliFrontendParser.java     |  46 ++-
 .../apache/flink/client/cli/ProgramOptions.java |  23 +-
 .../flink/client/cli/SavepointOptions.java      |  51 +++
 .../org/apache/flink/client/program/Client.java |  95 +++---
 .../client/program/ContextEnvironment.java      |  21 +-
 .../program/ContextEnvironmentFactory.java      |   8 +-
 .../client/program/DetachedEnvironment.java     |  11 +-
 .../flink/client/program/PackagedProgram.java   |  14 +-
 .../apache/flink/client/CliFrontendRunTest.java |  12 +
 .../flink/client/CliFrontendSavepointTest.java  | 328 +++++++++++++++++++
 .../environment/StreamContextEnvironment.java   |   2 +-
 12 files changed, 689 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3607575a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index b201cf4..0363d6a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -101,6 +101,7 @@ public class CliFrontend {
 	public static final String ACTION_INFO = "info";
 	private static final String ACTION_LIST = "list";
 	private static final String ACTION_CANCEL = "cancel";
+	private static final String ACTION_SAVEPOINT = "savepoint";
 
 	// config dir parameters
 	private static final String ENV_CONFIG_DIRECTORY = "FLINK_CONF_DIR";
@@ -301,6 +302,8 @@ public class CliFrontend {
 			client.setPrintStatusDuringExecution(options.getStdoutLogging());
 			LOG.debug("Client slots is set to {}", client.getMaxSlots());
 
+			LOG.debug("Savepoint path is set to {}", options.getSavepointPath());
+
 			try {
 				if (client.getMaxSlots() != -1 && userParallelism == -1) {
 					logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
@@ -630,6 +633,135 @@ public class CliFrontend {
 		}
 	}
 
+	/**
+	 * Executes the SAVEPOINT action.
+	 *
+	 * @param args Command line arguments for the cancel action.
+	 */
+	protected int savepoint(String[] args) {
+		LOG.info("Running 'savepoint' command.");
+
+		SavepointOptions options;
+		try {
+			options = CliFrontendParser.parseSavepointCommand(args);
+		}
+		catch (CliArgsException e) {
+			return handleArgException(e);
+		}
+		catch (Throwable t) {
+			return handleError(t);
+		}
+
+		// evaluate help flag
+		if (options.isPrintHelp()) {
+			CliFrontendParser.printHelpForCancel();
+			return 0;
+		}
+
+		if (options.isDispose()) {
+			// Discard
+			return disposeSavepoint(options, options.getDisposeSavepointPath());
+		}
+		else {
+			// Trigger
+			String[] cleanedArgs = options.getArgs();
+			JobID jobId;
+
+			if (cleanedArgs.length > 0) {
+				String jobIdString = cleanedArgs[0];
+				try {
+					jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
+				}
+				catch (Exception e) {
+					return handleError(new IllegalArgumentException(
+							"Error: The value for the Job ID is not a valid ID."));
+				}
+			}
+			else {
+				return handleError(new IllegalArgumentException(
+						"Error: The value for the Job ID is not a valid ID. " +
+								"Specify a Job ID to trigger a savepoint."));
+			}
+
+			return triggerSavepoint(options, jobId);
+		}
+	}
+
+	/**
+	 * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint}
+	 * message to the job manager.
+	 */
+	private int triggerSavepoint(SavepointOptions options, JobID jobId) {
+		try {
+			ActorGateway jobManager = getJobManagerGateway(options);
+			Future<Object> response = jobManager.ask(new TriggerSavepoint(jobId), askTimeout);
+
+			Object result;
+			try {
+				logAndSysout("Triggering savepoint for job " + jobId + ". Waiting for response...");
+				result = Await.result(response, askTimeout);
+			}
+			catch (Exception e) {
+				throw new Exception("Triggering a savepoint for the job " + jobId + " failed.", e);
+			}
+
+			if (result instanceof TriggerSavepointSuccess) {
+				TriggerSavepointSuccess success = (TriggerSavepointSuccess) result;
+				logAndSysout("Savepoint completed. Path: " + success.savepointPath());
+				logAndSysout("You can resume your program from this savepoint with the run command.");
+
+				return 0;
+			}
+			else if (result instanceof TriggerSavepointFailure) {
+				TriggerSavepointFailure failure = (TriggerSavepointFailure) result;
+				throw failure.cause();
+			}
+			else {
+				throw new IllegalStateException("Unknown JobManager response of type " +
+						result.getClass());
+			}
+		}
+		catch (Throwable t) {
+			return handleError(t);
+		}
+	}
+
+	/**
+	 * Sends a {@link org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
+	 * message to the job manager.
+	 */
+	private int disposeSavepoint(SavepointOptions options, String savepointPath) {
+		try {
+			ActorGateway jobManager = getJobManagerGateway(options);
+			Future<Object> response = jobManager.ask(new DisposeSavepoint(savepointPath), askTimeout);
+
+			Object result;
+			try {
+				logAndSysout("Disposing savepoint '" + savepointPath + "'. Waiting for response...");
+				result = Await.result(response, askTimeout);
+			}
+			catch (Exception e) {
+				throw new Exception("Disposing the savepoint with path" + savepointPath + " failed.", e);
+			}
+
+			if (result.getClass() == JobManagerMessages.getDisposeSavepointSuccess().getClass()) {
+				logAndSysout("Savepoint '" + savepointPath + "' disposed.");
+				return 0;
+			}
+			else if (result instanceof DisposeSavepointFailure) {
+				DisposeSavepointFailure failure = (DisposeSavepointFailure) result;
+				throw failure.cause();
+			}
+			else {
+				throw new IllegalStateException("Unknown JobManager response of type " +
+						result.getClass());
+			}
+		}
+		catch (Throwable t) {
+			return handleError(t);
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Interaction with programs and JobManager
 	// --------------------------------------------------------------------------------------------
@@ -719,9 +851,13 @@ public class CliFrontend {
 		// Get assembler class
 		String entryPointClass = options.getEntryPointClassName();
 
-		return entryPointClass == null ?
+		PackagedProgram program = entryPointClass == null ?
 				new PackagedProgram(jarFile, classpaths, programArgs) :
 				new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
+
+		program.setSavepointPath(options.getSavepointPath());
+
+		return program;
 	}
 
 	/**
@@ -993,6 +1129,8 @@ public class CliFrontend {
 				return info(params);
 			case ACTION_CANCEL:
 				return cancel(params);
+			case ACTION_SAVEPOINT:
+				return savepoint(params)
 			case "-h":
 			case "--help":
 				CliFrontendParser.printHelp();

http://git-wip-us.apache.org/repos/asf/flink/blob/3607575a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 1226d48..4e081fd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -71,6 +71,12 @@ public class CliFrontendParser {
 					+ "' as the JobManager to deploy a YARN cluster for the job. Use this flag to connect to a " +
 					"different JobManager than the one specified in the configuration.");
 
+	static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
+			"Path to a savepoint to reset the job back to (for example file:///flink/savepoint-1537).");
+
+	static final Option SAVEPOINT_DISPOSE_OPTION = new Option("d", "dispose", true,
+			"Disposes an existing savepoint.");
+
 	// list specific options
 	static final Option RUNNING_OPTION = new Option("r", "running", false,
 			"Show only running programs and their JobIDs");
@@ -105,13 +111,19 @@ public class CliFrontendParser {
 
 		RUNNING_OPTION.setRequired(false);
 		SCHEDULED_OPTION.setRequired(false);
+
+		SAVEPOINT_PATH_OPTION.setRequired(false);
+		SAVEPOINT_PATH_OPTION.setArgName("savepointPath");
+
+		SAVEPOINT_DISPOSE_OPTION.setRequired(false);
+		SAVEPOINT_DISPOSE_OPTION.setArgName("savepointPath");
 	}
 
 	private static final Options RUN_OPTIONS = getRunOptions(buildGeneralOptions(new Options()));
 	private static final Options INFO_OPTIONS = getInfoOptions(buildGeneralOptions(new Options()));
 	private static final Options LIST_OPTIONS = getListOptions(buildGeneralOptions(new Options()));
 	private static final Options CANCEL_OPTIONS = getCancelOptions(buildGeneralOptions(new Options()));
-
+	private static final Options SAVEPOINT_OPTIONS = getSavepointOptions(buildGeneralOptions(new Options()));
 
 	private static Options buildGeneralOptions(Options options) {
 		options.addOption(HELP_OPTION);
@@ -128,6 +140,7 @@ public class CliFrontendParser {
 		options.addOption(ARGS_OPTION);
 		options.addOption(LOGGING_OPTION);
 		options.addOption(DETACHED_OPTION);
+		options.addOption(SAVEPOINT_PATH_OPTION);
 
 		// also add the YARN options so that the parser can parse them
 		yarnSessionCLi.getYARNSessionCLIOptions(options);
@@ -140,6 +153,7 @@ public class CliFrontendParser {
 		options.addOption(PARALLELISM_OPTION);
 		options.addOption(LOGGING_OPTION);
 		options.addOption(DETACHED_OPTION);
+		options.addOption(SAVEPOINT_PATH_OPTION);
 		return options;
 	}
 
@@ -183,6 +197,12 @@ public class CliFrontendParser {
 		return options;
 	}
 
+	private static Options getSavepointOptions(Options options) {
+		options = getJobManagerAddressOption(options);
+		options.addOption(SAVEPOINT_DISPOSE_OPTION);
+		return options;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Help
 	// --------------------------------------------------------------------------------------------
@@ -199,6 +219,7 @@ public class CliFrontendParser {
 		printHelpForInfo();
 		printHelpForList();
 		printHelpForCancel();
+		printHelpForSavepoint();
 
 		System.out.println();
 	}
@@ -255,6 +276,18 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
+	public static void printHelpForSavepoint() {
+		HelpFormatter formatter = new HelpFormatter();
+		formatter.setLeftPadding(5);
+		formatter.setWidth(80);
+
+		System.out.println("\nAction \"savepoint\" triggers savepoints for a running job or disposes existing ones.");
+		System.out.println("\n  Syntax: savepoint [OPTIONS] <Job ID>");
+		formatter.setSyntaxPrefix("  \"savepoint\" action options:");
+		formatter.printHelp(" ", getSavepointOptions(new Options()));
+		System.out.println();
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Line Parsing
 	// --------------------------------------------------------------------------------------------
@@ -292,6 +325,17 @@ public class CliFrontendParser {
 		}
 	}
 
+	public static SavepointOptions parseSavepointCommand(String[] args) throws CliArgsException {
+		try {
+			PosixParser parser = new PosixParser();
+			CommandLine line = parser.parse(SAVEPOINT_OPTIONS, args, false);
+			return new SavepointOptions(line);
+		}
+		catch (ParseException e) {
+			throw new CliArgsException(e.getMessage());
+		}
+	}
+
 	public static InfoOptions parseInfoCommand(String[] args) throws CliArgsException {
 		try {
 			PosixParser parser = new PosixParser();

http://git-wip-us.apache.org/repos/asf/flink/blob/3607575a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 499d3ca..73d49b5 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -19,19 +19,20 @@ package org.apache.flink.client.cli;
 
 import org.apache.commons.cli.CommandLine;
 
-import java.net.URL;
 import java.net.MalformedURLException;
-import java.util.List;
+import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 
 import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
-import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
-import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION;
-import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION;
 
 /**
  * Base class for command line options that refer to a JAR file program.
@@ -52,6 +53,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
 
 	private final boolean detachedMode;
 
+	private final String savepointPath;
+
 	protected ProgramOptions(CommandLine line) throws CliArgsException {
 		super(line);
 
@@ -105,6 +108,12 @@ public abstract class ProgramOptions extends CommandLineOptions {
 
 		stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
 		detachedMode = line.hasOption(DETACHED_OPTION.getOpt());
+
+		if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
+			savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
+		} else {
+			savepointPath = null;
+		}
 	}
 
 	public String getJarFilePath() {
@@ -134,4 +143,8 @@ public abstract class ProgramOptions extends CommandLineOptions {
 	public boolean getDetachedMode() {
 		return detachedMode;
 	}
+
+	public String getSavepointPath() {
+		return savepointPath;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3607575a/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
new file mode 100644
index 0000000..10af76e
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.client.cli;
+
+import org.apache.commons.cli.CommandLine;
+
+import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_DISPOSE_OPTION;
+
+/**
+ * Command line options for the SAVEPOINT command
+ */
+public class SavepointOptions extends CommandLineOptions {
+
+	private final String[] args;
+	private boolean dispose;
+	private String disposeSavepointPath;
+
+	public SavepointOptions(CommandLine line) {
+		super(line);
+		this.args = line.getArgs();
+		this.dispose = line.hasOption(SAVEPOINT_DISPOSE_OPTION.getOpt());
+		this.disposeSavepointPath = line.getOptionValue(SAVEPOINT_DISPOSE_OPTION.getOpt());
+	}
+
+	public String[] getArgs() {
+		return args == null ? new String[0] : args;
+	}
+
+	public boolean isDispose() {
+		return dispose;
+	}
+
+	public String getDisposeSavepointPath() {
+		return disposeSavepointPath;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3607575a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
index 8f92c51..ff1a0fd 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java
@@ -62,18 +62,16 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 import akka.actor.ActorSystem;
 
-
 /**
  * Encapsulates the functionality necessary to submit a program to a remote cluster.
  */
 public class Client {
-	
+
 	private static final Logger LOG = LoggerFactory.getLogger(Client.class);
-	
-	
+
 	/** The optimizer used in the optimization of batch programs */
 	final Optimizer compiler;
-	
+
 	/** The actor system used to communicate with the JobManager */
 	private final ActorSystem actorSystem;
 
@@ -96,9 +94,9 @@ public class Client {
 	private boolean printStatusDuringExecution = true;
 
 	/**
-	 *  For interactive invocations, the Job ID is only available after the ContextEnvironment has
-	 *  been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
-	 *  which lets us access the last JobID here.
+	 * For interactive invocations, the Job ID is only available after the ContextEnvironment has
+	 * been run inside the user JAR. We pass the Client to every instance of the ContextEnvironment
+	 * which lets us access the last JobID here.
 	 */
 	private JobID lastJobID;
 
@@ -112,7 +110,7 @@ public class Client {
 	 * if that is not possible.
 	 *
 	 * @param config The config used to obtain the job-manager's address, and used to configure the optimizer.
-	 * 
+	 *
 	 * @throws java.io.IOException Thrown, if the client's actor system could not be started.
 	 * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved.
 	 */
@@ -123,15 +121,14 @@ public class Client {
 	/**
 	 * Creates a new instance of the class that submits the jobs to a job-manager.
 	 * at the given address using the default port.
-	 * 
+	 *
 	 * @param config The configuration for the client-side processes, like the optimizer.
 	 * @param maxSlots maxSlots The number of maxSlots on the cluster if != -1.
-	 *                    
+	 *
 	 * @throws java.io.IOException Thrown, if the client's actor system could not be started.
-	 * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved.   
+	 * @throws java.net.UnknownHostException Thrown, if the JobManager's hostname could not be resolved.
 	 */
 	public Client(Configuration config, int maxSlots) throws IOException {
-
 		this.config = Preconditions.checkNotNull(config);
 		this.compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);
 		this.maxSlots = maxSlots;
@@ -147,7 +144,7 @@ public class Client {
 		timeout = AkkaUtils.getTimeout(config);
 		lookupTimeout = AkkaUtils.getTimeout(config);
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Startup & Shutdown
 	// ------------------------------------------------------------------------
@@ -161,15 +158,15 @@ public class Client {
 			this.actorSystem.awaitTermination();
 		}
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Configuration
 	// ------------------------------------------------------------------------
-	
+
 	/**
 	 * Configures whether the client should print progress updates during the execution to {@code System.out}.
 	 * All updates are logged via the SLF4J loggers regardless of this setting.
-	 * 
+	 *
 	 * @param print True to print updates to standard out during execution, false to not print them.
 	 */
 	public void setPrintStatusDuringExecution(boolean print) {
@@ -190,11 +187,11 @@ public class Client {
 	public int getMaxSlots() {
 		return this.maxSlots;
 	}
-	
+
 	// ------------------------------------------------------------------------
 	//  Access to the Program's Plan
 	// ------------------------------------------------------------------------
-	
+
 	public static String getOptimizedPlanAsJson(Optimizer compiler, PackagedProgram prog, int parallelism)
 			throws CompilerException, ProgramInvocationException
 	{
@@ -238,12 +235,14 @@ public class Client {
 	public JobSubmissionResult runBlocking(PackagedProgram prog, int parallelism) throws ProgramInvocationException {
 		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
 		if (prog.isUsingProgramEntryPoint()) {
-			return runBlocking(prog.getPlanWithJars(), parallelism);
+			return runBlocking(prog.getPlanWithJars(), parallelism, prog.getSavepointPath());
 		}
 		else if (prog.isUsingInteractiveMode()) {
 			LOG.info("Starting program in interactive mode");
 			ContextEnvironment.setAsContext(new ContextEnvironmentFactory(this, prog.getAllLibraries(),
-					prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, true));
+					prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, true,
+					prog.getSavepointPath()));
+
 			// invoke here
 			try {
 				prog.invokeInteractiveModeForExecution();
@@ -264,12 +263,13 @@ public class Client {
 	{
 		Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
 		if (prog.isUsingProgramEntryPoint()) {
-			return runDetached(prog.getPlanWithJars(), parallelism);
+			return runDetached(prog.getPlanWithJars(), parallelism, prog.getSavepointPath());
 		}
 		else if (prog.isUsingInteractiveMode()) {
 			LOG.info("Starting program in interactive mode");
 			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, prog.getAllLibraries(),
-					prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, false);
+					prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, false,
+					prog.getSavepointPath());
 			ContextEnvironment.setAsContext(factory);
 
 			// invoke here
@@ -286,6 +286,10 @@ public class Client {
 		}
 	}
 
+	public JobExecutionResult runBlocking(JobWithJars program, int parallelism) throws ProgramInvocationException {
+		return runBlocking(program, parallelism, null);
+	}
+
 	/**
 	 * Runs a program on the Flink cluster to which this client is connected. The call blocks until the
 	 * execution is complete, and returns afterwards.
@@ -300,16 +304,19 @@ public class Client {
 	 *                                    i.e. the job-manager is unreachable, or due to the fact that the
 	 *                                    parallel execution failed.
 	 */
-	public JobExecutionResult runBlocking(JobWithJars program, int parallelism) 
-			throws CompilerException, ProgramInvocationException
-	{
+	public JobExecutionResult runBlocking(JobWithJars program, int parallelism, String savepointPath)
+			throws CompilerException, ProgramInvocationException {
 		ClassLoader classLoader = program.getUserCodeClassLoader();
 		if (classLoader == null) {
 			throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
 		}
 
 		OptimizedPlan optPlan = getOptimizedPlan(compiler, program, parallelism);
-		return runBlocking(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader);
+		return runBlocking(optPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath);
+	}
+
+	public JobSubmissionResult runDetached(JobWithJars program, int parallelism) throws ProgramInvocationException {
+		return runDetached(program, parallelism, null);
 	}
 
 	/**
@@ -325,30 +332,37 @@ public class Client {
 	 *                                    or if the submission failed. That might be either due to an I/O problem,
 	 *                                    i.e. the job-manager is unreachable.
 	 */
-	public JobSubmissionResult runDetached(JobWithJars program, int parallelism)
-			throws CompilerException, ProgramInvocationException
-	{
+	public JobSubmissionResult runDetached(JobWithJars program, int parallelism, String savepointPath)
+			throws CompilerException, ProgramInvocationException {
 		ClassLoader classLoader = program.getUserCodeClassLoader();
 		if (classLoader == null) {
 			throw new IllegalArgumentException("The given JobWithJars does not provide a usercode class loader.");
 		}
 
 		OptimizedPlan optimizedPlan = getOptimizedPlan(compiler, program, parallelism);
-		return runDetached(optimizedPlan, program.getJarFiles(), program.getClasspaths(), classLoader);
+		return runDetached(optimizedPlan, program.getJarFiles(), program.getClasspaths(), classLoader, savepointPath);
+	}
+
+	public JobExecutionResult runBlocking(
+			FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
+		return runBlocking(compiledPlan, libraries, classpaths, classLoader, null);
 	}
-	
 
 	public JobExecutionResult runBlocking(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
-			ClassLoader classLoader) throws ProgramInvocationException
+			ClassLoader classLoader, String savepointPath) throws ProgramInvocationException
 	{
-		JobGraph job = getJobGraph(compiledPlan, libraries, classpaths);
+		JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointPath);
 		return runBlocking(job, classLoader);
 	}
 
+	public JobSubmissionResult runDetached(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
+		return runDetached(compiledPlan, libraries, classpaths, classLoader, null);
+	}
+
 	public JobSubmissionResult runDetached(FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths,
-			ClassLoader classLoader) throws ProgramInvocationException
+			ClassLoader classLoader, String savepointPath) throws ProgramInvocationException
 	{
-		JobGraph job = getJobGraph(compiledPlan, libraries, classpaths);
+		JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointPath);
 		return runDetached(job, classLoader);
 	}
 
@@ -519,13 +533,18 @@ public class Client {
 	}
 
 	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan) throws ProgramInvocationException {
-		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths());
+		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), null);
+	}
+
+	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, String savepointPath) throws ProgramInvocationException {
+		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointPath);
 	}
 
-	private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths) {
+	private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, String savepointPath) {
 		JobGraph job;
 		if (optPlan instanceof StreamingPlan) {
 			job = ((StreamingPlan) optPlan).getJobGraph();
+			job.setSavepointPath(savepointPath);
 		} else {
 			JobGraphGenerator gen = new JobGraphGenerator(this.config);
 			job = gen.compileJobGraph((OptimizedPlan) optPlan);

http://git-wip-us.apache.org/repos/asf/flink/blob/3607575a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 1e3d0d4..987558c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.client.program;
 
-import java.net.URL;
-import java.util.List;
-
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
@@ -28,8 +25,11 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
 
+import java.net.URL;
+import java.util.List;
+
 /**
- * Execution Environment for remote execution with the Client in blocking fashion.
+ * Execution Environment for remote execution with the Client.
  */
 public class ContextEnvironment extends ExecutionEnvironment {
 
@@ -40,13 +40,16 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	protected final List<URL> classpathsToAttach;
 	
 	protected final ClassLoader userCodeClassLoader;
+
+	protected final String savepointPath;
 	
 	public ContextEnvironment(Client remoteConnection, List<URL> jarFiles, List<URL> classpaths,
-			ClassLoader userCodeClassLoader) {
+			ClassLoader userCodeClassLoader, String savepointPath) {
 		this.client = remoteConnection;
 		this.jarFilesToAttach = jarFiles;
 		this.classpathsToAttach = classpaths;
 		this.userCodeClassLoader = userCodeClassLoader;
+		this.savepointPath = savepointPath;
 	}
 
 	@Override
@@ -54,7 +57,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		Plan p = createProgramPlan(jobName);
 		JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.classpathsToAttach,
 				this.userCodeClassLoader);
-		this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism());
+		this.lastJobExecutionResult = client.runBlocking(toRun, getParallelism(), savepointPath);
 		return this.lastJobExecutionResult;
 	}
 
@@ -94,7 +97,11 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	public ClassLoader getUserCodeClassLoader() {
 		return userCodeClassLoader;
 	}
-	
+
+	public String getSavepointPath() {
+		return savepointPath;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	
 	static void setAsContext(ContextEnvironmentFactory factory) {

http://git-wip-us.apache.org/repos/asf/flink/blob/3607575a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index 55f705b..e820bad 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -46,10 +46,11 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
 	private ExecutionEnvironment lastEnvCreated;
 
+	private String savepointPath;
 
 	public ContextEnvironmentFactory(Client client, List<URL> jarFilesToAttach,
 			List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
-			boolean wait)
+			boolean wait, String savepointPath)
 	{
 		this.client = client;
 		this.jarFilesToAttach = jarFilesToAttach;
@@ -57,6 +58,7 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 		this.userCodeClassLoader = userCodeClassLoader;
 		this.defaultParallelism = defaultParallelism;
 		this.wait = wait;
+		this.savepointPath = savepointPath;
 	}
 
 	@Override
@@ -66,8 +68,8 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 		}
 
 		lastEnvCreated = wait ?
-				new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader) :
-				new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader);
+				new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath) :
+				new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath);
 		if (defaultParallelism > 0) {
 			lastEnvCreated.setParallelism(defaultParallelism);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/3607575a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
index 0b1ae1d..037c36b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
@@ -41,8 +41,13 @@ public class DetachedEnvironment extends ContextEnvironment {
 
 	private static final Logger LOG = LoggerFactory.getLogger(DetachedEnvironment.class);
 
-	public DetachedEnvironment(Client remoteConnection, List<URL> jarFiles, List<URL> classpaths, ClassLoader userCodeClassLoader) {
-		super(remoteConnection, jarFiles, classpaths, userCodeClassLoader);
+	public DetachedEnvironment(
+			Client remoteConnection,
+			List<URL> jarFiles,
+			List<URL> classpaths,
+			ClassLoader userCodeClassLoader,
+			String savepointPath) {
+		super(remoteConnection, jarFiles, classpaths, userCodeClassLoader, savepointPath);
 	}
 
 	@Override
@@ -67,7 +72,7 @@ public class DetachedEnvironment extends ContextEnvironment {
 	 * Finishes this Context Environment's execution by explicitly running the plan constructed.
 	 */
 	JobSubmissionResult finalizeExecute() throws ProgramInvocationException {
-		return client.runDetached(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader);
+		return client.runDetached(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath);
 	}
 
 	public static final class DetachedJobExecutionResult extends JobExecutionResult {

http://git-wip-us.apache.org/repos/asf/flink/blob/3607575a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 8375ec2..f78502a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -86,6 +86,8 @@ public class PackagedProgram {
 	
 	private Plan plan;
 
+	private String savepointPath;
+
 	/**
 	 * Creates an instance that wraps the plan defined in the jar file using the given
 	 * argument.
@@ -254,9 +256,15 @@ public class PackagedProgram {
 					Program.class.getName() + " interface.");
 		}
 	}
-	
-	
-	
+
+	public void setSavepointPath(String savepointPath) {
+		this.savepointPath = savepointPath;
+	}
+
+	public String getSavepointPath() {
+		return savepointPath;
+	}
+
 	public String[] getArguments() {
 		return this.args;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/3607575a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index 64c2709..56173bd 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -22,7 +22,9 @@ package org.apache.flink.client;
 import static org.apache.flink.client.CliFrontendTestUtils.*;
 import static org.junit.Assert.*;
 
+import org.apache.flink.client.cli.CliFrontendParser;
 import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.Client;
 import org.apache.flink.client.program.PackagedProgram;
 import org.junit.BeforeClass;
@@ -90,6 +92,16 @@ public class CliFrontendRunTest {
 				CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
 				assertNotEquals(0, testFrontend.run(parameters));
 			}
+
+			// test configure savepoint path
+			{
+				String[] parameters = {"-s", "expectedSavepointPath", getTestJarPath()};
+				RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(1, false, false);
+				assertEquals(0, testFrontend.run(parameters));
+
+				RunOptions options = CliFrontendParser.parseRunCommand(parameters);
+				assertEquals("expectedSavepointPath", options.getSavepointPath());
+			}
 		}
 		catch (Exception e) {
 			e.printStackTrace();

http://git-wip-us.apache.org/repos/asf/flink/blob/3607575a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
new file mode 100644
index 0000000..13c895c
--- /dev/null
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendSavepointTest.java
@@ -0,0 +1,328 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Promise;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class CliFrontendSavepointTest {
+
+	private static PrintStream stdOut;
+	private static PrintStream stdErr;
+	private static ByteArrayOutputStream buffer;
+
+	// ------------------------------------------------------------------------
+	// Trigger savepoint
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testTriggerSavepointSuccess() throws Exception {
+		replaceStdOutAndStdErr();
+
+		try {
+			JobID jobId = new JobID();
+			ActorGateway jobManager = mock(ActorGateway.class);
+
+			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
+
+			when(jobManager.ask(
+					Mockito.eq(new JobManagerMessages.TriggerSavepoint(jobId)),
+					Mockito.any(FiniteDuration.class)))
+					.thenReturn(triggerResponse.future());
+
+			String savepointPath = "expectedSavepointPath";
+
+			triggerResponse.success(new JobManagerMessages
+					.TriggerSavepointSuccess(jobId, savepointPath));
+
+			CliFrontend frontend = new MockCliFrontend(
+					CliFrontendTestUtils.getConfigDir(), jobManager);
+
+			String[] parameters = { jobId.toString() };
+			int returnCode = frontend.savepoint(parameters);
+
+			assertEquals(0, returnCode);
+			verify(jobManager, times(1)).ask(
+					Mockito.eq(new JobManagerMessages.TriggerSavepoint(jobId)),
+					Mockito.any(FiniteDuration.class));
+
+			assertTrue(buffer.toString().contains("expectedSavepointPath"));
+		}
+		finally {
+			restoreStdOutAndStdErr();
+		}
+	}
+
+	@Test
+	public void testTriggerSavepointFailure() throws Exception {
+		replaceStdOutAndStdErr();
+
+		try {
+			JobID jobId = new JobID();
+			ActorGateway jobManager = mock(ActorGateway.class);
+
+			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
+
+			when(jobManager.ask(
+					Mockito.eq(new JobManagerMessages.TriggerSavepoint(jobId)),
+					Mockito.any(FiniteDuration.class)))
+					.thenReturn(triggerResponse.future());
+
+			Exception testException = new Exception("expectedTestException");
+
+			triggerResponse.success(new JobManagerMessages
+					.TriggerSavepointFailure(jobId, testException));
+
+			CliFrontend frontend = new MockCliFrontend(
+					CliFrontendTestUtils.getConfigDir(), jobManager);
+
+			String[] parameters = { jobId.toString() };
+			int returnCode = frontend.savepoint(parameters);
+
+			assertTrue(returnCode != 0);
+			verify(jobManager, times(1)).ask(
+					Mockito.eq(new JobManagerMessages.TriggerSavepoint(jobId)),
+					Mockito.any(FiniteDuration.class));
+
+			assertTrue(buffer.toString().contains("expectedTestException"));
+		}
+		finally {
+			restoreStdOutAndStdErr();
+		}
+	}
+
+	@Test
+	public void testTriggerSavepointFailureIllegalJobID() throws Exception {
+		replaceStdOutAndStdErr();
+
+		try {
+			CliFrontend frontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+
+			String[] parameters = { "invalid job id" };
+			int returnCode = frontend.savepoint(parameters);
+
+			assertTrue(returnCode != 0);
+			assertTrue(buffer.toString().contains("not a valid ID"));
+		}
+		finally {
+			restoreStdOutAndStdErr();
+		}
+	}
+
+	@Test
+	public void testTriggerSavepointFailureUnknownResponse() throws Exception {
+		replaceStdOutAndStdErr();
+
+		try {
+			JobID jobId = new JobID();
+			ActorGateway jobManager = mock(ActorGateway.class);
+
+			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
+
+			when(jobManager.ask(
+					Mockito.eq(new JobManagerMessages.TriggerSavepoint(jobId)),
+					Mockito.any(FiniteDuration.class)))
+					.thenReturn(triggerResponse.future());
+
+			triggerResponse.success("UNKNOWN RESPONSE");
+
+			CliFrontend frontend = new MockCliFrontend(
+					CliFrontendTestUtils.getConfigDir(), jobManager);
+
+			String[] parameters = { jobId.toString() };
+			int returnCode = frontend.savepoint(parameters);
+
+			assertTrue(returnCode != 0);
+			verify(jobManager, times(1)).ask(
+					Mockito.eq(new JobManagerMessages.TriggerSavepoint(jobId)),
+					Mockito.any(FiniteDuration.class));
+
+			String errMsg = buffer.toString();
+			assertTrue(errMsg.contains("IllegalStateException"));
+			assertTrue(errMsg.contains("Unknown JobManager response"));
+		}
+		finally {
+			restoreStdOutAndStdErr();
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Dispose savepoint
+	// ------------------------------------------------------------------------
+
+	@Test
+	public void testDisposeSavepointSuccess() throws Exception {
+		replaceStdOutAndStdErr();
+
+		try {
+			String savepointPath = "expectedSavepointPath";
+			ActorGateway jobManager = mock(ActorGateway.class);
+
+			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
+
+			when(jobManager.ask(
+					Mockito.eq(new JobManagerMessages.DisposeSavepoint(savepointPath)),
+					Mockito.any(FiniteDuration.class))).thenReturn(triggerResponse.future());
+
+			triggerResponse.success(JobManagerMessages.getDisposeSavepointSuccess());
+
+			CliFrontend frontend = new MockCliFrontend(
+					CliFrontendTestUtils.getConfigDir(), jobManager);
+
+			String[] parameters = { "-d", savepointPath };
+			int returnCode = frontend.savepoint(parameters);
+
+			assertEquals(0, returnCode);
+			verify(jobManager, times(1)).ask(
+					Mockito.eq(new JobManagerMessages.DisposeSavepoint(savepointPath)),
+					Mockito.any(FiniteDuration.class));
+
+			String outMsg = buffer.toString();
+			assertTrue(outMsg.contains(savepointPath));
+			assertTrue(outMsg.contains("disposed"));
+		}
+		finally {
+			restoreStdOutAndStdErr();
+		}
+	}
+
+	@Test
+	public void testDisposeSavepointFailure() throws Exception {
+		replaceStdOutAndStdErr();
+
+		try {
+			String savepointPath = "expectedSavepointPath";
+			ActorGateway jobManager = mock(ActorGateway.class);
+
+			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
+
+			when(jobManager.ask(
+					Mockito.eq(new JobManagerMessages.DisposeSavepoint(savepointPath)),
+					Mockito.any(FiniteDuration.class)))
+					.thenReturn(triggerResponse.future());
+
+			Exception testException = new Exception("expectedTestException");
+
+			triggerResponse.success(new JobManagerMessages
+					.DisposeSavepointFailure(testException));
+
+			CliFrontend frontend = new MockCliFrontend(
+					CliFrontendTestUtils.getConfigDir(), jobManager);
+
+			String[] parameters = { "-d", savepointPath };
+			int returnCode = frontend.savepoint(parameters);
+
+			assertTrue(returnCode != 0);
+			verify(jobManager, times(1)).ask(
+					Mockito.eq(new JobManagerMessages.DisposeSavepoint(savepointPath)),
+					Mockito.any(FiniteDuration.class));
+
+			assertTrue(buffer.toString().contains("expectedTestException"));
+		}
+		finally {
+			restoreStdOutAndStdErr();
+		}
+	}
+
+	@Test
+	public void testDisposeSavepointFailureUnknownResponse() throws Exception {
+		replaceStdOutAndStdErr();
+
+		try {
+			String savepointPath = "expectedSavepointPath";
+			ActorGateway jobManager = mock(ActorGateway.class);
+
+			Promise<Object> triggerResponse = new scala.concurrent.impl.Promise.DefaultPromise<>();
+
+			when(jobManager.ask(
+					Mockito.eq(new JobManagerMessages.DisposeSavepoint(savepointPath)),
+					Mockito.any(FiniteDuration.class)))
+					.thenReturn(triggerResponse.future());
+
+			triggerResponse.success("UNKNOWN RESPONSE");
+
+			CliFrontend frontend = new MockCliFrontend(
+					CliFrontendTestUtils.getConfigDir(), jobManager);
+
+			String[] parameters = { "-d", savepointPath };
+			int returnCode = frontend.savepoint(parameters);
+
+			assertTrue(returnCode != 0);
+			verify(jobManager, times(1)).ask(
+					Mockito.eq(new JobManagerMessages.DisposeSavepoint(savepointPath)),
+					Mockito.any(FiniteDuration.class));
+
+			String errMsg = buffer.toString();
+			assertTrue(errMsg.contains("IllegalStateException"));
+			assertTrue(errMsg.contains("Unknown JobManager response"));
+		}
+		finally {
+			restoreStdOutAndStdErr();
+		}
+
+		replaceStdOutAndStdErr();
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static class MockCliFrontend extends CliFrontend {
+
+		private final ActorGateway mockJobManager;
+
+		public MockCliFrontend(String configDir, ActorGateway mockJobManager) throws Exception {
+			super(configDir);
+			this.mockJobManager = mockJobManager;
+		}
+
+		@Override
+		protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception {
+			return mockJobManager;
+		}
+	}
+
+	private static void replaceStdOutAndStdErr() {
+		stdOut = System.out;
+		stdErr = System.err;
+		buffer = new ByteArrayOutputStream();
+		PrintStream capture = new PrintStream(buffer);
+		System.setOut(capture);
+		System.setErr(capture);
+	}
+
+	private static void restoreStdOutAndStdErr() {
+		System.setOut(stdOut);
+		System.setErr(stdErr);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3607575a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 7a68fc5..23fcfed 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -60,7 +60,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 			((DetachedEnvironment) ctx).setDetachedPlan(streamGraph);
 			return DetachedEnvironment.DetachedJobExecutionResult.INSTANCE;
 		} else {
-			return ctx.getClient().runBlocking(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader());
+			return ctx.getClient().runBlocking(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointPath());
 		}
 	}
 }


Mime
View raw message