flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [5/7] flink git commit: [FLINK-5763] [checkpoints] Add CheckpointOptions
Date Thu, 23 Feb 2017 19:10:33 GMT
[FLINK-5763] [checkpoints] Add CheckpointOptions

Adds `CheckpointOptions` to the triggered checkpoint messages (coordinator
to barrier injecting tasks) and barriers (flowing inline with the data):

```java
public class CheckpointOptions {

  // Type of checkpoint
  // => FULL_CHECKPOINT
  // => SAVEPOINT
  @NonNull
  CheckpointType getCheckpointType();

  // Custom target location. This is a String, because for future
  // backends it can be a logical location like a DB table.
  @Nullable
  String getTargetLocation();

}
```

This class would be the place to define more options for performing the
checkpoints (for example for incremental checkpoints).

These options are forwarded via the `StreamTask` to the `StreamOperator`s and
`Snapshotable` backends. The `AbstractStreamOperator` checks the options and
either i) forwards the shared per operator `CheckpointStreamFactory` for
regular checkpoints or ii) creates a dedicated savepoint stream factory when
the checkpoint type is `SAVEPOINT`.

For this, the state backends provide the following new method:

```
CheckpointStreamFactory createSavepointStreamFactory(JobID, String, String);
```

The `MemoryStateBackend` returns the regular stream factory and the
`FsStateBackend` returns a `FsSavepointStreamFactory`, which writes all
checkpoint streams to a single directory (instead of the regular sub folders
per checkpoint).

We end up with the following directory layout for savepoints:

```
+---------------------------+
| :root_savepoint_directory | (custom per savepoint or configured default via `state.savepoints.dir`)
+---------------------------+
  | +---------------------------------------+
  +-| savepoint-:jobId(0, 6)-:random_suffix | (one directory per savepoint)
    +---------------------------------------+
       |
       +- _metadata (one per savepoint)
       +- :uuid (one data file per StreamTask)
       +- ...
       +- :uuid
```


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e7a9174
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e7a9174
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e7a9174

Branch: refs/heads/master
Commit: 6e7a91741708a2b167a2bbca5dda5b2059df5e18
Parents: 1f9f38b
Author: Ufuk Celebi <uce@apache.org>
Authored: Thu Feb 16 17:56:23 2017 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Feb 23 18:39:49 2017 +0100

----------------------------------------------------------------------
 .../connectors/fs/RollingSinkITCase.java        |   1 -
 .../state/RocksDBKeyedStateBackend.java         |   5 +-
 .../streaming/state/RocksDBStateBackend.java    |   9 ++
 .../state/RocksDBAsyncSnapshotTest.java         |   8 +-
 .../state/RocksDBStateBackendTest.java          |  15 +-
 .../checkpoint/CheckpointCoordinator.java       |  56 ++++++--
 .../runtime/checkpoint/CheckpointOptions.java   | 108 +++++++++++++++
 .../runtime/checkpoint/CompletedCheckpoint.java |   2 +-
 .../runtime/checkpoint/PendingCheckpoint.java   |   3 +-
 .../checkpoint/savepoint/SavepointStore.java    | 137 +++++++++++++------
 .../flink/runtime/executiongraph/Execution.java |   6 +-
 .../io/network/api/CheckpointBarrier.java       |  44 +++++-
 .../api/serialization/EventSerializer.java      |  59 +++++++-
 .../runtime/jobgraph/tasks/StatefulTask.java    |   7 +-
 .../slots/ActorTaskManagerGateway.java          |   6 +-
 .../jobmanager/slots/TaskManagerGateway.java    |   5 +-
 .../jobmaster/RpcTaskManagerGateway.java        |   3 +-
 .../messages/checkpoint/TriggerCheckpoint.java  |  19 ++-
 .../state/AbstractKeyedStateBackend.java        |   3 +-
 .../runtime/state/AbstractStateBackend.java     |   8 ++
 .../state/DefaultOperatorStateBackend.java      |   8 +-
 .../flink/runtime/state/Snapshotable.java       |   5 +-
 .../flink/runtime/state/StateBackend.java       |  22 +++
 .../filesystem/FsCheckpointStreamFactory.java   |  21 +--
 .../filesystem/FsSavepointStreamFactory.java    |  58 ++++++++
 .../state/filesystem/FsStateBackend.java        |   9 ++
 .../state/heap/HeapKeyedStateBackend.java       |   4 +-
 .../state/memory/MemoryStateBackend.java        |   9 ++
 .../runtime/taskexecutor/TaskExecutor.java      |   5 +-
 .../taskexecutor/TaskExecutorGateway.java       |   4 +-
 .../apache/flink/runtime/taskmanager/Task.java  |  10 +-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   3 +-
 .../checkpoint/CheckpointCoordinatorTest.java   |  53 ++++---
 .../checkpoint/CheckpointOptionsTest.java       |  48 +++++++
 .../checkpoint/CheckpointStatsHistoryTest.java  |   1 +
 .../savepoint/MigrationV0ToV1Test.java          |   2 +-
 .../savepoint/SavepointLoaderTest.java          |   4 +-
 .../savepoint/SavepointStoreTest.java           |  48 +++++--
 .../io/network/api/CheckpointBarrierTest.java   |  61 +++++++++
 .../api/serialization/EventSerializerTest.java  |  45 ++++--
 .../io/network/api/writer/RecordWriterTest.java |   5 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   5 +-
 .../messages/CheckpointMessagesTest.java        |   3 +-
 .../runtime/state/OperatorStateBackendTest.java |   3 +-
 .../runtime/state/StateBackendTestBase.java     |  39 +++---
 .../FsSavepointStreamFactoryTest.java           |  67 +++++++++
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   9 +-
 .../api/operators/AbstractStreamOperator.java   |  43 +++++-
 .../api/operators/OperatorSnapshotResult.java   |   2 +-
 .../streaming/api/operators/StreamOperator.java |  12 +-
 .../streaming/runtime/io/BarrierBuffer.java     |   5 +-
 .../streaming/runtime/io/BarrierTracker.java    |   9 +-
 .../streaming/runtime/tasks/OperatorChain.java  |   5 +-
 .../streaming/runtime/tasks/StreamTask.java     |  65 +++++++--
 .../api/checkpoint/ListCheckpointedTest.java    |   2 +-
 .../operators/AbstractStreamOperatorTest.java   |  65 +++++----
 .../AbstractUdfStreamOperatorLifecycleTest.java |  12 +-
 .../WrappingFunctionSnapshotRestoreTest.java    |   2 +-
 .../operators/async/AsyncWaitOperatorTest.java  |   5 +-
 .../io/BarrierBufferAlignmentLimitTest.java     |  13 +-
 .../io/BarrierBufferMassiveRandomTest.java      |   3 +-
 .../streaming/runtime/io/BarrierBufferTest.java |  33 ++---
 .../runtime/io/BarrierTrackerTest.java          |   7 +-
 .../runtime/tasks/BlockingCheckpointsTest.java  |   8 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |  31 +++--
 .../runtime/tasks/SourceStreamTaskTest.java     |   3 +-
 .../StreamTaskCancellationBarrierTest.java      |   4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  37 ++---
 .../runtime/tasks/TwoInputStreamTaskTest.java   |  29 ++--
 .../util/AbstractStreamOperatorTestHarness.java |  10 +-
 .../KeyedOneInputStreamOperatorTestHarness.java |   7 +-
 .../test/checkpointing/SavepointITCase.java     |  51 ++++---
 .../streaming/runtime/StateBackendITCase.java   |   7 +-
 74 files changed, 1173 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
index 80ae294..72f2f21 100644
--- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
+++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java
@@ -941,7 +941,6 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase {
 		}
 	}
 
-
 	private static class StreamWriterWithConfigCheck<T> extends StringWriter<T> {
 		private String key;
 		private String expect;

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a0efe78..bd8d4dd 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -38,6 +38,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.migration.MigrationNamespaceSerializerProxy;
 import org.apache.flink.migration.MigrationUtil;
 import org.apache.flink.migration.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable;
 import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -244,6 +245,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	 * @param checkpointId  The Id of the checkpoint.
 	 * @param timestamp     The timestamp of the checkpoint.
 	 * @param streamFactory The factory that we can use for writing our state to streams.
+	 * @param checkpointOptions Options for how to perform this checkpoint.
 	 * @return Future to the state handle of the snapshot data.
 	 * @throws Exception
 	 */
@@ -251,7 +253,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	public RunnableFuture<KeyGroupsStateHandle> snapshot(
 			final long checkpointId,
 			final long timestamp,
-			final CheckpointStreamFactory streamFactory) throws Exception {
+			final CheckpointStreamFactory streamFactory,
+			CheckpointOptions checkpointOptions) throws Exception {
 
 		long startTime = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 6b09a8a..3fd5d0f 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -219,6 +219,15 @@ public class RocksDBStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
+	public CheckpointStreamFactory createSavepointStreamFactory(
+			JobID jobId,
+			String operatorIdentifier,
+			String targetLocation) throws IOException {
+
+		return checkpointStreamBackend.createSavepointStreamFactory(jobId, operatorIdentifier, targetLocation);
+	}
+
+	@Override
 	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env,
 			JobID jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index bce8028..90de7a6 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -186,7 +187,7 @@ public class RocksDBAsyncSnapshotTest {
 			}
 		}
 
-		task.triggerCheckpoint(new CheckpointMetaData(42, 17));
+		task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forFullCheckpoint());
 
 		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
 
@@ -266,7 +267,7 @@ public class RocksDBAsyncSnapshotTest {
 			}
 		}
 
-		task.triggerCheckpoint(new CheckpointMetaData(42, 17));
+		task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forFullCheckpoint());
 		testHarness.processElement(new StreamRecord<>("Wohoo", 0));
 		BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await();
 		task.cancel();
@@ -342,7 +343,8 @@ public class RocksDBAsyncSnapshotTest {
 			StringSerializer.INSTANCE,
 			new ValueStateDescriptor<>("foobar", String.class));
 
-		RunnableFuture<KeyGroupsStateHandle> snapshotFuture = keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory);
+		RunnableFuture<KeyGroupsStateHandle> snapshotFuture = keyedStateBackend.snapshot(
+			checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forFullCheckpoint());
 
 		try {
 			FutureUtil.runIfNotDoneAndGet(snapshotFuture);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index dc90666..c7b5c20 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -172,7 +173,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	@Test
 	public void testRunningSnapshotAfterBackendClosed() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory,
+			CheckpointOptions.forFullCheckpoint());
 
 		RocksDB spyDB = keyedStateBackend.db;
 
@@ -209,7 +211,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	@Test
 	public void testReleasingSnapshotAfterBackendClosed() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory,
+			CheckpointOptions.forFullCheckpoint());
 
 		RocksDB spyDB = keyedStateBackend.db;
 
@@ -237,7 +240,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	@Test
 	public void testDismissingSnapshot() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
 		snapshot.cancel(true);
 		verifyRocksObjectsReleased();
 	}
@@ -245,7 +248,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	@Test
 	public void testDismissingSnapshotNotRunnable() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
 		snapshot.cancel(true);
 		Thread asyncSnapshotThread = new Thread(snapshot);
 		asyncSnapshotThread.start();
@@ -262,7 +265,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	@Test
 	public void testCompletingSnapshot() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
 		Thread asyncSnapshotThread = new Thread(snapshot);
 		asyncSnapshotThread.start();
 		waiter.await(); // wait for snapshot to run
@@ -282,7 +285,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase<RocksDBStateBa
 	@Test
 	public void testCancelRunningSnapshot() throws Exception {
 		setupRocksKeyedStateBackend();
-		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory);
+		RunnableFuture<KeyGroupsStateHandle> snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint());
 		Thread asyncSnapshotThread = new Thread(snapshot);
 		asyncSnapshotThread.start();
 		waiter.await(); // wait for snapshot to run

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 36649ad..c1c65b5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -296,15 +298,42 @@ public class CheckpointCoordinator {
 		checkNotNull(targetDirectory, "Savepoint target directory");
 
 		CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
-		CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory, false);
 
-		if (result.isSuccess()) {
-			return result.getPendingCheckpoint().getCompletionFuture();
-		}
-		else {
-			Throwable cause = new Exception("Failed to trigger savepoint: " + result.getFailureReason().message());
-			return FlinkCompletableFuture.completedExceptionally(cause);
+		// Create the unique savepoint directory
+		final String savepointDirectory = SavepointStore
+			.createSavepointDirectory(targetDirectory, job);
+
+		CheckpointTriggerResult triggerResult = triggerCheckpoint(
+			timestamp,
+			props,
+			savepointDirectory,
+			false);
+
+		Future<CompletedCheckpoint> result;
+
+		if (triggerResult.isSuccess()) {
+			result = triggerResult.getPendingCheckpoint().getCompletionFuture();
+		} else {
+			Throwable cause = new Exception("Failed to trigger savepoint: " + triggerResult.getFailureReason().message());
+			result = FlinkCompletableFuture.completedExceptionally(cause);
 		}
+
+		// Make sure to remove the created base directory on Exceptions
+		result.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+			@Override
+			public Void apply(Throwable value) {
+				try {
+					SavepointStore.deleteSavepointDirectory(savepointDirectory);
+				} catch (Throwable t) {
+					LOG.warn("Failed to delete savepoint directory " + savepointDirectory
+						+ " after failed savepoint.", t);
+				}
+
+				return null;
+			}
+		}, executor);
+
+		return result;
 	}
 
 	/**
@@ -517,9 +546,16 @@ public class CheckpointCoordinator {
 				}
 				// end of lock scope
 
+				CheckpointOptions checkpointOptions;
+				if (!props.isSavepoint()) {
+					checkpointOptions = CheckpointOptions.forFullCheckpoint();
+				} else {
+					checkpointOptions = CheckpointOptions.forSavepoint(targetDirectory);
+				}
+
 				// send the messages to the tasks that trigger their checkpoint
 				for (Execution execution: executions) {
-					execution.triggerCheckpoint(checkpointID, timestamp);
+					execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
 				}
 
 				numUnsuccessfulCheckpointsTriggers.set(0);
@@ -756,7 +792,7 @@ public class CheckpointCoordinator {
 
 			triggerQueuedRequests();
 		}
-		
+
 		// record the time when this was completed, to calculate
 		// the 'min delay between checkpoints'
 		lastCheckpointCompletionNanos = System.nanoTime();
@@ -1030,7 +1066,7 @@ public class CheckpointCoordinator {
 			final ExecutionAttemptID executionAttemptID,
 			final long checkpointId,
 			final StateObject stateObject) {
-		
+
 		if (stateObject != null) {
 			executor.execute(new Runnable() {
 				@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
new file mode 100644
index 0000000..cb98d10
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+
+/**
+ * Options for performing the checkpoint.
+ *
+ * <p>The {@link CheckpointProperties} are related and cover properties that
+ * are only relevant at the {@link CheckpointCoordinator}. These options are
+ * relevant at the {@link StatefulTask} instances running on task managers.
+ */
+public class CheckpointOptions implements Serializable {
+
+	private static final long serialVersionUID = 5010126558083292915L;
+
+	/** Type of the checkpoint. */
+	@Nonnull
+	private final CheckpointType checkpointType;
+
+	/** Target location for the checkpoint. */
+	@Nullable
+	private final String targetLocation;
+
+	private CheckpointOptions(
+			@Nonnull CheckpointType checkpointType,
+			String targetLocation) {
+		this.checkpointType = checkNotNull(checkpointType);
+		this.targetLocation = targetLocation;
+	}
+
+	/**
+	 * Returns the type of checkpoint to perform.
+	 *
+	 * @return Type of checkpoint to perform.
+	 */
+	@Nonnull
+	public CheckpointType getCheckpointType() {
+		return checkpointType;
+	}
+
+	/**
+	 * Returns a custom target location or <code>null</code> if none
+	 * was specified.
+	 *
+	 * @return A custom target location or <code>null</code>.
+	 */
+	@Nullable
+	public String getTargetLocation() {
+		return targetLocation;
+	}
+
+	@Override
+	public String toString() {
+		return "CheckpointOptions(" + checkpointType + ")";
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final CheckpointOptions FULL_CHECKPOINT = new CheckpointOptions(CheckpointType.FULL_CHECKPOINT, null);
+
+	public static CheckpointOptions forFullCheckpoint() {
+		return FULL_CHECKPOINT;
+	}
+
+	public static CheckpointOptions forSavepoint(String targetDirectory) {
+		checkNotNull(targetDirectory, "targetDirectory");
+		return new CheckpointOptions(CheckpointType.SAVEPOINT, targetDirectory);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 *  The type of checkpoint to perform.
+	 */
+	public enum CheckpointType {
+
+		/** A full checkpoint. */
+		FULL_CHECKPOINT,
+
+		/** A savepoint. */
+		SAVEPOINT;
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 52f2a6a..53d888e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -159,7 +159,7 @@ public class CompletedCheckpoint implements Serializable {
 	void discard() throws Exception {
 		try {
 			if (externalPath != null) {
-				SavepointStore.removeSavepoint(externalPath);
+				SavepointStore.removeSavepointFile(externalPath);
 			}
 
 			StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 9f66314..908ff7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -214,7 +214,8 @@ public class PendingCheckpoint {
 					Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
 					externalPath = SavepointStore.storeSavepoint(
 							targetDirectory,
-							savepoint);
+							savepoint
+					);
 				} catch (IOException e) {
 					LOG.error("Failed to persist checkpoint {}.",checkpointId, e);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 48cca20..0caf5b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -18,8 +18,16 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
-import org.apache.flink.core.fs.FSDataInputStream;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -28,14 +36,8 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
- * A file system based savepoint store.
+ * Utilities for storing and loading savepoint meta data files.
  *
  * <p>Stored savepoints have the following format:
  * <pre>
@@ -52,50 +54,84 @@ public class SavepointStore {
 	/** Magic number for sanity checks against stored savepoints. */
 	public static final int MAGIC_NUMBER = 0x4960672d;
 
-	/** Prefix for savepoint files. */
-	private static final String prefix = "savepoint-";
+	private static final String META_DATA_FILE = "_metadata"; // name of the meta data file inside a savepoint directory
 
 	/**
-	 * Stores the savepoint.
+	 * Creates a savepoint directory.
 	 *
-	 * @param targetDirectory Target directory to store savepoint in
-	 * @param savepoint Savepoint to be stored
-	 * @param <T>       Savepoint type
-	 * @return Path of stored savepoint
-	 * @throws Exception Failures during store are forwarded
+	 * @param baseDirectory Base target directory for the savepoint
+	 * @param jobId Optional JobID the savepoint belongs to
+	 * @return The created savepoint directory
+	 * @throws IOException FileSystem operation failures are forwarded
 	 */
-	public static <T extends Savepoint> String storeSavepoint(
-			String targetDirectory,
-			T savepoint) throws IOException {
-
-		checkNotNull(targetDirectory, "Target directory");
-		checkNotNull(savepoint, "Savepoint");
+	public static String createSavepointDirectory(@Nonnull String baseDirectory, @Nullable JobID jobId) throws IOException {
+		String prefix;
+		if (jobId == null) {
+			prefix = "savepoint-";
+		} else {
+			prefix = String.format("savepoint-%s-", jobId.toString().substring(0, 6));
+		}
 
 		Exception latestException = null;
-		Path path = null;
-		FSDataOutputStream fdos = null;
+		Path savepointDirectory = null;
 
 		FileSystem fs = null;
 
 		// Try to create a FS output stream
 		for (int attempt = 0; attempt < 10; attempt++) {
-			path = new Path(targetDirectory, FileUtils.getRandomFilename(prefix));
+			Path path = new Path(baseDirectory, FileUtils.getRandomFilename(prefix));
 
 			if (fs == null) {
 				fs = FileSystem.get(path.toUri());
 			}
 
 			try {
-				fdos = fs.create(path, false);
-				break;
+				if (fs.mkdirs(path)) {
+					savepointDirectory = path;
+					break;
+				}
 			} catch (Exception e) {
 				latestException = e;
 			}
 		}
 
-		if (fdos == null) {
-			throw new IOException("Failed to create file output stream at " + path, latestException);
+		if (savepointDirectory == null) {
+			throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException);
+		} else {
+			return savepointDirectory.getPath();
 		}
+	}
+
+	/**
+	 * Deletes a savepoint directory.
+	 *
+	 * @param savepointDirectory Savepoint directory to delete recursively
+	 * @throws IOException FileSystem operation failures are forwarded
+	 */
+	public static void deleteSavepointDirectory(@Nonnull String savepointDirectory) throws IOException {
+		Path path = new Path(savepointDirectory);
+		FileSystem fs = FileSystem.get(path.toUri());
+		fs.delete(path, true);
+	}
+
+	/**
+	 * Stores the savepoint metadata file.
+	 *
+	 * @param <T>       Savepoint type
+	 * @param directory Target directory to store savepoint in
+	 * @param savepoint Savepoint to be stored
+	 * @return Path of stored savepoint
+	 * @throws IOException Failures during store are forwarded
+	 */
+	public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException {
+		checkNotNull(directory, "Target directory");
+		checkNotNull(savepoint, "Savepoint");
+
+		Path basePath = new Path(directory);
+		FileSystem fs = FileSystem.get(basePath.toUri());
+
+		Path path = new Path(basePath, META_DATA_FILE);
+		FSDataOutputStream fdos = fs.create(path, false);
 
 		boolean success = false;
 		try (DataOutputStream dos = new DataOutputStream(fdos)) {
@@ -115,20 +151,41 @@ public class SavepointStore {
 			}
 		}
 
-		return path.toString();
+		return basePath.toString();
 	}
 
 	/**
 	 * Loads the savepoint at the specified path.
 	 *
-	 * @param path Path of savepoint to load
+	 * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
 	 * @return The loaded savepoint
 	 * @throws Exception Failures during load are forwared
 	 */
-	public static Savepoint loadSavepoint(String path, ClassLoader userClassLoader) throws IOException {
-		Preconditions.checkNotNull(path, "Path");
+	public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader userClassLoader) throws IOException {
+		Preconditions.checkNotNull(savepointFileOrDirectory, "Path");
+
+		Path path = new Path(savepointFileOrDirectory);
+
+		LOG.info("Loading savepoint from {}", path);
 
-		try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
+		FileSystem fs = FileSystem.get(path.toUri());
+
+		FileStatus status = fs.getFileStatus(path);
+
+		// If this is a directory, we need to find the meta data file
+		if (status.isDir()) {
+			Path candidatePath = new Path(path, META_DATA_FILE);
+			if (fs.exists(candidatePath)) {
+				path = candidatePath;
+				LOG.info("Using savepoint file in {}", path);
+			} else {
+				throw new IOException("Cannot find meta data file in directory " + path
+					+ ". Please try to load the savepoint directly from the meta data file "
+					+ "instead of the directory.");
+			}
+		}
+
+		try (DataInputStream dis = new DataInputViewStreamWrapper(fs.open(path))) {
 			int magicNumber = dis.readInt();
 
 			if (magicNumber == MAGIC_NUMBER) {
@@ -152,7 +209,7 @@ public class SavepointStore {
 	 * @param path Path of savepoint to remove
 	 * @throws Exception Failures during disposal are forwarded
 	 */
-	public static void removeSavepoint(String path) throws IOException {
+	public static void removeSavepointFile(String path) throws IOException {
 		Preconditions.checkNotNull(path, "Path");
 
 		try {
@@ -173,14 +230,4 @@ public class SavepointStore {
 		}
 	}
 
-	private static FSDataInputStream createFsInputStream(Path path) throws IOException {
-		FileSystem fs = FileSystem.get(path.toUri());
-
-		if (fs.exists(path)) {
-			return fs.open(path);
-		} else {
-			throw new IllegalArgumentException("Invalid path '" + path.toUri() + "'.");
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index b3fe443..3191d76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
@@ -675,14 +676,15 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution
 	 *
 	 * @param checkpointId of th checkpoint to trigger
 	 * @param timestamp of the checkpoint to trigger
+	 * @param checkpointOptions of the checkpoint to trigger
 	 */
-	public void triggerCheckpoint(long checkpointId, long timestamp) {
+	public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
 		final SimpleSlot slot = assignedResource;
 
 		if (slot != null) {
 			final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
 
-			taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp);
+			taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
 		} else {
 			LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
 				"no longer running.");

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
index 59f56b0..0752897 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/CheckpointBarrier.java
@@ -18,10 +18,15 @@
 
 package org.apache.flink.runtime.io.network.api;
 
+import static org.apache.flink.util.Preconditions.checkElementIndex;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 import java.io.IOException;
 
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.event.RuntimeEvent;
 
 /**
@@ -43,12 +48,14 @@ public class CheckpointBarrier extends RuntimeEvent {
 
 	private long id;
 	private long timestamp;
+	private CheckpointOptions checkpointOptions;
 
 	public CheckpointBarrier() {}
 
-	public CheckpointBarrier(long id, long timestamp) {
+	public CheckpointBarrier(long id, long timestamp, CheckpointOptions checkpointOptions) {
 		this.id = id;
 		this.timestamp = timestamp;
+		this.checkpointOptions = checkNotNull(checkpointOptions);
 	}
 
 	public long getId() {
@@ -59,20 +66,53 @@ public class CheckpointBarrier extends RuntimeEvent {
 		return timestamp;
 	}
 
+	public CheckpointOptions getCheckpointOptions() {
+		return checkpointOptions;
+	}
+
+	// ------------------------------------------------------------------------
+	// Serialization
 	// ------------------------------------------------------------------------
 	
 	@Override
 	public void write(DataOutputView out) throws IOException {
 		out.writeLong(id);
 		out.writeLong(timestamp);
+		CheckpointType checkpointType = checkpointOptions.getCheckpointType();
+
+		out.writeInt(checkpointType.ordinal());
+
+		if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+			return;
+		} else if (checkpointType == CheckpointType.SAVEPOINT) {
+			String targetLocation = checkpointOptions.getTargetLocation();
+			assert(targetLocation != null);
+			out.writeUTF(targetLocation);
+		} else {
+			throw new IOException("Unknown CheckpointType " + checkpointType);
+		}
 	}
 
 	@Override
 	public void read(DataInputView in) throws IOException {
 		id = in.readLong();
 		timestamp = in.readLong();
+
+		int typeOrdinal = in.readInt();
+		checkElementIndex(typeOrdinal, CheckpointType.values().length, "Unknown CheckpointType ordinal " + typeOrdinal);
+		CheckpointType checkpointType = CheckpointType.values()[typeOrdinal];
+
+		if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+			checkpointOptions = CheckpointOptions.forFullCheckpoint();
+		} else if (checkpointType == CheckpointType.SAVEPOINT) {
+			String targetLocation = in.readUTF();
+			checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
+		} else {
+			throw new IOException("Illegal CheckpointType " + checkpointType);
+		}
 	}
-	
+
+
 	// ------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
index 4d9f431..223cbfe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.runtime.io.network.api.serialization;
 
+import java.nio.charset.Charset;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -34,6 +37,7 @@ import org.apache.flink.util.InstantiationUtil;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Utility class to serialize and deserialize task events.
@@ -60,10 +64,34 @@ public class EventSerializer {
 		else if (eventClass == CheckpointBarrier.class) {
 			CheckpointBarrier barrier = (CheckpointBarrier) event;
 
-			ByteBuffer buf = ByteBuffer.allocate(20);
-			buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
-			buf.putLong(4, barrier.getId());
-			buf.putLong(12, barrier.getTimestamp());
+			CheckpointOptions checkpointOptions = barrier.getCheckpointOptions();
+			CheckpointType checkpointType = checkpointOptions.getCheckpointType();
+
+			ByteBuffer buf;
+			if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+				buf = ByteBuffer.allocate(24);
+				buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
+				buf.putLong(4, barrier.getId());
+				buf.putLong(12, barrier.getTimestamp());
+				buf.putInt(20, checkpointType.ordinal());
+			} else if (checkpointType == CheckpointType.SAVEPOINT) {
+				String targetLocation = checkpointOptions.getTargetLocation();
+				assert(targetLocation != null);
+				byte[] bytes = targetLocation.getBytes(Charset.forName("UTF-8"));
+
+				buf = ByteBuffer.allocate(24 + 4 + bytes.length);
+				buf.putInt(0, CHECKPOINT_BARRIER_EVENT);
+				buf.putLong(4, barrier.getId());
+				buf.putLong(12, barrier.getTimestamp());
+				buf.putInt(20, checkpointType.ordinal());
+				buf.putInt(24, bytes.length);
+				for (int i = 0; i < bytes.length; i++) {
+					buf.put(28 + i, bytes[i]);
+				}
+			} else {
+				throw new IOException("Unknown checkpoint type: " + checkpointType);
+			}
+
 			return buf;
 		}
 		else if (eventClass == EndOfSuperstepEvent.class) {
@@ -172,7 +200,28 @@ public class EventSerializer {
 			else if (type == CHECKPOINT_BARRIER_EVENT) {
 				long id = buffer.getLong();
 				long timestamp = buffer.getLong();
-				return new CheckpointBarrier(id, timestamp);
+
+				CheckpointOptions checkpointOptions;
+
+				int checkpointTypeOrdinal = buffer.getInt(); // range-checked below before indexing values()
+				Preconditions.checkElementIndex(checkpointTypeOrdinal, CheckpointType.values().length,
+					"Illegal CheckpointType ordinal " + checkpointTypeOrdinal);
+				CheckpointType checkpointType = CheckpointType.values()[checkpointTypeOrdinal];
+
+				if (checkpointType == CheckpointType.FULL_CHECKPOINT) {
+					checkpointOptions = CheckpointOptions.forFullCheckpoint();
+				} else if (checkpointType == CheckpointType.SAVEPOINT) {
+					int len = buffer.getInt();
+					byte[] bytes = new byte[len];
+					buffer.get(bytes);
+					String targetLocation = new String(bytes, Charset.forName("UTF-8"));
+
+					checkpointOptions = CheckpointOptions.forSavepoint(targetLocation);
+				} else {
+					throw new IOException("Unknown checkpoint type: " + checkpointType);
+				}
+
+				return new CheckpointBarrier(id, timestamp, checkpointOptions);
 			}
 			else if (type == END_OF_SUPERSTEP_EVENT) {
 				return EndOfSuperstepEvent.INSTANCE;

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 87b66ce..0930011 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobgraph.tasks;
 
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.state.TaskStateHandles;
 
 /**
@@ -46,21 +47,23 @@ public interface StatefulTask {
 	 * method.
 	 *
 	 * @param checkpointMetaData Meta data for about this checkpoint
+	 * @param checkpointOptions Options for performing this checkpoint
 	 *
 	 * @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
 	 */
-	boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData) throws Exception;
+	boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception;
 
 	/**
 	 * This method is called when a checkpoint is triggered as a result of receiving checkpoint
 	 * barriers on all input streams.
 	 * 
 	 * @param checkpointMetaData Meta data for about this checkpoint
+	 * @param checkpointOptions Options for performing this checkpoint
 	 * @param checkpointMetrics Metrics about this checkpoint
 	 * 
 	 * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
 	 */
-	void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics) throws Exception;
+	void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception;
 
 	/**
 	 * Aborts a checkpoint as the result of receiving possibly some checkpoint barriers,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
index fe4ecfb..2876ebe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/ActorTaskManagerGateway.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager.slots;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.clusterframework.messages.StopCluster;
 import org.apache.flink.runtime.concurrent.Future;
@@ -196,12 +197,13 @@ public class ActorTaskManagerGateway implements TaskManagerGateway {
 			ExecutionAttemptID executionAttemptID,
 			JobID jobId,
 			long checkpointId,
-			long timestamp) {
+			long timestamp,
+			CheckpointOptions checkpointOptions) {
 
 		Preconditions.checkNotNull(executionAttemptID);
 		Preconditions.checkNotNull(jobId);
 
-		actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp));
+		actorGateway.tell(new TriggerCheckpoint(jobId, executionAttemptID, checkpointId, timestamp, checkpointOptions));
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
index db0a3bf..09f104f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/TaskManagerGateway.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmanager.slots;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -160,12 +161,14 @@ public interface TaskManagerGateway {
 	 * @param jobId identifying the job to which the task belongs
 	 * @param checkpointId of the checkpoint to trigger
 	 * @param timestamp of the checkpoint to trigger
+	 * @param checkpointOptions of the checkpoint to trigger
 	 */
 	void triggerCheckpoint(
 		ExecutionAttemptID executionAttemptID,
 		JobID jobId,
 		long checkpointId,
-		long timestamp);
+		long timestamp,
+		CheckpointOptions checkpointOptions);
 
 	/**
 	 * Request the task manager log from the task manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
index eba97d2..28fef27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -123,7 +124,7 @@ public class RpcTaskManagerGateway implements TaskManagerGateway {
 	}
 
 	@Override
-	public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp) {
+	public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
 //		taskExecutorGateway.triggerCheckpoint(executionAttemptID, jobId, checkpointId, timestamp);
 		throw new UnsupportedOperationException("Operation is not yet supported.");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
index 0528755..3477e13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/TriggerCheckpoint.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.runtime.messages.checkpoint;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
 /**
@@ -33,9 +36,19 @@ public class TriggerCheckpoint extends AbstractCheckpointMessage implements java
 	/** The timestamp associated with the checkpoint */
 	private final long timestamp;
 
-	public TriggerCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) {
+	/** Options for how to perform the checkpoint. */
+	private final CheckpointOptions checkpointOptions;
+
+	public TriggerCheckpoint(
+			JobID job,
+			ExecutionAttemptID taskExecutionId,
+			long checkpointId,
+			long timestamp,
+			CheckpointOptions checkpointOptions) {
+
 		super(job, taskExecutionId, checkpointId);
 		this.timestamp = timestamp;
+		this.checkpointOptions = checkNotNull(checkpointOptions);
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -44,6 +57,10 @@ public class TriggerCheckpoint extends AbstractCheckpointMessage implements java
 		return timestamp;
 	}
 
+	public CheckpointOptions getCheckpointOptions() {
+		return checkpointOptions;
+	}
+
 	// --------------------------------------------------------------------------------------------
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
index 3ed49f1..14f897f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractKeyedStateBackend.java
@@ -36,6 +36,7 @@ import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.internal.InternalAggregatingState;
 import org.apache.flink.runtime.state.internal.InternalFoldingState;
@@ -54,7 +55,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Base implementation of KeyedStateBackend. The state can be checkpointed
- * to streams using {@link #snapshot(long, long, CheckpointStreamFactory)}.
+ * to streams using {@link #snapshot(long, long, CheckpointStreamFactory, CheckpointOptions)}.
  *
  * @param <K> Type of the key by which state is keyed.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index bc4594a..a335e45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
@@ -31,6 +32,7 @@ import java.io.IOException;
  */
 @PublicEvolving
 public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {
+
 	private static final long serialVersionUID = 4620415814639230247L;
 
 	@Override
@@ -39,6 +41,12 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
 			String operatorIdentifier) throws IOException;
 
 	@Override
+	public abstract CheckpointStreamFactory createSavepointStreamFactory(
+			JobID jobId,
+			String operatorIdentifier,
+			@Nullable String targetLocation) throws IOException;
+
+	@Override
 	public abstract <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env,
 			JobID jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index adf0727..8dcf49e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -30,6 +30,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -154,7 +155,10 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 	@Override
 	public RunnableFuture<OperatorStateHandle> snapshot(
-			long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
+			long checkpointId,
+			long timestamp,
+			CheckpointStreamFactory streamFactory,
+			CheckpointOptions checkpointOptions) throws Exception {
 
 		if (registeredStates.isEmpty()) {
 			return new DoneFuture<>(null);
@@ -346,4 +350,4 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			return partitionOffsets;
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
index a4a6bc4..0d92b46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import java.util.Collection;
 import java.util.concurrent.RunnableFuture;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 
 /**
  * Interface for operations that can perform snapshots of their state.
@@ -37,12 +38,14 @@ public interface Snapshotable<S extends StateObject> {
 	 * @param checkpointId  The ID of the checkpoint.
 	 * @param timestamp     The timestamp of the checkpoint.
 	 * @param streamFactory The factory that we can use for writing our state to streams.
+	 * @param checkpointOptions Options for how to perform this checkpoint.
 	 * @return A runnable future that will yield a {@link StateObject}.
 	 */
 	RunnableFuture<S> snapshot(
 			long checkpointId,
 			long timestamp,
-			CheckpointStreamFactory streamFactory) throws Exception;
+			CheckpointStreamFactory streamFactory,
+			CheckpointOptions checkpointOptions) throws Exception;
 
 	/**
 	 * Restores state that was previously snapshotted from the provided parameters. Typically the parameters are state

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index 846df89..7961b5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
@@ -95,6 +96,27 @@ public interface StateBackend extends java.io.Serializable {
 	 */
 	CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException;
 
+	/**
+	 * Creates a {@link CheckpointStreamFactory} that can be used to create streams
+	 * that should end up in a savepoint.
+	 *
+	 * <p>This is only called if the triggered checkpoint is a savepoint. Commonly
+	 * this will return the same factory as for regular checkpoints, but possibly
+	 * with slight adjustments.
+	 *
+	 * @param jobId The {@link JobID} of the job for which we are creating checkpoint streams.
+	 * @param operatorIdentifier An identifier of the operator for which we create streams.
+	 * @param targetLocation An optional custom location for the savepoint stream.
+	 * 
+	 * @return The stream factory for savepoints.
+	 * 
+	 * @throws IOException Failures during stream creation are forwarded.
+	 */
+	CheckpointStreamFactory createSavepointStreamFactory(
+			JobID jobId,
+			String operatorIdentifier,
+			@Nullable String targetLocation) throws IOException;
+
 	// ------------------------------------------------------------------------
 	//  Structure Backends 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 30b1da6..8455d84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -94,18 +94,15 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 				MAX_FILE_STATE_THRESHOLD);
 		}
 		this.fileStateThreshold = fileStateSizeThreshold;
+
 		Path basePath = checkpointDataUri;
+		filesystem = basePath.getFileSystem();
 
-		Path dir = new Path(basePath, jobId.toString());
+		checkpointDirectory = createBasePath(filesystem, basePath, jobId);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Initializing file stream factory to URI {}.", dir);
+			LOG.debug("Initialed file stream factory to URI {}.", checkpointDirectory);
 		}
-
-		filesystem = basePath.getFileSystem();
-		filesystem.mkdirs(dir);
-
-		checkpointDirectory = dir;
 	}
 
 	@Override
@@ -115,7 +112,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 	public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
 		checkFileSystemInitialized();
 
-		Path checkpointDir = createCheckpointDirPath(checkpointID);
+		Path checkpointDir = createCheckpointDirPath(checkpointDirectory, checkpointID);
 		int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
 		return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
 	}
@@ -130,7 +127,13 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 		}
 	}
 
-	private Path createCheckpointDirPath(long checkpointID) {
+	protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException {
+		Path dir = new Path(checkpointDirectory, jobID.toString());
+		fs.mkdirs(dir);
+		return dir;
+	}
+
+	protected Path createCheckpointDirPath(Path checkpointDirectory, long checkpointID) {
 		return new Path(checkpointDirectory, "chk-" + checkpointID);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
new file mode 100644
index 0000000..7410d2d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import java.io.IOException;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+
+/**
+ * A {@link CheckpointStreamFactory} that produces streams that write to a
+ * {@link FileSystem}.
+ *
+ * <p>The only difference from the parent {@link FsCheckpointStreamFactory} is
+ * the created directory layout: all checkpoint files go directly into the given
+ * directory, without job- or checkpoint-specific sub directories.
+ */
+public class FsSavepointStreamFactory extends FsCheckpointStreamFactory {
+
+	public FsSavepointStreamFactory(
+			Path checkpointDataUri,
+			JobID jobId,
+			int fileStateSizeThreshold) throws IOException {
+
+		super(checkpointDataUri, jobId, fileStateSizeThreshold);
+	}
+
+	@Override
+	protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException {
+		// No job specific sub directory required as the savepoint directory
+		// is already unique.
+		return checkpointDirectory;
+	}
+
+	@Override
+	protected Path createCheckpointDirPath(Path checkpointDirectory, long checkpointID) {
+		// No checkpoint specific directory required as the savepoint directory
+		// is already unique.
+		return checkpointDirectory;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 281dbb0..b614d98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -173,6 +173,15 @@ public class FsStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
+	public CheckpointStreamFactory createSavepointStreamFactory(
+			JobID jobId,
+			String operatorIdentifier,
+			String targetLocation) throws IOException {
+
+		return new FsSavepointStreamFactory(new Path(targetLocation), jobId, fileStateThreshold);
+	}
+
+	@Override
 	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env,
 			JobID jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 04e4fbc..4a5455a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -40,6 +40,7 @@ import org.apache.flink.migration.MigrationUtil;
 import org.apache.flink.migration.runtime.state.KvStateSnapshot;
 import org.apache.flink.migration.runtime.state.filesystem.AbstractFsStateSnapshot;
 import org.apache.flink.migration.runtime.state.memory.AbstractMemStateSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.ArrayListSerializer;
@@ -215,7 +216,8 @@ public class HeapKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
 	public RunnableFuture<KeyGroupsStateHandle> snapshot(
 			long checkpointId,
 			long timestamp,
-			CheckpointStreamFactory streamFactory) throws Exception {
+			CheckpointStreamFactory streamFactory,
+			CheckpointOptions checkpointOptions) throws Exception {
 
 		if (stateTables.isEmpty()) {
 			return new DoneFuture<>(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 58a86df..2cc1164 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -75,6 +75,15 @@ public class MemoryStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
+	public CheckpointStreamFactory createSavepointStreamFactory(
+			JobID jobId,
+			String operatorIdentifier,
+			String targetLocation) throws IOException {
+
+		return new MemCheckpointStreamFactory(maxStateSize);
+	}
+
+	@Override
 	public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
 			Environment env, JobID jobID,
 			String operatorIdentifier,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 2980376..8db1d5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -475,13 +476,13 @@ public class TaskExecutor extends RpcEndpoint<TaskExecutorGateway> {
 	// ----------------------------------------------------------------------
 
 	@RpcMethod
-	public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException {
+	public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp, CheckpointOptions checkpointOptions) throws CheckpointException {
 		log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
 
 		final Task task = taskSlotTable.getTask(executionAttemptID);
 
 		if (task != null) {
-			task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp);
+			task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
 
 			return Acknowledge.get();
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index ebd4c0c..36a3255 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
@@ -97,9 +98,10 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param executionAttemptID identifying the task
 	 * @param checkpointID unique id for the checkpoint
 	 * @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
+	 * @param checkpointOptions options for performing the checkpoint
 	 * @return Future acknowledge if the checkpoint has been successfully triggered
 	 */
-	Future<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp);
+	Future<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions);
 
 	/**
 	 * Confirm a checkpoint for the given task. The checkpoint is identified by the checkpoint ID

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index acb423b..c9f17b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -1117,8 +1118,13 @@ public class Task implements Runnable, TaskActions {
 	 * 
 	 * @param checkpointID The ID identifying the checkpoint.
 	 * @param checkpointTimestamp The timestamp associated with the checkpoint.
+	 * @param checkpointOptions Options for performing this checkpoint.
 	 */
-	public void triggerCheckpointBarrier(final long checkpointID, long checkpointTimestamp) {
+	public void triggerCheckpointBarrier(
+			final long checkpointID,
+			long checkpointTimestamp,
+			final CheckpointOptions checkpointOptions) {
+
 		final AbstractInvokable invokable = this.invokable;
 		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
 
@@ -1134,7 +1140,7 @@ public class Task implements Runnable, TaskActions {
 						// activate safety net for checkpointing thread
 						FileSystemSafetyNet.initializeSafetyNetForThread();
 						try {
-							boolean success = statefulTask.triggerCheckpoint(checkpointMetaData);
+							boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
 							if (!success) {
 								checkpointResponder.declineCheckpoint(
 										getJobID(), getExecutionId(), checkpointID,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8b08181..21749cb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -837,7 +837,7 @@ class JobManager(
           savepoint.dispose()
 
           // Remove the header file
-          SavepointStore.removeSavepoint(savepointPath)
+          SavepointStore.removeSavepointFile(savepointPath)
 
           senderRef ! DisposeSavepointSuccess
         } catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a70454b..25d5366 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -501,12 +501,13 @@ class TaskManager(
         val taskExecutionId = message.getTaskExecutionId
         val checkpointId = message.getCheckpointId
         val timestamp = message.getTimestamp
+        val checkpointOptions = message.getCheckpointOptions
 
         log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
 
         val task = runningTasks.get(taskExecutionId)
         if (task != null) {
-          task.triggerCheckpointBarrier(checkpointId, timestamp)
+          task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions)
         } else {
           log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
         }


Mime
View raw message