flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-4717] Add CancelJobWithSavepoint
Date Fri, 14 Oct 2016 08:07:42 GMT
Repository: flink
Updated Branches:
  refs/heads/master fd410d9f6 -> 5783671c2


[FLINK-4717] Add CancelJobWithSavepoint

- Adds CancelJobWithSavepoint message, which triggers a savepoint
  before cancelling the respective job.
- Adds -s [targetDirectory] option to CLI cancel command:
    * bin/flink cancel <jobID> (regular cancelling)
    * bin/flink cancel -s <jobID> (cancel with savepoint to default dir)
    * bin/flink cancel -s <targetDir> <jobID> (cancel with savepoint to targetDir)

This closes #2609.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5783671c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5783671c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5783671c

Branch: refs/heads/master
Commit: 5783671c2f30228a2d5b5b7bf09b762ae41db8e2
Parents: fd410d9
Author: Ufuk Celebi <uce@apache.org>
Authored: Fri Oct 7 11:48:47 2016 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Fri Oct 14 10:07:09 2016 +0200

----------------------------------------------------------------------
 docs/setup/cli.md                               | 192 ++++++++++---------
 .../org/apache/flink/client/CliFrontend.java    |  55 +++++-
 .../apache/flink/client/cli/CancelOptions.java  |  18 ++
 .../flink/client/cli/CliFrontendParser.java     |  14 +-
 .../flink/client/CliFrontendListCancelTest.java | 106 +++++++++-
 .../checkpoint/CheckpointCoordinator.java       |  26 ++-
 .../checkpoint/CheckpointDeclineReason.java     |   2 +
 .../runtime/checkpoint/PendingCheckpoint.java   |   9 +
 .../flink/runtime/jobmanager/JobManager.scala   |  56 ++++++
 .../runtime/messages/JobManagerMessages.scala   |  19 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |  89 ++++++---
 .../checkpoint/CheckpointStateRestoreTest.java  |   4 +-
 .../checkpoint/PendingCheckpointTest.java       |   2 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  11 +-
 .../runtime/jobmanager/JobManagerTest.java      | 142 ++++++++++++++
 .../test/checkpointing/SavepointITCase.java     |   4 +
 16 files changed, 607 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/docs/setup/cli.md
----------------------------------------------------------------------
diff --git a/docs/setup/cli.md b/docs/setup/cli.md
index 251a0f6..0d3045e 100644
--- a/docs/setup/cli.md
+++ b/docs/setup/cli.md
@@ -113,6 +113,10 @@ The command line can be used to
 
         ./bin/flink cancel <jobID>
 
+-   Cancel a job with a savepoint:
+
+        ./bin/flink cancel -s [targetDirectory] <jobID>
+
 -   Stop a job (streaming jobs only):
 
         ./bin/flink stop <jobID>
@@ -144,7 +148,19 @@ Returns the path of the created savepoint. You need this path to restore and dis
 
 You can optionally specify a `savepointDirectory` when triggering the savepoint. If you don't specify one here, you need to configure a default savepoint directory for the Flink installation (see [[savepoint.html#configuration]]).
 
-#### **Restore a savepoint**
+##### Cancel with a savepoint
+
+You can atomically trigger a savepoint and cancel a job.
+
+{% highlight bash %}
+./bin/flink cancel -s  [savepointDirectory] <jobID>
+{% endhighlight %}
+
+If no savepoint directory is configured, you need to configure a default savepoint directory for the Flink installation (see [[savepoint.html#configuration]]).
+
+The job will only be cancelled if the savepoint succeeds.
+
+#### Restore a savepoint
 
 {% highlight bash %}
 ./bin/flink run -s <savepointPath> ...
@@ -152,7 +168,7 @@ You can optionally specify a `savepointDirectory` when triggering the savepoint.
 
 The run command has a savepoint flag to submit a job, which restores its state from a savepoint. The savepoint path is returned by the savepoint trigger command.
 
-#### **Dispose a savepoint**
+#### Dispose a savepoint
 
 {% highlight bash %}
 ./bin/flink savepoint -d <savepointPath>
@@ -181,41 +197,51 @@ Action "run" compiles and runs a program.
 
   Syntax: run [OPTIONS] <jar-file> <arguments>
   "run" action options:
-     -c,--class <classname>               Class with the program entry point
-                                          ("main" method or "getPlan()" method.
-                                          Only needed if the JAR file does not
-                                          specify the class in its manifest.
-     -C,--classpath <url>                 Adds a URL to each user code
-                                          classloader  on all nodes in the
-                                          cluster. The paths must specify a
-                                          protocol (e.g. file://) and be
-                                          accessible on all nodes (e.g. by means
-                                          of a NFS share). You can use this
-                                          option multiple times for specifying
-                                          more than one URL. The protocol must
-                                          be supported by the {@link
-                                          java.net.URLClassLoader}.
-     -d,--detached                        If present, runs the job in detached
-                                          mode
-     -m,--jobmanager <host:port>          Address of the JobManager (master) to
-                                          which to connect. Specify
-                                          'yarn-cluster' as the JobManager to
-                                          deploy a YARN cluster for the job. Use
-                                          this flag to connect to a different
-                                          JobManager than the one specified in
-                                          the configuration.
-     -p,--parallelism <parallelism>       The parallelism with which to run the
-                                          program. Optional flag to override the
-                                          default value specified in the
-                                          configuration.
-     -q,--sysoutLogging                   If present, supress logging output to
-                                          standard out.
-     -s,--fromSavepoint <savepointPath>   Path to a savepoint to reset the job
-                                          back to (for example
-                                          file:///flink/savepoint-1537).
-  Additional arguments if -m yarn-cluster is set:
+     -c,--class <classname>                         Class with the program entry
+                                                    point ("main" method or
+                                                    "getPlan()" method. Only
+                                                    needed if the JAR file does
+                                                    not specify the class in its
+                                                    manifest.
+     -C,--classpath <url>                           Adds a URL to each user code
+                                                    classloader  on all nodes in
+                                                    the cluster. The paths must
+                                                    specify a protocol (e.g.
+                                                    file://) and be accessible
+                                                    on all nodes (e.g. by means
+                                                    of a NFS share). You can use
+                                                    this option multiple times
+                                                    for specifying more than one
+                                                    URL. The protocol must be
+                                                    supported by the {@link
+                                                    java.net.URLClassLoader}.
+     -d,--detached                                  If present, runs the job in
+                                                    detached mode
+     -m,--jobmanager <host:port>                    Address of the JobManager
+                                                    (master) to which to
+                                                    connect. Use this flag to
+                                                    connect to a different
+                                                    JobManager than the one
+                                                    specified in the
+                                                    configuration.
+     -p,--parallelism <parallelism>                 The parallelism with which
+                                                    to run the program. Optional
+                                                    flag to override the default
+                                                    value specified in the
+                                                    configuration.
+     -q,--sysoutLogging                             If present, suppress logging
+                                                    output to standard out.
+     -s,--fromSavepoint <savepointPath>             Path to a savepoint to reset
+                                                    the job back to (for example
+                                                    file:///flink/savepoint-1537
+                                                    ).
+     -z,--zookeeperNamespace <zookeeperNamespace>   Namespace to create the
+                                                    Zookeeper sub-paths for high
+                                                    availability mode
+  Options for yarn-cluster mode:
      -yD <arg>                            Dynamic properties
      -yd,--yarndetached                   Start detached
+     -yid,--yarnapplicationId <arg>       Attach to running YARN session
      -yj,--yarnjar <arg>                  Path to Flink jar file
      -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container [in
                                           MB]
@@ -232,6 +258,9 @@ Action "run" compiles and runs a program.
                                           (t for transfer)
      -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container [in
                                           MB]
+     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
+                                          sub-paths for high availability mode
+
 
 
 Action "info" shows the optimized execution plan of the program (JSON).
@@ -242,16 +271,13 @@ Action "info" shows the optimized execution plan of the program (JSON).
                                       method or "getPlan()" method. Only needed
                                       if the JAR file does not specify the class
                                       in its manifest.
-     -m,--jobmanager <host:port>      Address of the JobManager (master) to
-                                      which to connect. Specify 'yarn-cluster'
-                                      as the JobManager to deploy a YARN cluster
-                                      for the job. Use this flag to connect to a
-                                      different JobManager than the one
-                                      specified in the configuration.
      -p,--parallelism <parallelism>   The parallelism with which to run the
                                       program. Optional flag to override the
                                       default value specified in the
                                       configuration.
+  Options for yarn-cluster mode:
+     -yid,--yarnapplicationId <arg>   Attach to running YARN session
+
 
 
 Action "list" lists running and scheduled programs.
@@ -259,41 +285,17 @@ Action "list" lists running and scheduled programs.
   Syntax: list [OPTIONS]
   "list" action options:
      -m,--jobmanager <host:port>   Address of the JobManager (master) to which
-                                   to connect. Specify 'yarn-cluster' as the
-                                   JobManager to deploy a YARN cluster for the
-                                   job. Use this flag to connect to a different
-                                   JobManager than the one specified in the
-                                   configuration.
+                                   to connect. Use this flag to connect to a
+                                   different JobManager than the one specified
+                                   in the configuration.
      -r,--running                  Show only running programs and their JobIDs
      -s,--scheduled                Show only scheduled programs and their JobIDs
-  Additional arguments if -m yarn-cluster is set:
-     -yid <yarnApplicationId>      YARN application ID of Flink YARN session to
-                                   connect to. Must not be set if JobManager HA
-                                   is used. In this case, JobManager RPC
-                                   location is automatically retrieved from
-                                   Zookeeper.
+  Options for yarn-cluster mode:
+     -yid,--yarnapplicationId <arg>   Attach to running YARN session
 
 
-Action "cancel" cancels a running program.
 
-  Syntax: cancel [OPTIONS] <Job ID>
-  "cancel" action options:
-     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
-                                   to connect. Specify 'yarn-cluster' as the
-                                   JobManager to deploy a YARN cluster for the
-                                   job. Use this flag to connect to a different
-                                   JobManager than the one specified in the
-                                   configuration.
-  Additional arguments if -m yarn-cluster is set:
-     -yid <yarnApplicationId>      YARN application ID of Flink YARN session to
-                                   connect to. Must not be set if JobManager HA
-                                   is used. In this case, JobManager RPC
-                                   location is automatically retrieved from
-                                   Zookeeper.
-
-
-Action "stop" stops a running program (streaming jobs only). There are no strong consistency
-guarantees for a stop request.
+Action "stop" stops a running program (streaming jobs only).
 
   Syntax: stop [OPTIONS] <Job ID>
   "stop" action options:
@@ -301,24 +303,40 @@ guarantees for a stop request.
                                    to connect. Use this flag to connect to a
                                    different JobManager than the one specified
                                    in the configuration.
-  Additional arguments if -m yarn-cluster is set:
-     -yid <yarnApplicationId>      YARN application ID of Flink YARN session to
-                                   connect to. Must not be set if JobManager HA
-                                   is used. In this case, JobManager RPC
-                                   location is automatically retrieved from
-                                   Zookeeper.
+  Options for yarn-cluster mode:
+     -yid,--yarnapplicationId <arg>   Attach to running YARN session
+
+
+
+Action "cancel" cancels a running program.
+
+  Syntax: cancel [OPTIONS] <Job ID>
+  "cancel" action options:
+     -m,--jobmanager <host:port>            Address of the JobManager (master)
+                                            to which to connect. Use this flag
+                                            to connect to a different JobManager
+                                            than the one specified in the
+                                            configuration.
+     -s,--withSavepoint <targetDirectory>   Trigger savepoint and cancel job.
+                                            The target directory is optional. If
+                                            no directory is specified, the
+                                            configured default directory
+                                            (state.savepoints.dir) is used.
+  Options for yarn-cluster mode:
+     -yid,--yarnapplicationId <arg>   Attach to running YARN session
+
 
 
 Action "savepoint" triggers savepoints for a running job or disposes existing ones.
 
- Syntax: savepoint [OPTIONS] <Job ID>
- "savepoint" action options:
-    -d,--dispose <arg>            Path of savepoint to dispose.
-    -j,--jarfile <jarfile>        Flink program JAR file.
-    -m,--jobmanager <host:port>   Address of the JobManager (master) to which
-                                  to connect. Use this flag to connect to a
-                                  different JobManager than the one specified
-                                  in the configuration.
- Options for yarn-cluster mode:
-    -yid,--yarnapplicationId <arg>   Attach to running YARN session
+  Syntax: savepoint [OPTIONS] <Job ID> [<target directory>]
+  "savepoint" action options:
+     -d,--dispose <arg>            Path of savepoint to dispose.
+     -j,--jarfile <jarfile>        Flink program JAR file.
+     -m,--jobmanager <host:port>   Address of the JobManager (master) to which
+                                   to connect. Use this flag to connect to a
+                                   different JobManager than the one specified
+                                   in the configuration.
+  Options for yarn-cluster mode:
+     -yid,--yarnapplicationId <arg>   Attach to running YARN session
 ~~~

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 90d3437..0572dc6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -60,7 +60,9 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
 import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
@@ -556,20 +558,38 @@ public class CliFrontend {
 		}
 
 		String[] cleanedArgs = options.getArgs();
+
+		boolean withSavepoint = options.isWithSavepoint();
+		String targetDirectory = options.getSavepointTargetDirectory();
+
 		JobID jobId;
 
+		// Figure out jobID. This is a little overly complicated, because
+		// we have to figure out whether the optional target directory
+		// is set:
+		// - cancel -s <jobID> => default target dir (JobID parsed as opt arg)
+		// - cancel -s <targetDir> <jobID> => custom target dir (parsed correctly)
 		if (cleanedArgs.length > 0) {
 			String jobIdString = cleanedArgs[0];
 			try {
 				jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
-			}
-			catch (Exception e) {
+			} catch (Exception e) {
 				LOG.error("Error: The value for the Job ID is not a valid ID.");
 				System.out.println("Error: The value for the Job ID is not a valid ID.");
 				return 1;
 			}
-		}
-		else {
+		} else if (targetDirectory != null)  {
+			// Try this for case: cancel -s <jobID> (default savepoint target dir)
+			String jobIdString = targetDirectory;
+			try {
+				jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
+				targetDirectory = null;
+			} catch (Exception e) {
+				LOG.error("Missing JobID in the command line arguments.");
+				System.out.println("Error: Specify a Job ID to cancel a job.");
+				return 1;
+			}
+		} else {
 			LOG.error("Missing JobID in the command line arguments.");
 			System.out.println("Error: Specify a Job ID to cancel a job.");
 			return 1;
@@ -577,13 +597,36 @@ public class CliFrontend {
 
 		try {
 			ActorGateway jobManager = getJobManagerGateway(options);
-			Future<Object> response = jobManager.ask(new CancelJob(jobId), clientTimeout);
 
+			Object cancelMsg;
+			if (withSavepoint) {
+				if (targetDirectory == null) {
+					logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory.");
+				} else {
+					logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + ".");
+				}
+				cancelMsg = new CancelJobWithSavepoint(jobId, targetDirectory);
+			} else {
+				logAndSysout("Cancelling job " + jobId + ".");
+				cancelMsg = new CancelJob(jobId);
+			}
+
+			Future<Object> response = jobManager.ask(cancelMsg, clientTimeout);
 			final Object rc = Await.result(response, clientTimeout);
 
-			if (rc instanceof CancellationFailure) {
+			if (rc instanceof CancellationSuccess) {
+				if (withSavepoint) {
+					CancellationSuccess success = (CancellationSuccess) rc;
+					String savepointPath = success.savepointPath();
+					logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + ".");
+				} else {
+					logAndSysout("Cancelled job " + jobId + ".");
+				}
+			} else if (rc instanceof CancellationFailure) {
 				throw new Exception("Canceling the job with ID " + jobId + " failed.",
 						((CancellationFailure) rc).cause());
+			} else {
+				throw new IllegalStateException("Unexpected response: " + rc);
 			}
 
 			return 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
index 22e9ece..54e1a23 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CancelOptions.java
@@ -19,6 +19,8 @@ package org.apache.flink.client.cli;
 
 import org.apache.commons.cli.CommandLine;
 
+import static org.apache.flink.client.cli.CliFrontendParser.CANCEL_WITH_SAVEPOINT_OPTION;
+
 /**
  * Command line options for the CANCEL command
  */
@@ -26,12 +28,28 @@ public class CancelOptions extends CommandLineOptions {
 
 	private final String[] args;
 
+	/** Flag indicating whether to cancel with a savepoint. */
+	private final boolean withSavepoint;
+
+	/** Optional target directory for the savepoint. Overwrites cluster default. */
+	private final String targetDirectory;
+
 	public CancelOptions(CommandLine line) {
 		super(line);
 		this.args = line.getArgs();
+		this.withSavepoint = line.hasOption(CANCEL_WITH_SAVEPOINT_OPTION.getOpt());
+		this.targetDirectory = line.getOptionValue(CANCEL_WITH_SAVEPOINT_OPTION.getOpt());
 	}
 
 	public String[] getArgs() {
 		return args == null ? new String[0] : args;
 	}
+
+	public boolean isWithSavepoint() {
+		return withSavepoint;
+	}
+
+	public String getSavepointTargetDirectory() {
+		return targetDirectory;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 9f3ef63..2527a9c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -24,6 +24,7 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.flink.client.CliFrontend;
+import org.apache.flink.configuration.ConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +37,6 @@ public class CliFrontendParser {
 
 	private static final Logger LOG = LoggerFactory.getLogger(CliFrontendParser.class);
 
-
 	static final Option HELP_OPTION = new Option("h", "help", false,
 			"Show the help message for the CLI Frontend or the action.");
 
@@ -85,6 +85,11 @@ public class CliFrontendParser {
 	static final Option ZOOKEEPER_NAMESPACE_OPTION = new Option("z", "zookeeperNamespace", true,
 			"Namespace to create the Zookeeper sub-paths for high availability mode");
 
+	static final Option CANCEL_WITH_SAVEPOINT_OPTION = new Option(
+			"s", "withSavepoint", true, "Trigger savepoint and cancel job. The target " +
+			"directory is optional. If no directory is specified, the configured default " +
+			"directory (" + ConfigConstants.SAVEPOINT_DIRECTORY_KEY + ") is used.");
+
 	static {
 		HELP_OPTION.setRequired(false);
 
@@ -118,6 +123,10 @@ public class CliFrontendParser {
 
 		ZOOKEEPER_NAMESPACE_OPTION.setRequired(false);
 		ZOOKEEPER_NAMESPACE_OPTION.setArgName("zookeeperNamespace");
+
+		CANCEL_WITH_SAVEPOINT_OPTION.setRequired(false);
+		CANCEL_WITH_SAVEPOINT_OPTION.setArgName("targetDirectory");
+		CANCEL_WITH_SAVEPOINT_OPTION.setOptionalArg(true);
 	}
 
 	private static final Options RUN_OPTIONS = getRunOptions(buildGeneralOptions(new Options()));
@@ -188,6 +197,7 @@ public class CliFrontendParser {
 	}
 
 	private static Options getCancelOptions(Options options) {
+		options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
 		options = getJobManagerAddressOption(options);
 		return addCustomCliOptions(options, false);
 	}
@@ -213,7 +223,6 @@ public class CliFrontendParser {
 		return getJobManagerAddressOption(o);
 	}
 
-
 	private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) {
 		options.addOption(CLASS_OPTION);
 		options.addOption(PARALLELISM_OPTION);
@@ -228,6 +237,7 @@ public class CliFrontendParser {
 	}
 
 	private static Options getCancelOptionsWithoutDeprecatedOptions(Options options) {
+		options.addOption(CANCEL_WITH_SAVEPOINT_OPTION);
 		options = getJobManagerAddressOption(options);
 		return options;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
index 524e7e7..53311ef 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendListCancelTest.java
@@ -18,14 +18,16 @@
 
 package org.apache.flink.client;
 
-import akka.actor.*;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import akka.actor.Status;
 import akka.testkit.JavaTestKit;
-
-import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -34,7 +36,10 @@ import org.junit.Test;
 import java.util.UUID;
 
 import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 public class CliFrontendListCancelTest {
 
@@ -128,6 +133,68 @@ public class CliFrontendListCancelTest {
 		}
 	}
 
+	/**
+	 * Tests cancelling with the savepoint option.
+	 */
+	@Test
+	public void testCancelWithSavepoint() throws Exception {
+		{
+			// Cancel with savepoint (no target directory)
+			JobID jid = new JobID();
+			UUID leaderSessionID = UUID.randomUUID();
+
+			Props props = Props.create(CliJobManager.class, jid, leaderSessionID);
+			ActorRef jm = actorSystem.actorOf(props);
+			ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
+
+			String[] parameters = { "-s", jid.toString() };
+			InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
+			assertEquals(0, testFrontend.cancel(parameters));
+		}
+
+		{
+			// Cancel with savepoint (with target directory)
+			JobID jid = new JobID();
+			UUID leaderSessionID = UUID.randomUUID();
+
+			Props props = Props.create(CliJobManager.class, jid, leaderSessionID, "targetDirectory");
+			ActorRef jm = actorSystem.actorOf(props);
+			ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
+
+			String[] parameters = { "-s", "targetDirectory", jid.toString() };
+			InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
+			assertEquals(0, testFrontend.cancel(parameters));
+		}
+
+		{
+			// Cancel with savepoint (with target directory), but no job ID
+			JobID jid = new JobID();
+			UUID leaderSessionID = UUID.randomUUID();
+
+			Props props = Props.create(CliJobManager.class, jid, leaderSessionID, "targetDirectory");
+			ActorRef jm = actorSystem.actorOf(props);
+			ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
+
+			String[] parameters = { "-s", "targetDirectory" };
+			InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
+			assertNotEquals(0, testFrontend.cancel(parameters));
+		}
+
+		{
+			// Cancel with savepoint (no target directory)and no job ID
+			JobID jid = new JobID();
+			UUID leaderSessionID = UUID.randomUUID();
+
+			Props props = Props.create(CliJobManager.class, jid, leaderSessionID);
+			ActorRef jm = actorSystem.actorOf(props);
+			ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
+
+			String[] parameters = { "-s" };
+			InfoListTestCliFrontend testFrontend = new InfoListTestCliFrontend(gateway);
+			assertNotEquals(0, testFrontend.cancel(parameters));
+		}
+	}
+
 	@Test
 	public void testList() {
 		try {
@@ -182,10 +249,16 @@ public class CliFrontendListCancelTest {
 	protected static final class CliJobManager extends FlinkUntypedActor {
 		private final JobID jobID;
 		private final UUID leaderSessionID;
+		private final String targetDirectory;
 
 		public CliJobManager(final JobID jobID, final UUID leaderSessionID){
+			this(jobID, leaderSessionID, null);
+		}
+
+		public CliJobManager(final JobID jobID, final UUID leaderSessionID, String targetDirectory){
 			this.jobID = jobID;
 			this.leaderSessionID = leaderSessionID;
+			this.targetDirectory = targetDirectory;
 		}
 
 		@Override
@@ -198,7 +271,7 @@ public class CliFrontendListCancelTest {
 
 				if (jobID != null && jobID.equals(cancelJob.jobID())) {
 					getSender().tell(
-							decorateMessage(new Status.Success(new Object())),
+							decorateMessage(new Status.Success(new JobManagerMessages.CancellationSuccess(jobID, null))),
 							getSelf());
 				}
 				else {
@@ -207,6 +280,27 @@ public class CliFrontendListCancelTest {
 							getSelf());
 				}
 			}
+			else if (message instanceof JobManagerMessages.CancelJobWithSavepoint) {
+				JobManagerMessages.CancelJobWithSavepoint cancelJob = (JobManagerMessages.CancelJobWithSavepoint) message;
+
+				if (jobID != null && jobID.equals(cancelJob.jobID())) {
+					if (targetDirectory == null && cancelJob.savepointDirectory() == null ||
+							targetDirectory != null && targetDirectory.equals(cancelJob.savepointDirectory())) {
+						getSender().tell(
+								decorateMessage(new JobManagerMessages.CancellationSuccess(jobID, targetDirectory)),
+								getSelf());
+					} else {
+						getSender().tell(
+								decorateMessage(new JobManagerMessages.CancellationFailure(jobID, new Exception("Wrong target directory"))),
+								getSelf());
+					}
+				}
+				else {
+					getSender().tell(
+							decorateMessage(new JobManagerMessages.CancellationFailure(jobID, new Exception("Wrong or no JobID"))),
+							getSelf());
+				}
+			}
 			else if (message instanceof JobManagerMessages.RequestRunningJobsStatus$) {
 				getSender().tell(
 						decorateMessage(new JobManagerMessages.RunningJobsStatus()),

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index ab4bde7..00028c4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -287,7 +287,7 @@ public class CheckpointCoordinator {
 		checkNotNull(targetDirectory, "Savepoint target directory");
 
 		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
-		CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory);
+		CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory, false);
 
 		if (result.isSuccess()) {
 			return result.getPendingCheckpoint().getCompletionFuture();
@@ -303,13 +303,21 @@ public class CheckpointCoordinator {
 	 * timestamp.
 	 *
 	 * @param timestamp The timestamp for the checkpoint.
+	 * @param isPeriodic Flag indicating whether this triggered checkpoint is
+	 * periodic. If this flag is true, but the periodic scheduler is disabled,
+	 * the checkpoint will be declined.
 	 * @return <code>true</code> if triggering the checkpoint succeeded.
 	 */
-	public boolean triggerCheckpoint(long timestamp) throws Exception {
-		return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory).isSuccess();
+	public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) throws Exception {
+		return triggerCheckpoint(timestamp, checkpointProperties, checkpointDirectory, isPeriodic).isSuccess();
 	}
 
-	CheckpointTriggerResult triggerCheckpoint(long timestamp, CheckpointProperties props, String targetDirectory) throws Exception {
+	CheckpointTriggerResult triggerCheckpoint(
+			long timestamp,
+			CheckpointProperties props,
+			String targetDirectory,
+			boolean isPeriodic) throws Exception {
+
 		// Sanity check
 		if (props.externalizeCheckpoint() && targetDirectory == null) {
 			throw new IllegalStateException("No target directory specified to persist checkpoint to.");
@@ -322,6 +330,11 @@ public class CheckpointCoordinator {
 				return new CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
 			}
 
+			// Don't allow periodic checkpoint if scheduling has been disabled
+			if (isPeriodic && !periodicScheduling) {
+				return new CheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);
+			}
+
 			// validate whether the checkpoint can be triggered, with respect to the limit of
 			// concurrent checkpoints, and the minimum time between checkpoints.
 			// these checks are not relevant for savepoints
@@ -417,6 +430,7 @@ public class CheckpointCoordinator {
 					checkpointID,
 					timestamp,
 					ackTasks,
+					isPeriodic,
 					props,
 					targetDirectory);
 
@@ -580,7 +594,7 @@ public class CheckpointCoordinator {
 				}
 				if (!haveMoreRecentPending && !triggerRequestQueued) {
 					LOG.info("Triggering new checkpoint because of discarded checkpoint " + checkpointId);
-					triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), checkpoint.getTargetDirectory());
+					triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), checkpoint.getTargetDirectory(), checkpoint.isPeriodic());
 				} else if (!haveMoreRecentPending) {
 					LOG.info("Promoting queued checkpoint request because of discarded checkpoint " + checkpointId);
 					triggerQueuedRequests();
@@ -1084,7 +1098,7 @@ public class CheckpointCoordinator {
 		@Override
 		public void run() {
 			try {
-				triggerCheckpoint(System.currentTimeMillis());
+				triggerCheckpoint(System.currentTimeMillis(), true);
 			}
 			catch (Exception e) {
 				LOG.error("Exception while triggering checkpoint", e);

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
index 2cc9094..60fe657 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointDeclineReason.java
@@ -25,6 +25,8 @@ public enum CheckpointDeclineReason {
 
 	COORDINATOR_SHUTDOWN("Checkpoint coordinator is shut down."),
 
+	PERIODIC_SCHEDULER_SHUTDOWN("Periodic checkpoint scheduler is shut down."),
+
 	ALREADY_QUEUED("Another checkpoint request has already been queued."),
 
 	TOO_MANY_CONCURRENT_CHECKPOINTS("The maximum number of concurrent checkpoints is exceeded"),

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 983f1d7..6f50392 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -67,6 +67,9 @@ public class PendingCheckpoint {
 
 	private final Map<ExecutionAttemptID, ExecutionVertex> notYetAcknowledgedTasks;
 
+	/** Flag indicating whether the checkpoint is triggered as part of periodic scheduling. */
+	private final boolean isPeriodic;
+
 	/**
 	 * The checkpoint properties. If the checkpoint should be persisted
 	 * externally, it happens in {@link #finalizeCheckpoint()}.
@@ -90,12 +93,14 @@ public class PendingCheckpoint {
 			long checkpointId,
 			long checkpointTimestamp,
 			Map<ExecutionAttemptID, ExecutionVertex> verticesToConfirm,
+			boolean isPeriodic,
 			CheckpointProperties props,
 			String targetDirectory) {
 		this.jobId = checkNotNull(jobId);
 		this.checkpointId = checkpointId;
 		this.checkpointTimestamp = checkpointTimestamp;
 		this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
+		this.isPeriodic = isPeriodic;
 		this.taskStates = new HashMap<>();
 		this.props = checkNotNull(props);
 		this.targetDirectory = targetDirectory;
@@ -147,6 +152,10 @@ public class PendingCheckpoint {
 		return discarded;
 	}
 
+	boolean isPeriodic() {
+		return isPeriodic;
+	}
+
 	/**
 	 * Checks whether this checkpoint can be subsumed or whether it should always continue, regardless
 	 * of newer checkpoints in progress.

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 450e810..cca0124 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -578,6 +578,62 @@ class JobManager(
           )
       }
 
+    case CancelJobWithSavepoint(jobId, savepointDirectory) =>
+      try {
+        val targetDirectory = if (savepointDirectory != null) {
+          savepointDirectory
+        } else {
+          defaultSavepointDir
+        }
+
+        log.info(s"Trying to cancel job $jobId with savepoint to $targetDirectory")
+
+        currentJobs.get(jobId) match {
+          case Some((executionGraph, _)) =>
+            // We don't want any checkpoint between the savepoint and cancellation
+            val coord = executionGraph.getCheckpointCoordinator
+            coord.stopCheckpointScheduler()
+
+            // Trigger the savepoint
+            val future = coord.triggerSavepoint(System.currentTimeMillis(), targetDirectory)
+
+            val senderRef = sender()
+            future.handleAsync[Void](
+              new BiFunction[CompletedCheckpoint, Throwable, Void] {
+                override def apply(success: CompletedCheckpoint, cause: Throwable): Void = {
+                  if (success != null) {
+                    val path = success.getExternalPath()
+                    log.info(s"Savepoint stored in $path. Now cancelling $jobId.")
+                    executionGraph.cancel()
+                    senderRef ! decorateMessage(CancellationSuccess(jobId, path))
+                  } else {
+                    val msg = CancellationFailure(
+                      jobId,
+                      new Exception("Failed to trigger savepoint.", cause))
+                    senderRef ! decorateMessage(msg)
+                  }
+                  null
+                }
+              },
+              context.dispatcher)
+
+          case None =>
+            log.info(s"No job found with ID $jobId.")
+            sender ! decorateMessage(
+              CancellationFailure(
+                jobId,
+                new IllegalArgumentException(s"No job found with ID $jobId."))
+            )
+        }
+      } catch {
+        case t: Throwable =>
+          log.info(s"Failure during cancellation of job $jobId with savepoint.", t)
+          sender ! decorateMessage(
+            CancellationFailure(
+              jobId,
+              new Exception(s"Failed to cancel job $jobId with savepoint.", t)))
+      }
+
     case StopJob(jobID) =>
       log.info(s"Trying to stop job with ID $jobID.")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index fd45cda..4cf6a02 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -111,6 +111,21 @@ object JobManagerMessages {
   case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID
 
   /**
+    * Cancels the job with the given [[jobID]] at the JobManager. Before cancellation a savepoint
+    * is triggered without any other checkpoints in between. The result of the cancellation is
+    * the path of the triggered savepoint on success or an exception.
+    *
+    * @param jobID ID of the job to cancel
+    * @param savepointDirectory Optional target directory for the savepoint.
+    *                           If no target directory is specified here, the
+    *                           cluster default is used.
+    */
+  case class CancelJobWithSavepoint(
+      jobID: JobID,
+      savepointDirectory: String = null)
+    extends RequiresLeaderSessionID
+
+  /**
    * Stops a (streaming) job with the given [[jobID]] at the JobManager. The result of
    * stopping is sent back to the sender as a [[StoppingResponse]] message.
    *
@@ -280,7 +295,9 @@ object JobManagerMessages {
    * Denotes a successful job cancellation
    * @param jobID
    */
-  case class CancellationSuccess(jobID: JobID) extends CancellationResponse
+  case class CancellationSuccess(
+    jobID: JobID,
+    savepointPath: String = null) extends CancellationResponse
 
   /**
    * Denotes a failed job cancellation

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 7b0e819..2a20c6c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -135,7 +135,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should not succeed
-			assertFalse(coord.triggerCheckpoint(timestamp));
+			assertFalse(coord.triggerCheckpoint(timestamp, false));
 
 			// still, nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -188,7 +188,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should not succeed
-			assertFalse(coord.triggerCheckpoint(timestamp));
+			assertFalse(coord.triggerCheckpoint(timestamp, false));
 
 			// still, nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -239,7 +239,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should not succeed
-			assertFalse(coord.triggerCheckpoint(timestamp));
+			assertFalse(coord.triggerCheckpoint(timestamp, false));
 
 			// still, nothing should be happening
 			assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -290,7 +290,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should succeed
-			assertTrue(coord.triggerCheckpoint(timestamp));
+			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
 			// validate that we have a pending checkpoint
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
@@ -417,10 +417,10 @@ public class CheckpointCoordinatorTest {
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should succeed
-			assertTrue(coord.triggerCheckpoint(timestamp));
+			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
 			// trigger second checkpoint, should also succeed
-			assertTrue(coord.triggerCheckpoint(timestamp + 2));
+			assertTrue(coord.triggerCheckpoint(timestamp + 2, false));
 
 			// validate that we have a pending checkpoint
 			assertEquals(2, coord.getNumberOfPendingCheckpoints());
@@ -538,7 +538,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should succeed
-			assertTrue(coord.triggerCheckpoint(timestamp));
+			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
 			// validate that we have a pending checkpoint
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
@@ -608,7 +608,7 @@ public class CheckpointCoordinatorTest {
 			// trigger another checkpoint and see that this one replaces the other checkpoint
 			// ---------------
 			final long timestampNew = timestamp + 7;
-			coord.triggerCheckpoint(timestampNew);
+			coord.triggerCheckpoint(timestampNew, false);
 
 			long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
 			CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L);
@@ -692,7 +692,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should succeed
-			assertTrue(coord.triggerCheckpoint(timestamp1));
+			assertTrue(coord.triggerCheckpoint(timestamp1, false));
 
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -713,7 +713,7 @@ public class CheckpointCoordinatorTest {
 
 			// start the second checkpoint
 			// trigger the first checkpoint. this should succeed
-			assertTrue(coord.triggerCheckpoint(timestamp2));
+			assertTrue(coord.triggerCheckpoint(timestamp2, false));
 
 			assertEquals(2, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -832,7 +832,7 @@ public class CheckpointCoordinatorTest {
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
 
 			// trigger the first checkpoint. this should succeed
-			assertTrue(coord.triggerCheckpoint(timestamp1));
+			assertTrue(coord.triggerCheckpoint(timestamp1, false));
 
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -853,7 +853,7 @@ public class CheckpointCoordinatorTest {
 
 			// start the second checkpoint
 			// trigger the first checkpoint. this should succeed
-			assertTrue(coord.triggerCheckpoint(timestamp2));
+			assertTrue(coord.triggerCheckpoint(timestamp2, false));
 
 			assertEquals(2, coord.getNumberOfPendingCheckpoints());
 			assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -955,7 +955,7 @@ public class CheckpointCoordinatorTest {
 					new DisabledCheckpointStatsTracker());
 
 			// trigger a checkpoint, partially acknowledged
-			assertTrue(coord.triggerCheckpoint(timestamp));
+			assertTrue(coord.triggerCheckpoint(timestamp, false));
 			assertEquals(1, coord.getNumberOfPendingCheckpoints());
 
 			PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next();
@@ -1023,7 +1023,7 @@ public class CheckpointCoordinatorTest {
 					null,
 					new DisabledCheckpointStatsTracker());
 
-			assertTrue(coord.triggerCheckpoint(timestamp));
+			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
 			long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();
 
@@ -1431,10 +1431,10 @@ public class CheckpointCoordinatorTest {
 		CheckpointMetaData checkpointMetaDataS1 = new CheckpointMetaData(savepointId1, 0L);
 		assertEquals(1, coord.getNumberOfPendingCheckpoints());
 
-		assertTrue(coord.triggerCheckpoint(timestamp + 1));
+		assertTrue(coord.triggerCheckpoint(timestamp + 1, false));
 		assertEquals(2, coord.getNumberOfPendingCheckpoints());
 
-		assertTrue(coord.triggerCheckpoint(timestamp + 2));
+		assertTrue(coord.triggerCheckpoint(timestamp + 2, false));
 		long checkpointId2 = counter.getLast();
 		assertEquals(3, coord.getNumberOfPendingCheckpoints());
 
@@ -1450,7 +1450,7 @@ public class CheckpointCoordinatorTest {
 		assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
 		assertFalse(savepointFuture1.isDone());
 
-		assertTrue(coord.triggerCheckpoint(timestamp + 3));
+		assertTrue(coord.triggerCheckpoint(timestamp + 3, false));
 		assertEquals(2, coord.getNumberOfPendingCheckpoints());
 
 		Future<CompletedCheckpoint> savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir);
@@ -1841,7 +1841,7 @@ public class CheckpointCoordinatorTest {
 				new DisabledCheckpointStatsTracker());
 
 		// trigger the checkpoint
-		coord.triggerCheckpoint(timestamp);
+		coord.triggerCheckpoint(timestamp, false);
 
 		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
 		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
@@ -1946,7 +1946,7 @@ public class CheckpointCoordinatorTest {
 				new DisabledCheckpointStatsTracker());
 
 		// trigger the checkpoint
-		coord.triggerCheckpoint(timestamp);
+		coord.triggerCheckpoint(timestamp, false);
 
 		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
 		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
@@ -2061,7 +2061,7 @@ public class CheckpointCoordinatorTest {
 				new DisabledCheckpointStatsTracker());
 
 		// trigger the checkpoint
-		coord.triggerCheckpoint(timestamp);
+		coord.triggerCheckpoint(timestamp, false);
 
 		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
 		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
@@ -2184,7 +2184,7 @@ public class CheckpointCoordinatorTest {
 				new DisabledCheckpointStatsTracker());
 
 		// trigger the checkpoint
-		coord.triggerCheckpoint(timestamp);
+		coord.triggerCheckpoint(timestamp, false);
 
 		assertTrue(coord.getPendingCheckpoints().keySet().size() == 1);
 		long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
@@ -2298,7 +2298,7 @@ public class CheckpointCoordinatorTest {
 					"fake-directory",
 					new DisabledCheckpointStatsTracker());
 
-			assertTrue(coord.triggerCheckpoint(timestamp));
+			assertTrue(coord.triggerCheckpoint(timestamp, false));
 
 			for (PendingCheckpoint checkpoint : coord.getPendingCheckpoints().values()) {
 				CheckpointProperties props = checkpoint.getProps();
@@ -2654,6 +2654,48 @@ public class CheckpointCoordinatorTest {
 		}
 	}
 
+	@Test
+	public void testStopPeriodicScheduler() throws Exception {
+		// create some mock Execution vertices that receive the checkpoint trigger messages
+		final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+		ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+		// set up the coordinator and validate the initial state
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+				new JobID(),
+				600000,
+				600000,
+				0,
+				Integer.MAX_VALUE,
+				ExternalizedCheckpointSettings.none(),
+				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex1 },
+				new ExecutionVertex[] { vertex1 },
+				new StandaloneCheckpointIDCounter(),
+				new StandaloneCompletedCheckpointStore(1),
+				null,
+				new DisabledCheckpointStatsTracker());
+
+		// Periodic
+		CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(
+				System.currentTimeMillis(),
+				CheckpointProperties.forStandardCheckpoint(),
+				null,
+				true);
+
+		assertEquals(true, triggerResult.isFailure());
+		assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN, triggerResult.getFailureReason());
+
+		// Not periodic
+		triggerResult = coord.triggerCheckpoint(
+				System.currentTimeMillis(),
+				CheckpointProperties.forStandardCheckpoint(),
+				null,
+				false);
+
+		assertEquals(false, triggerResult.isFailure());
+	}
+
 	private void testCreateKeyGroupPartitions(int maxParallelism, int parallelism) {
 		List<KeyGroupRange> ranges = CheckpointCoordinator.createKeyGroupPartitions(maxParallelism, parallelism);
 		for (int i = 0; i < maxParallelism; ++i) {
@@ -2664,7 +2706,6 @@ public class CheckpointCoordinatorTest {
 		}
 	}
 
-
 	@Test
 	public void testPartitionableStateRepartitioning() {
 		Random r = new Random(42);
@@ -2809,7 +2850,7 @@ public class CheckpointCoordinatorTest {
 		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
 		String targetDirectory = "xjasdkjakshdmmmxna";
 
-		CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(timestamp, props, targetDirectory);
+		CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(timestamp, props, targetDirectory, false);
 		assertEquals(true, triggerResult.isSuccess());
 
 		// validate that we have a pending checkpoint

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index b4dcab5..950526c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -110,7 +110,7 @@ public class CheckpointStateRestoreTest {
 
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
-			coord.triggerCheckpoint(timestamp);
+			coord.triggerCheckpoint(timestamp, false);
 
 			PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
 			final long checkpointId = pending.getCheckpointId();
@@ -209,7 +209,7 @@ public class CheckpointStateRestoreTest {
 
 			// create ourselves a checkpoint with state
 			final long timestamp = 34623786L;
-			coord.triggerCheckpoint(timestamp);
+			coord.triggerCheckpoint(timestamp, false);
 
 			PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
 			final long checkpointId = pending.getCheckpointId();

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 2667743..84f0e1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -210,7 +210,7 @@ public class PendingCheckpointTest {
 
 	private static PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, String targetDirectory) {
 		Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(ACK_TASKS);
-		return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, props, targetDirectory);
+		return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, false, props, targetDirectory);
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 9277029..5ec6991 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -60,7 +60,6 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
-import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -447,9 +446,9 @@ public class JobManagerHARecoveryTest {
 
 		@Override
 		public void setInitialState(
-			ChainedStateHandle<StreamStateHandle> chainedState,
-			List<KeyGroupsStateHandle> keyGroupsState,
-			List<Collection<OperatorStateHandle>> partitionableOperatorState) throws Exception {
+				ChainedStateHandle<StreamStateHandle> chainedState,
+				List<KeyGroupsStateHandle> keyGroupsState,
+				List<Collection<OperatorStateHandle>> partitionableOperatorState) throws Exception {
 			int subtaskIndex = getIndexInSubtaskGroup();
 			if (subtaskIndex < recoveredStates.length) {
 				try (FSDataInputStream in = chainedState.get(0).openInputStream()) {
@@ -465,10 +464,8 @@ public class JobManagerHARecoveryTest {
 						String.valueOf(UUID.randomUUID()),
 						InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId()));
 
-				RetrievableStreamStateHandle<Long> state = new RetrievableStreamStateHandle<Long>(byteStreamStateHandle);
-
 				ChainedStateHandle<StreamStateHandle> chainedStateHandle =
-						new ChainedStateHandle<StreamStateHandle>(Collections.singletonList(state));
+						new ChainedStateHandle<StreamStateHandle>(Collections.singletonList(byteStreamStateHandle));
 
 				CheckpointStateHandles checkpointStateHandles =
 						new CheckpointStateHandles(chainedStateHandle, null, Collections.<KeyGroupsStateHandle>emptyList());

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 8d150ac..183477a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
+import akka.actor.PoisonPill;
 import akka.testkit.JavaTestKit;
 import com.typesafe.config.Config;
 import org.apache.flink.api.common.JobID;
@@ -27,6 +28,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -43,8 +45,15 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
@@ -67,12 +76,17 @@ import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhe
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
+import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
 import org.apache.flink.runtime.testingUtils.TestingTaskManager;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testutils.StoppableInvokable;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import scala.Option;
 import scala.Some;
 import scala.Tuple2;
 import scala.concurrent.Await;
@@ -81,7 +95,9 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 import scala.reflect.ClassTag$;
 
+import java.io.File;
 import java.net.InetAddress;
+import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
@@ -94,6 +110,7 @@ import static org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.No
 import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
 import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingCluster;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -101,6 +118,9 @@ import static org.junit.Assert.fail;
 
 public class JobManagerTest {
 
+	@Rule
+	public TemporaryFolder tmpFolder = new TemporaryFolder();
+
 	private static ActorSystem system;
 
 	@BeforeClass
@@ -562,4 +582,126 @@ public class JobManagerTest {
 		JobStatusIs jobStatus = Await.result(failedFuture, deadline.timeLeft());
 		assertEquals(JobStatus.FAILED, jobStatus.state());
 	}
+
+	@Test
+	public void testCancelWithSavepoint() throws Exception {
+		File defaultSavepointDir = tmpFolder.newFolder();
+
+		FiniteDuration timeout = new FiniteDuration(30, TimeUnit.SECONDS);
+		Configuration config = new Configuration();
+		config.setString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, defaultSavepointDir.getAbsolutePath());
+
+		ActorSystem actorSystem = null;
+		ActorGateway jobManager = null;
+		ActorGateway archiver = null;
+		ActorGateway taskManager = null;
+		try {
+			actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
+
+			Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
+					config,
+					actorSystem,
+					Option.apply("jm"),
+					Option.apply("arch"),
+					TestingJobManager.class,
+					TestingMemoryArchivist.class);
+
+			jobManager = new AkkaActorGateway(master._1(), null);
+			archiver = new AkkaActorGateway(master._2(), null);
+
+			ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
+					config,
+					ResourceID.generate(),
+					actorSystem,
+					"localhost",
+					Option.apply("tm"),
+					Option.<LeaderRetrievalService>apply(new StandaloneLeaderRetrievalService(jobManager.path())),
+					true,
+					TestingTaskManager.class);
+
+			taskManager = new AkkaActorGateway(taskManagerRef, null);
+
+			// Wait until connected
+			Object msg = new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
+			Await.ready(taskManager.ask(msg, timeout), timeout);
+
+			// Create job graph
+			JobVertex sourceVertex = new JobVertex("Source");
+			sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
+			sourceVertex.setParallelism(1);
+
+			JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
+
+			JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings(
+					Collections.singletonList(sourceVertex.getID()),
+					Collections.singletonList(sourceVertex.getID()),
+					Collections.singletonList(sourceVertex.getID()),
+					3600000,
+					3600000,
+					0,
+					Integer.MAX_VALUE,
+					ExternalizedCheckpointSettings.none());
+
+			jobGraph.setSnapshotSettings(snapshottingSettings);
+
+			// Submit job graph
+			msg = new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED);
+			Await.result(jobManager.ask(msg, timeout), timeout);
+
+			// Wait for all tasks to be running
+			msg = new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+			Await.result(jobManager.ask(msg, timeout), timeout);
+
+			// Notify when canelled
+			msg = new NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.CANCELED);
+			Future<Object> cancelled = jobManager.ask(msg, timeout);
+
+			// Cancel with savepoint
+			String savepointPath = null;
+
+			for (int i = 0; i < 10; i++) {
+				msg = new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(), null);
+				CancellationResponse cancelResp = (CancellationResponse) Await.result(jobManager.ask(msg, timeout), timeout);
+
+				if (cancelResp instanceof CancellationFailure) {
+					CancellationFailure failure = (CancellationFailure) cancelResp;
+					if (failure.cause().getMessage().contains(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING.message())) {
+						Thread.sleep(200); // wait and retry
+					} else {
+						failure.cause().printStackTrace();
+						fail("Failed to cancel job: " + failure.cause().getMessage());
+					}
+				} else {
+					savepointPath = ((CancellationSuccess) cancelResp).savepointPath();
+					break;
+				}
+			}
+
+			// Verify savepoint path
+			assertNotEquals("Savepoint not triggered", null, savepointPath);
+
+			// Wait for job status change
+			Await.ready(cancelled, timeout);
+
+			File savepointFile = new File(savepointPath);
+			assertEquals(true, savepointFile.exists());
+		} finally {
+			if (actorSystem != null) {
+				actorSystem.shutdown();
+			}
+
+			if (archiver != null) {
+				archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+
+			if (jobManager != null) {
+				jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+
+			if (taskManager != null) {
+				taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
+			}
+		}
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5783671c/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index fc2835d..74de942 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -64,6 +64,7 @@ import org.apache.flink.testutils.junit.RetryRule;
 import org.apache.flink.util.TestLogger;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
@@ -100,6 +101,9 @@ public class SavepointITCase extends TestLogger {
 	@Rule
 	public RetryRule retryRule = new RetryRule();
 
+	@Rule
+	public TemporaryFolder folder= new TemporaryFolder();
+
 	/**
 	 * Tests that it is possible to submit a job, trigger a savepoint, and
 	 * later restart the job on a new cluster. The savepoint is written to


Mime
View raw message