flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [1/2] flink git commit: [FLINK-4445] [client] Add allowNonRestoredState flag to CLI
Date Wed, 02 Nov 2016 07:10:22 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.1 d941b50db -> da32af123


[FLINK-4445] [client] Add allowNonRestoredState flag to CLI

Allow specifying whether a checkpoint restore should tolerate
checkpoint state that it cannot map to the program. This is
exposed via the CLI in the run command:

bin/flink run -s <savepointPath> -n ...

Furthermore, the savepoint restore settings are moved out of
the snapshotting settings.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f912615
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f912615
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f912615

Branch: refs/heads/release-1.1
Commit: 1f9126154bcde3e77e4b2541f7ab08897043bfff
Parents: d941b50
Author: Ufuk Celebi <uce@apache.org>
Authored: Wed Oct 26 18:00:21 2016 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Nov 2 08:08:22 2016 +0100

----------------------------------------------------------------------
 docs/apis/cli.md                                |  90 +++++++++------
 .../org/apache/flink/client/CliFrontend.java    |   4 +-
 .../flink/client/cli/CliFrontendParser.java     |  17 ++-
 .../apache/flink/client/cli/ProgramOptions.java |  14 ++-
 .../flink/client/program/ClusterClient.java     |  25 ++--
 .../client/program/ContextEnvironment.java      |  13 ++-
 .../program/ContextEnvironmentFactory.java      |  11 +-
 .../client/program/DetachedEnvironment.java     |   7 +-
 .../flink/client/program/PackagedProgram.java   |  11 +-
 .../apache/flink/client/CliFrontendRunTest.java |  29 ++++-
 .../apache/flink/runtime/jobgraph/JobGraph.java |  40 ++++---
 .../jobgraph/SavepointRestoreSettings.java      | 113 +++++++++++++++++++
 .../jobgraph/tasks/JobSnapshottingSettings.java |  23 ----
 .../flink/runtime/jobmanager/JobManager.scala   |  22 ++--
 .../flink/runtime/jobmanager/JobSubmitTest.java |   3 +-
 .../environment/StreamContextEnvironment.java   |   2 +-
 .../test/checkpointing/SavepointITCase.java     |  11 +-
 17 files changed, 295 insertions(+), 140 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/docs/apis/cli.md
----------------------------------------------------------------------
diff --git a/docs/apis/cli.md b/docs/apis/cli.md
index 7209c3a..306b2da 100644
--- a/docs/apis/cli.md
+++ b/docs/apis/cli.md
@@ -150,6 +150,14 @@ Returns the path of the created savepoint. You need this path to restore and dis
 
 The run command has a savepoint flag to submit a job, which restores its state from a savepoint. The savepoint path is returned by the savepoint trigger command.
 
+By default, we try to match all savepoint state to the job being submitted. If you want to allow skipping savepoint state that cannot be restored with the new job, you can set the `allowNonRestoredState` flag. You need to allow this if you removed an operator from your program that was part of the program when the savepoint was triggered and you still want to use the savepoint.
+
+{% highlight bash %}
+./bin/flink run -s <savepointPath> -n ...
+{% endhighlight %}
+
+This is useful if your program dropped an operator that was part of the savepoint.
+
 #### **Dispose a savepoint**
 
 {% highlight bash %}
@@ -179,41 +187,55 @@ Action "run" compiles and runs a program.
 
   Syntax: run [OPTIONS] <jar-file> <arguments>
   "run" action options:
-     -c,--class <classname>               Class with the program entry point
-                                          ("main" method or "getPlan()" method.
-                                          Only needed if the JAR file does not
-                                          specify the class in its manifest.
-     -C,--classpath <url>                 Adds a URL to each user code
-                                          classloader  on all nodes in the
-                                          cluster. The paths must specify a
-                                          protocol (e.g. file://) and be
-                                          accessible on all nodes (e.g. by means
-                                          of a NFS share). You can use this
-                                          option multiple times for specifying
-                                          more than one URL. The protocol must
-                                          be supported by the {@link
-                                          java.net.URLClassLoader}.
-     -d,--detached                        If present, runs the job in detached
-                                          mode
-     -m,--jobmanager <host:port>          Address of the JobManager (master) to
-                                          which to connect. Specify
-                                          'yarn-cluster' as the JobManager to
-                                          deploy a YARN cluster for the job. Use
-                                          this flag to connect to a different
-                                          JobManager than the one specified in
-                                          the configuration.
-     -p,--parallelism <parallelism>       The parallelism with which to run the
-                                          program. Optional flag to override the
-                                          default value specified in the
-                                          configuration.
-     -q,--sysoutLogging                   If present, supress logging output to
-                                          standard out.
-     -s,--fromSavepoint <savepointPath>   Path to a savepoint to reset the job
-                                          back to (for example
-                                          file:///flink/savepoint-1537).
-  Additional arguments if -m yarn-cluster is set:
+     -c,--class <classname>                         Class with the program entry
+                                                    point ("main" method or
+                                                    "getPlan()" method. Only
+                                                    needed if the JAR file does
+                                                    not specify the class in its
+                                                    manifest.
+     -C,--classpath <url>                           Adds a URL to each user code
+                                                    classloader  on all nodes in
+                                                    the cluster. The paths must
+                                                    specify a protocol (e.g.
+                                                    file://) and be accessible
+                                                    on all nodes (e.g. by means
+                                                    of a NFS share). You can use
+                                                    this option multiple times
+                                                    for specifying more than one
+                                                    URL. The protocol must be
+                                                    supported by the {@link
+                                                    java.net.URLClassLoader}.
+     -d,--detached                                  If present, runs the job in
+                                                    detached mode
+     -m,--jobmanager <host:port>                    Address of the JobManager
+                                                    (master) to which to
+                                                    connect. Use this flag to
+                                                    connect to a different
+                                                    JobManager than the one
+                                                    specified in the
+                                                    configuration.
+     -n,--allowNonRestoredState                     Allow non restored savepoint
+                                                    state in case an operator has
+                                                    been removed from the job.
+     -p,--parallelism <parallelism>                 The parallelism with which
+                                                    to run the program. Optional
+                                                    flag to override the default
+                                                    value specified in the
+                                                    configuration.
+     -q,--sysoutLogging                             If present, suppress logging
+                                                    output to standard out.
+     -s,--fromSavepoint <savepointPath>             Path to a savepoint to
+                                                    restore the job from (for
+                                                    example
+                                                    hdfs:///flink/savepoint-1537
+                                                    ).
+     -z,--zookeeperNamespace <zookeeperNamespace>   Namespace to create the
+                                                    Zookeeper sub-paths for high
+                                                    availability mode
+  Options for yarn-cluster mode:
      -yD <arg>                            Dynamic properties
      -yd,--yarndetached                   Start detached
+     -yid,--yarnapplicationId <arg>       Attach to running YARN session
      -yj,--yarnjar <arg>                  Path to Flink jar file
      -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container [in
                                           MB]
@@ -230,6 +252,8 @@ Action "run" compiles and runs a program.
                                           (t for transfer)
      -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container [in
                                           MB]
+     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
+                                          sub-paths for high availability mode
 
 
 Action "info" shows the optimized execution plan of the program (JSON).

http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
index 929d02e..3a322dc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -239,7 +239,7 @@ public class CliFrontend {
 			client.setDetached(options.getDetachedMode());
 			LOG.debug("Client slots is set to {}", client.getMaxSlots());
 
-			LOG.debug("Savepoint path is set to {}", options.getSavepointPath());
+			LOG.debug(options.getSavepointRestoreSettings().toString());
 
 			int userParallelism = options.getParallelism();
 			LOG.debug("User parallelism is set to {}", userParallelism);
@@ -833,7 +833,7 @@ public class CliFrontend {
 				new PackagedProgram(jarFile, classpaths, programArgs) :
 				new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
 
-		program.setSavepointPath(options.getSavepointPath());
+		program.setSavepointRestoreSettings(options.getSavepointRestoreSettings());
 
 		return program;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 3ed383d..f88bbfa 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -70,7 +70,12 @@ public class CliFrontendParser {
 			"Use this flag to connect to a different JobManager than the one specified in the configuration.");
 
 	static final Option SAVEPOINT_PATH_OPTION = new Option("s", "fromSavepoint", true,
-			"Path to a savepoint to reset the job back to (for example file:///flink/savepoint-1537).");
+			"Path to a savepoint to restore the job from (for example hdfs:///flink/savepoint-1537).");
+
+	static final Option SAVEPOINT_ALLOW_NON_RESTORED_OPTION = new Option("n", "allowNonRestoredState", false,
+			"Allow to skip savepoint state that cannot be restored. " +
+					"You need to allow this if you removed an operator from your " +
+					"program that was part of the program when the savepoint was triggered.");
 
 	static final Option SAVEPOINT_DISPOSE_OPTION = new Option("d", "dispose", true,
 			"Path of savepoint to dispose.");
@@ -116,6 +121,8 @@ public class CliFrontendParser {
 		SAVEPOINT_PATH_OPTION.setRequired(false);
 		SAVEPOINT_PATH_OPTION.setArgName("savepointPath");
 
+		SAVEPOINT_ALLOW_NON_RESTORED_OPTION.setRequired(false);
+
 		ZOOKEEPER_NAMESPACE_OPTION.setRequired(false);
 		ZOOKEEPER_NAMESPACE_OPTION.setArgName("zookeeperNamespace");
 	}
@@ -146,7 +153,6 @@ public class CliFrontendParser {
 		options.addOption(ARGS_OPTION);
 		options.addOption(LOGGING_OPTION);
 		options.addOption(DETACHED_OPTION);
-		options.addOption(SAVEPOINT_PATH_OPTION);
 		options.addOption(ZOOKEEPER_NAMESPACE_OPTION);
 		return options;
 	}
@@ -157,13 +163,15 @@ public class CliFrontendParser {
 		options.addOption(PARALLELISM_OPTION);
 		options.addOption(LOGGING_OPTION);
 		options.addOption(DETACHED_OPTION);
-		options.addOption(SAVEPOINT_PATH_OPTION);
 		options.addOption(ZOOKEEPER_NAMESPACE_OPTION);
 		return options;
 	}
 
 	private static Options getRunOptions(Options options) {
 		options = getProgramSpecificOptions(options);
+		options.addOption(SAVEPOINT_PATH_OPTION);
+		options.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
+
 		options = getJobManagerAddressOption(options);
 		return addCustomCliOptions(options, true);
 	}
@@ -210,6 +218,9 @@ public class CliFrontendParser {
 
 	private static Options getRunOptionsWithoutDeprecatedOptions(Options options) {
 		Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
+		o.addOption(SAVEPOINT_PATH_OPTION);
+		o.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
+
 		return getJobManagerAddressOption(o);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
index 368ec19..80f573e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ProgramOptions.java
@@ -19,6 +19,7 @@ package org.apache.flink.client.cli;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import java.net.MalformedURLException;
 import java.net.URL;
@@ -33,6 +34,7 @@ import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.LOGGING_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.PARALLELISM_OPTION;
+import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_PATH_OPTION;
 
 /**
@@ -54,7 +56,7 @@ public abstract class ProgramOptions extends CommandLineOptions {
 
 	private final boolean detachedMode;
 
-	private final String savepointPath;
+	private final SavepointRestoreSettings savepointSettings;
 
 	protected ProgramOptions(CommandLine line) throws CliArgsException {
 		super(line);
@@ -111,9 +113,11 @@ public abstract class ProgramOptions extends CommandLineOptions {
 		detachedMode = line.hasOption(DETACHED_OPTION.getOpt());
 
 		if (line.hasOption(SAVEPOINT_PATH_OPTION.getOpt())) {
-			savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
+			String savepointPath = line.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
+			boolean allowNonRestoredState = line.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
+			this.savepointSettings = SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState);
 		} else {
-			savepointPath = null;
+			this.savepointSettings = SavepointRestoreSettings.none();
 		}
 	}
 
@@ -145,7 +149,7 @@ public abstract class ProgramOptions extends CommandLineOptions {
 		return detachedMode;
 	}
 
-	public String getSavepointPath() {
-		return savepointPath;
+	public SavepointRestoreSettings getSavepointRestoreSettings() {
+		return savepointSettings;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index 7092bfd..5e88af6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -50,6 +50,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.messages.GetClusterStatusResponse;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous;
 import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound;
@@ -314,7 +315,7 @@ public abstract class ClusterClient {
 				jobWithJars = prog.getPlanWithJars();
 			}
 
-			return run(jobWithJars, parallelism, prog.getSavepointPath());
+			return run(jobWithJars, parallelism, prog.getSavepointSettings());
 		}
 		else if (prog.isUsingInteractiveMode()) {
 			LOG.info("Starting program in interactive mode");
@@ -328,7 +329,7 @@ public abstract class ClusterClient {
 
 			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries,
 					prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
-					prog.getSavepointPath());
+					prog.getSavepointSettings());
 			ContextEnvironment.setAsContext(factory);
 
 			try {
@@ -357,7 +358,7 @@ public abstract class ClusterClient {
 	}
 
 	public JobSubmissionResult run(JobWithJars program, int parallelism) throws ProgramInvocationException {
-		return run(program, parallelism, null);
+		return run(program, parallelism, SavepointRestoreSettings.none());
 	}
 
 	/**
@@ -374,7 +375,7 @@ public abstract class ClusterClient {
 	 *                                    i.e. the job-manager is unreachable, or due to the fact that the
 	 *                                    parallel execution failed.
 	 */
-	public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, String savepointPath)
+	public JobSubmissionResult run(JobWithJars jobWithJars, int parallelism, SavepointRestoreSettings savepointSettings)
 			throws CompilerException, ProgramInvocationException {
 		ClassLoader classLoader = jobWithJars.getUserCodeClassLoader();
 		if (classLoader == null) {
@@ -382,19 +383,19 @@ public abstract class ClusterClient {
 		}
 
 		OptimizedPlan optPlan = getOptimizedPlan(compiler, jobWithJars, parallelism);
-		return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointPath);
+		return run(optPlan, jobWithJars.getJarFiles(), jobWithJars.getClasspaths(), classLoader, savepointSettings);
 	}
 
 	public JobSubmissionResult run(
 			FlinkPlan compiledPlan, List<URL> libraries, List<URL> classpaths, ClassLoader classLoader) throws ProgramInvocationException {
-		return run(compiledPlan, libraries, classpaths, classLoader, null);
+		return run(compiledPlan, libraries, classpaths, classLoader, SavepointRestoreSettings.none());
 	}
 
 	public JobSubmissionResult run(FlinkPlan compiledPlan,
-			List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, String savepointPath)
+			List<URL> libraries, List<URL> classpaths, ClassLoader classLoader, SavepointRestoreSettings savepointSettings)
 		throws ProgramInvocationException
 	{
-		JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointPath);
+		JobGraph job = getJobGraph(compiledPlan, libraries, classpaths, savepointSettings);
 		return submitJob(job, classLoader);
 	}
 
@@ -613,15 +614,15 @@ public abstract class ClusterClient {
 		return getOptimizedPlan(compiler, prog.getPlan(), parallelism);
 	}
 
-	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, String savepointPath) throws ProgramInvocationException {
-		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointPath);
+	public JobGraph getJobGraph(PackagedProgram prog, FlinkPlan optPlan, SavepointRestoreSettings savepointSettings) throws ProgramInvocationException {
+		return getJobGraph(optPlan, prog.getAllLibraries(), prog.getClasspaths(), savepointSettings);
 	}
 
-	private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, String savepointPath) {
+	private JobGraph getJobGraph(FlinkPlan optPlan, List<URL> jarFiles, List<URL> classpaths, SavepointRestoreSettings savepointSettings) {
 		JobGraph job;
 		if (optPlan instanceof StreamingPlan) {
 			job = ((StreamingPlan) optPlan).getJobGraph();
-			job.setSavepointPath(savepointPath);
+			job.setSavepointRestoreSettings(savepointSettings);
 		} else {
 			JobGraphGenerator gen = new JobGraphGenerator(this.flinkConfig);
 			job = gen.compileJobGraph((OptimizedPlan) optPlan);

http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index fe2d7e0..1ef94ce 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import java.net.URL;
 import java.util.List;
@@ -42,15 +43,15 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	
 	protected final ClassLoader userCodeClassLoader;
 
-	protected final String savepointPath;
+	protected final SavepointRestoreSettings savepointSettings;
 	
 	public ContextEnvironment(ClusterClient remoteConnection, List<URL> jarFiles, List<URL> classpaths,
-				ClassLoader userCodeClassLoader, String savepointPath) {
+				ClassLoader userCodeClassLoader, SavepointRestoreSettings savepointSettings) {
 		this.client = remoteConnection;
 		this.jarFilesToAttach = jarFiles;
 		this.classpathsToAttach = classpaths;
 		this.userCodeClassLoader = userCodeClassLoader;
-		this.savepointPath = savepointPath;
+		this.savepointSettings = savepointSettings;
 	}
 
 	@Override
@@ -58,7 +59,7 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		Plan p = createProgramPlan(jobName);
 		JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.classpathsToAttach,
 				this.userCodeClassLoader);
-		this.lastJobExecutionResult = client.run(toRun, getParallelism(), savepointPath).getJobExecutionResult();
+		this.lastJobExecutionResult = client.run(toRun, getParallelism(), savepointSettings).getJobExecutionResult();
 		return this.lastJobExecutionResult;
 	}
 
@@ -99,8 +100,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		return userCodeClassLoader;
 	}
 
-	public String getSavepointPath() {
-		return savepointPath;
+	public SavepointRestoreSettings getSavepointRestoreSettings() {
+		return savepointSettings;
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index f9b1fc2..0175d4c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import java.net.URL;
 import java.util.List;
@@ -46,11 +47,11 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
 	private ExecutionEnvironment lastEnvCreated;
 
-	private String savepointPath;
+	private SavepointRestoreSettings savepointSettings;
 
 	public ContextEnvironmentFactory(ClusterClient client, List<URL> jarFilesToAttach,
 			List<URL> classpathsToAttach, ClassLoader userCodeClassLoader, int defaultParallelism,
-			boolean isDetached, String savepointPath)
+			boolean isDetached, SavepointRestoreSettings savepointSettings)
 	{
 		this.client = client;
 		this.jarFilesToAttach = jarFilesToAttach;
@@ -58,7 +59,7 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 		this.userCodeClassLoader = userCodeClassLoader;
 		this.defaultParallelism = defaultParallelism;
 		this.isDetached = isDetached;
-		this.savepointPath = savepointPath;
+		this.savepointSettings = savepointSettings;
 	}
 
 	@Override
@@ -68,8 +69,8 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 		}
 
 		lastEnvCreated = isDetached ?
-				new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath):
-				new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath);
+				new DetachedEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings):
+				new ContextEnvironment(client, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings);
 		if (defaultParallelism > 0) {
 			lastEnvCreated.setParallelism(defaultParallelism);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
index 8298933..c67688f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/DetachedEnvironment.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,8 +47,8 @@ public class DetachedEnvironment extends ContextEnvironment {
 			List<URL> jarFiles,
 			List<URL> classpaths,
 			ClassLoader userCodeClassLoader,
-			String savepointPath) {
-		super(remoteConnection, jarFiles, classpaths, userCodeClassLoader, savepointPath);
+			SavepointRestoreSettings savepointSettings) {
+		super(remoteConnection, jarFiles, classpaths, userCodeClassLoader, savepointSettings);
 	}
 
 	@Override
@@ -72,7 +73,7 @@ public class DetachedEnvironment extends ContextEnvironment {
 	 * Finishes this Context Environment's execution by explicitly running the plan constructed.
 	 */
 	JobSubmissionResult finalizeExecute() throws ProgramInvocationException {
-		return client.run(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointPath);
+		return client.run(detachedPlan, jarFilesToAttach, classpathsToAttach, userCodeClassLoader, savepointSettings);
 	}
 
 	public static final class DetachedJobExecutionResult extends JobExecutionResult {

http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 42e63c4..8931a3e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -49,6 +49,7 @@ import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.dag.DataSinkNode;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.util.InstantiationUtil;
 
 /**
@@ -86,7 +87,7 @@ public class PackagedProgram {
 	
 	private Plan plan;
 
-	private String savepointPath;
+	private SavepointRestoreSettings savepointSettings = SavepointRestoreSettings.none();
 
 	/**
 	 * Creates an instance that wraps the plan defined in the jar file using the given
@@ -257,12 +258,12 @@ public class PackagedProgram {
 		}
 	}
 
-	public void setSavepointPath(String savepointPath) {
-		this.savepointPath = savepointPath;
+	public void setSavepointRestoreSettings(SavepointRestoreSettings savepointSettings) {
+		this.savepointSettings = savepointSettings;
 	}
 
-	public String getSavepointPath() {
-		return savepointPath;
+	public SavepointRestoreSettings getSavepointSettings() {
+		return savepointSettings;
 	}
 
 	public String[] getArguments() {

http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
index f710d8e..61d60e1 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/CliFrontendRunTest.java
@@ -19,17 +19,21 @@
 
 package org.apache.flink.client;
 
-import static org.apache.flink.client.CliFrontendTestUtils.*;
-import static org.junit.Assert.*;
-
 import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.client.cli.CommandLineOptions;
 import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.flink.client.CliFrontendTestUtils.getTestJarPath;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 
 public class CliFrontendRunTest {
 	
@@ -92,11 +96,24 @@ public class CliFrontendRunTest {
 				assertNotEquals(0, testFrontend.run(parameters));
 			}
 
-			// test configure savepoint path
+			// test configure savepoint path (no ignore flag)
 			{
 				String[] parameters = {"-s", "expectedSavepointPath", getTestJarPath()};
 				RunOptions options = CliFrontendParser.parseRunCommand(parameters);
-				assertEquals("expectedSavepointPath", options.getSavepointPath());
+				SavepointRestoreSettings savepointSettings = options.getSavepointRestoreSettings();
+				assertTrue(savepointSettings.restoreSavepoint());
+				assertEquals("expectedSavepointPath", savepointSettings.getRestorePath());
+				assertFalse(savepointSettings.allowNonRestoredState());
+			}
+
+			// test configure savepoint path (with ignore flag)
+			{
+				String[] parameters = {"-s", "expectedSavepointPath", "-n", getTestJarPath()};
+				RunOptions options = CliFrontendParser.parseRunCommand(parameters);
+				SavepointRestoreSettings savepointSettings = options.getSavepointRestoreSettings();
+				assertTrue(savepointSettings.restoreSavepoint());
+				assertEquals("expectedSavepointPath", savepointSettings.getRestorePath());
+				assertTrue(savepointSettings.allowNonRestoredState());
 			}
 
 			// test jar arguments

http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index f825d5b..4300d3e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -29,7 +29,6 @@ import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -46,6 +45,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts.
  * All programs from higher level APIs are transformed into JobGraphs.
@@ -102,6 +103,9 @@ public class JobGraph implements Serializable {
 	/** Job specific execution config */
 	private SerializedValue<ExecutionConfig> serializedExecutionConfig;
 
+	/** Savepoint restore settings. */
+	private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -240,12 +244,28 @@ public class JobGraph implements Serializable {
 	}
 
 	/**
+	 * Sets the savepoint restore settings.
+	 * @param settings The savepoint restore settings.
+	 */
+	public void setSavepointRestoreSettings(SavepointRestoreSettings settings) {
+		this.savepointRestoreSettings = checkNotNull(settings, "Savepoint restore settings");
+	}
+
+	/**
+	 * Returns the configured savepoint restore settings.
+	 * @return The configured savepoint restore settings.
+	 */
+	public SavepointRestoreSettings getSavepointRestoreSettings() {
+		return savepointRestoreSettings;
+	}
+
+	/**
 	 * Sets a serialized copy of the passed ExecutionConfig. Further modification of the referenced ExecutionConfig
 	 * object will not affect this serialized copy.
 	 * @param executionConfig The ExecutionConfig to be serialized.
 	 */
 	public void setExecutionConfig(ExecutionConfig executionConfig) {
-		Preconditions.checkNotNull(executionConfig, "ExecutionConfig must not be null.");
+		checkNotNull(executionConfig, "ExecutionConfig must not be null.");
 		try {
 			this.serializedExecutionConfig = new SerializedValue<>(executionConfig);
 		} catch (IOException e) {
@@ -342,22 +362,6 @@ public class JobGraph implements Serializable {
 		return classpaths;
 	}
 
-	/**
-	 * Sets the savepoint path to rollback the deployment to.
-	 *
-	 * @param savepointPath The savepoint path
-	 */
-	public void setSavepointPath(String savepointPath) {
-		if (savepointPath != null) {
-			if (snapshotSettings == null) {
-				throw new IllegalStateException("Checkpointing disabled");
-			}
-			else {
-				snapshotSettings.setSavepointPath(savepointPath);
-			}
-		}
-	}
-
 	// --------------------------------------------------------------------------------------------
 
 	public List<JobVertex> getVerticesSortedTopologicallyFromSources() throws InvalidProgramException {

http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
new file mode 100644
index 0000000..48d3997
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointRestoreSettings.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph;
+
+import java.io.Serializable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Savepoint restore settings.
+ */
+public class SavepointRestoreSettings implements Serializable {
+
+	private static final long serialVersionUID = 87377506900849777L;
+
+	/** No restore should happen. */
+	private final static SavepointRestoreSettings NONE = new SavepointRestoreSettings(null, false);
+
+	/** By default, be strict when restoring from a savepoint.  */
+	private final static boolean DEFAULT_IGNORE_UNMAPPED_STATE = false;
+
+	/** Savepoint restore path. */
+	private final String restorePath;
+
+	/**
+	 * Flag indicating whether the restore should ignore savepoint state that
+	 * belongs to an operator that is not part of the job.
+	 */
+	private final boolean ignoreUnmappedState;
+
+	/**
+	 * Creates the restore settings.
+	 *
+	 * @param restorePath Savepoint restore path.
+	 * @param ignoreUnmappedState Ignore unmapped state.
+	 */
+	private SavepointRestoreSettings(String restorePath, boolean ignoreUnmappedState) {
+		this.restorePath = restorePath;
+		this.ignoreUnmappedState = ignoreUnmappedState;
+	}
+
+	/**
+	 * Returns whether the job should be restored from a savepoint.
+	 * @return <code>true</code> if a restore path is set and the job should restore from it.
+	 */
+	public boolean restoreSavepoint() {
+		return restorePath != null;
+	}
+
+	/**
+	 * Returns the path to the savepoint to restore from.
+	 * @return Path to the savepoint to restore from or <code>null</code> if
+	 * no restore should happen.
+	 */
+	public String getRestorePath() {
+		return restorePath;
+	}
+
+	/**
+	 * Returns whether the restore should ignore savepoint state that cannot be
+	 * mapped to the job.
+	 *
+	 * @return <code>true</code> if the restore should ignore savepoint state that
+	 * cannot be mapped to the job.
+	 */
+	public boolean ignoreUnmappedState() {
+		return ignoreUnmappedState;
+	}
+
+	@Override
+	public String toString() {
+		if (restoreSavepoint()) {
+			return "SavepointRestoreSettings.forPath(" +
+					"restorePath='" + restorePath + '\'' +
+					", ignoreUnmappedState=" + ignoreUnmappedState +
+					')';
+		} else {
+			return "SavepointRestoreSettings.none()";
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static SavepointRestoreSettings none() {
+		return NONE;
+	}
+
+	public static SavepointRestoreSettings forPath(String savepointPath) {
+		return forPath(savepointPath, DEFAULT_IGNORE_UNMAPPED_STATE);
+	}
+
+	public static SavepointRestoreSettings forPath(String savepointPath, boolean ignoreUnmappedState) {
+		checkNotNull(savepointPath, "Savepoint restore path.");
+		return new SavepointRestoreSettings(savepointPath, ignoreUnmappedState);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
index ab701b5..edc3746 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
@@ -47,9 +47,6 @@ public class JobSnapshottingSettings implements java.io.Serializable{
 	private final long minPauseBetweenCheckpoints;
 	
 	private final int maxConcurrentCheckpoints;
-
-	/** Path to savepoint to reset state back to (optional, can be null) */
-	private String savepointPath;
 	
 	public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger,
 									List<JobVertexID> verticesToAcknowledge,
@@ -103,26 +100,6 @@ public class JobSnapshottingSettings implements java.io.Serializable{
 		return maxConcurrentCheckpoints;
 	}
 
-	/**
-	 * Sets the savepoint path.
-	 *
-	 * This is only set if the job shall be resumed from a savepoint on submission.
-	 *
-	 * @param savepointPath The path of the savepoint to resume from.
-	 */
-	public void setSavepointPath(String savepointPath) {
-		this.savepointPath = savepointPath;
-	}
-
-	/**
-	 * Returns the configured savepoint path or <code>null</code> if none is configured.
-	 *
-	 * @return The configured savepoint path or <code>null</code> if none is configured.
-	 */
-	public String getSavepointPath() {
-		return savepointPath;
-	}
-
 	// --------------------------------------------------------------------------------------------
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 51cbedc..a28c25f 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -1293,19 +1293,17 @@ class JobManager(
           if (isRecovery) {
             executionGraph.restoreLatestCheckpointedState()
           } else {
-            val snapshotSettings = jobGraph.getSnapshotSettings
-            if (snapshotSettings != null) {
-              val savepointPath = snapshotSettings.getSavepointPath()
-
+            val savepointSettings = jobGraph.getSavepointRestoreSettings
+            if (savepointSettings.restoreSavepoint()) {
               // Reset state back to savepoint
-              if (savepointPath != null) {
-                try {
-                  executionGraph.restoreSavepoint(savepointPath)
-                } catch {
-                  case e: Exception =>
-                    jobInfo.client ! decorateMessage(JobResultFailure(new SerializedThrowable(e)))
-                    throw new SuppressRestartsException(e)
-                }
+              val savepointPath = savepointSettings.getRestorePath()
+
+              try {
+                executionGraph.restoreSavepoint(savepointPath)
+              } catch {
+                case e: Exception =>
+                  jobInfo.client ! decorateMessage(JobResultFailure(new SerializedThrowable(e)))
+                  throw new SuppressRestartsException(e)
               }
             }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 93aed2d..536b729 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -202,7 +203,7 @@ public class JobSubmitTest {
 	public void testAnswerFailureWhenSavepointReadFails() throws Exception {
 		// create a simple job graph
 		JobGraph jg = createSimpleJobGraph();
-		jg.setSavepointPath("pathThatReallyDoesNotExist...");
+		jg.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("pathThatReallyDoesNotExist..."));
 
 		// submit the job
 		Future<Object> submitFuture = jmGateway.ask(

http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index 0332684..be05550 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -65,7 +65,7 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 		} else {
 			return ctx
 				.getClient()
-				.run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointPath())
+				.run(streamGraph, ctx.getJars(), ctx.getClasspaths(), ctx.getUserCodeClassLoader(), ctx.getSavepointRestoreSettings())
 				.getJobExecutionResult();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1f912615/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 2d5bc3c..ca590f3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -41,6 +41,7 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint;
 import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
@@ -278,7 +279,7 @@ public class SavepointITCase extends TestLogger {
 							}
 
 							// Set the savepoint path
-							jobGraph.setSavepointPath(savepointPath);
+							jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
 							LOG.info("Resubmitting job " + jobGraph.getJobID() + " with " +
 									"savepoint path " + savepointPath + " in detached mode.");
@@ -521,7 +522,7 @@ public class SavepointITCase extends TestLogger {
 			flink.start();
 
 			// Set the savepoint path
-			jobGraph.setSavepointPath(savepointPath);
+			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
 			LOG.info("Resubmitting job " + jobGraph.getJobID() + " with " +
 					"savepoint path " + savepointPath + " in detached mode.");
@@ -733,8 +734,8 @@ public class SavepointITCase extends TestLogger {
 			final JobGraph jobGraph = createJobGraph(parallelism, numberOfRetries, 3600000, 1000);
 
 			// Set non-existing savepoint path
-			jobGraph.setSavepointPath("unknown path");
-			assertEquals("unknown path", jobGraph.getSnapshotSettings().getSavepointPath());
+			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath("unknown path"));
+			assertEquals("unknown path", jobGraph.getSavepointRestoreSettings().getRestorePath());
 
 			LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
 
@@ -847,7 +848,7 @@ public class SavepointITCase extends TestLogger {
 
 			// Set source to fail on restore calls and try to recover from savepoint
 			RestoreStateCountingAndFailingSource.failOnRestoreStateCall = true;
-			jobGraph.setSavepointPath(savepointPath);
+			jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
 			try {
 				flink.submitJobAndWait(jobGraph, false, deadline.timeLeft());


Mime
View raw message