From: sewen@apache.org
To: commits@flink.apache.org
Reply-To: dev@flink.apache.org
Date: Thu, 23 Feb 2017 19:10:33 -0000
Message-Id: <1d62b6d169124bc28d38505191083c70@git.apache.org>
Subject: [5/7] flink git commit: [FLINK-5763] [checkpoints] Add CheckpointOptions

[FLINK-5763] [checkpoints] Add CheckpointOptions

Adds `CheckpointOptions` to the triggered checkpoint messages (coordinator to barrier injecting tasks) and barriers (flowing inline with the data):

```java
public class CheckpointOptions {

  // Type of checkpoint
  // => FULL_CHECKPOINT
  // => SAVEPOINT
  @Nonnull
  CheckpointType getCheckpointType();

  // Custom target location. This is a String, because for future
  // backends it can be a logical location like a DB table.
  @Nullable
  String getTargetLocation();

}
```

This class would be the place to define more options for performing the checkpoints (for example for incremental checkpoints).

These options are forwarded via the `StreamTask` to the `StreamOperator`s and `Snapshotable` backends. The `AbstractStreamOperator` checks the options and either i) forwards the shared per operator `CheckpointStreamFactory` (as of

For this, the state backends provide the following new method:

```
CheckpointStreamFactory createSavepointStreamFactory(JobID, String, String);
```

The `MemoryStateBackend` returns the regular stream factory and the `FsStateBackend` returns a `FsSavepointStreamFactory`, which writes all checkpoint streams to a single directory (instead of the regular sub folders per checkpoint).

We end up with the following directory layout for savepoints:

```
+---------------------------+
| :root_savepoint_directory | (custom per savepoint or configured default via `state.savepoints.dir`)
+---------------------------+
  | +---------------------------------------+
  +-| savepoint-:jobId(0, 6)-:random_suffix | (one directory per savepoint)
    +---------------------------------------+
      |
      +- _metadata (one per savepoint)
      +- :uuid (one data file per StreamTask)
      +- ...
+- :uuid ``` Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e7a9174 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e7a9174 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e7a9174 Branch: refs/heads/master Commit: 6e7a91741708a2b167a2bbca5dda5b2059df5e18 Parents: 1f9f38b Author: Ufuk Celebi Authored: Thu Feb 16 17:56:23 2017 +0100 Committer: Stephan Ewen Committed: Thu Feb 23 18:39:49 2017 +0100 ---------------------------------------------------------------------- .../connectors/fs/RollingSinkITCase.java | 1 - .../state/RocksDBKeyedStateBackend.java | 5 +- .../streaming/state/RocksDBStateBackend.java | 9 ++ .../state/RocksDBAsyncSnapshotTest.java | 8 +- .../state/RocksDBStateBackendTest.java | 15 +- .../checkpoint/CheckpointCoordinator.java | 56 ++++++-- .../runtime/checkpoint/CheckpointOptions.java | 108 +++++++++++++++ .../runtime/checkpoint/CompletedCheckpoint.java | 2 +- .../runtime/checkpoint/PendingCheckpoint.java | 3 +- .../checkpoint/savepoint/SavepointStore.java | 137 +++++++++++++------ .../flink/runtime/executiongraph/Execution.java | 6 +- .../io/network/api/CheckpointBarrier.java | 44 +++++- .../api/serialization/EventSerializer.java | 59 +++++++- .../runtime/jobgraph/tasks/StatefulTask.java | 7 +- .../slots/ActorTaskManagerGateway.java | 6 +- .../jobmanager/slots/TaskManagerGateway.java | 5 +- .../jobmaster/RpcTaskManagerGateway.java | 3 +- .../messages/checkpoint/TriggerCheckpoint.java | 19 ++- .../state/AbstractKeyedStateBackend.java | 3 +- .../runtime/state/AbstractStateBackend.java | 8 ++ .../state/DefaultOperatorStateBackend.java | 8 +- .../flink/runtime/state/Snapshotable.java | 5 +- .../flink/runtime/state/StateBackend.java | 22 +++ .../filesystem/FsCheckpointStreamFactory.java | 21 +-- .../filesystem/FsSavepointStreamFactory.java | 58 ++++++++ .../state/filesystem/FsStateBackend.java | 9 ++ .../state/heap/HeapKeyedStateBackend.java | 4 +- 
.../state/memory/MemoryStateBackend.java | 9 ++ .../runtime/taskexecutor/TaskExecutor.java | 5 +- .../taskexecutor/TaskExecutorGateway.java | 4 +- .../apache/flink/runtime/taskmanager/Task.java | 10 +- .../flink/runtime/jobmanager/JobManager.scala | 2 +- .../flink/runtime/taskmanager/TaskManager.scala | 3 +- .../checkpoint/CheckpointCoordinatorTest.java | 53 ++++--- .../checkpoint/CheckpointOptionsTest.java | 48 +++++++ .../checkpoint/CheckpointStatsHistoryTest.java | 1 + .../savepoint/MigrationV0ToV1Test.java | 2 +- .../savepoint/SavepointLoaderTest.java | 4 +- .../savepoint/SavepointStoreTest.java | 48 +++++-- .../io/network/api/CheckpointBarrierTest.java | 61 +++++++++ .../api/serialization/EventSerializerTest.java | 45 ++++-- .../io/network/api/writer/RecordWriterTest.java | 5 +- .../jobmanager/JobManagerHARecoveryTest.java | 5 +- .../messages/CheckpointMessagesTest.java | 3 +- .../runtime/state/OperatorStateBackendTest.java | 3 +- .../runtime/state/StateBackendTestBase.java | 39 +++--- .../FsSavepointStreamFactoryTest.java | 67 +++++++++ .../runtime/taskmanager/TaskAsyncCallTest.java | 9 +- .../api/operators/AbstractStreamOperator.java | 43 +++++- .../api/operators/OperatorSnapshotResult.java | 2 +- .../streaming/api/operators/StreamOperator.java | 12 +- .../streaming/runtime/io/BarrierBuffer.java | 5 +- .../streaming/runtime/io/BarrierTracker.java | 9 +- .../streaming/runtime/tasks/OperatorChain.java | 5 +- .../streaming/runtime/tasks/StreamTask.java | 65 +++++++-- .../api/checkpoint/ListCheckpointedTest.java | 2 +- .../operators/AbstractStreamOperatorTest.java | 65 +++++---- .../AbstractUdfStreamOperatorLifecycleTest.java | 12 +- .../WrappingFunctionSnapshotRestoreTest.java | 2 +- .../operators/async/AsyncWaitOperatorTest.java | 5 +- .../io/BarrierBufferAlignmentLimitTest.java | 13 +- .../io/BarrierBufferMassiveRandomTest.java | 3 +- .../streaming/runtime/io/BarrierBufferTest.java | 33 ++--- .../runtime/io/BarrierTrackerTest.java | 7 +- 
.../runtime/tasks/BlockingCheckpointsTest.java | 8 +- .../runtime/tasks/OneInputStreamTaskTest.java | 31 +++-- .../runtime/tasks/SourceStreamTaskTest.java | 3 +- .../StreamTaskCancellationBarrierTest.java | 4 +- .../streaming/runtime/tasks/StreamTaskTest.java | 37 ++--- .../runtime/tasks/TwoInputStreamTaskTest.java | 29 ++-- .../util/AbstractStreamOperatorTestHarness.java | 10 +- .../KeyedOneInputStreamOperatorTestHarness.java | 7 +- .../test/checkpointing/SavepointITCase.java | 51 ++++--- .../streaming/runtime/StateBackendITCase.java | 7 +- 74 files changed, 1173 insertions(+), 354 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---------------------------------------------------------------------- diff --git a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java index 80ae294..72f2f21 100644 --- a/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java +++ b/flink-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java @@ -941,7 +941,6 @@ public class RollingSinkITCase extends StreamingMultipleProgramsTestBase { } } - private static class StreamWriterWithConfigCheck extends StringWriter { private String key; private String expect; http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java ---------------------------------------------------------------------- diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index a0efe78..bd8d4dd 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -38,6 +38,7 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.migration.MigrationNamespaceSerializerProxy; import org.apache.flink.migration.MigrationUtil; import org.apache.flink.migration.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.io.async.AbstractAsyncIOCallable; import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback; import org.apache.flink.runtime.query.TaskKvStateRegistry; @@ -244,6 +245,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { * @param checkpointId The Id of the checkpoint. * @param timestamp The timestamp of the checkpoint. * @param streamFactory The factory that we can use for writing our state to streams. + * @param checkpointOptions Options for how to perform this checkpoint. * @return Future to the state handle of the snapshot data. 
* @throws Exception */ @@ -251,7 +253,8 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { public RunnableFuture snapshot( final long checkpointId, final long timestamp, - final CheckpointStreamFactory streamFactory) throws Exception { + final CheckpointStreamFactory streamFactory, + CheckpointOptions checkpointOptions) throws Exception { long startTime = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index 6b09a8a..3fd5d0f 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -219,6 +219,15 @@ public class RocksDBStateBackend extends AbstractStateBackend { } @Override + public CheckpointStreamFactory createSavepointStreamFactory( + JobID jobId, + String operatorIdentifier, + String targetLocation) throws IOException { + + return checkpointStreamBackend.createSavepointStreamFactory(jobId, operatorIdentifier, targetLocation); + } + + @Override public AbstractKeyedStateBackend createKeyedStateBackend( Environment env, JobID jobID, http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java ---------------------------------------------------------------------- diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index bce8028..90de7a6 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -32,6 +32,7 @@ import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.SubtaskState; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; @@ -186,7 +187,7 @@ public class RocksDBAsyncSnapshotTest { } } - task.triggerCheckpoint(new CheckpointMetaData(42, 17)); + task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forFullCheckpoint()); testHarness.processElement(new StreamRecord<>("Wohoo", 0)); @@ -266,7 +267,7 @@ public class RocksDBAsyncSnapshotTest { } } - task.triggerCheckpoint(new CheckpointMetaData(42, 17)); + task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forFullCheckpoint()); testHarness.processElement(new StreamRecord<>("Wohoo", 0)); BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await(); task.cancel(); @@ -342,7 +343,8 @@ public class RocksDBAsyncSnapshotTest { StringSerializer.INSTANCE, new ValueStateDescriptor<>("foobar", String.class)); - RunnableFuture snapshotFuture = keyedStateBackend.snapshot(checkpointId, timestamp, checkpointStreamFactory); + RunnableFuture snapshotFuture = keyedStateBackend.snapshot( + 
checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forFullCheckpoint()); try { FutureUtil.runIfNotDoneAndGet(snapshotFuture); http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index dc90666..c7b5c20 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -26,6 +26,7 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.runtime.query.TaskKvStateRegistry; @@ -172,7 +173,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory); + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, + CheckpointOptions.forFullCheckpoint()); RocksDB spyDB = keyedStateBackend.db; @@ -209,7 +211,8 @@ public class RocksDBStateBackendTest extends StateBackendTestBase snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory); + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, 
testStreamFactory, + CheckpointOptions.forFullCheckpoint()); RocksDB spyDB = keyedStateBackend.db; @@ -237,7 +240,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory); + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); snapshot.cancel(true); verifyRocksObjectsReleased(); } @@ -245,7 +248,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory); + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); snapshot.cancel(true); Thread asyncSnapshotThread = new Thread(snapshot); asyncSnapshotThread.start(); @@ -262,7 +265,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory); + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); Thread asyncSnapshotThread = new Thread(snapshot); asyncSnapshotThread.start(); waiter.await(); // wait for snapshot to run @@ -282,7 +285,7 @@ public class RocksDBStateBackendTest extends StateBackendTestBase snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory); + RunnableFuture snapshot = keyedStateBackend.snapshot(0L, 0L, testStreamFactory, CheckpointOptions.forFullCheckpoint()); Thread asyncSnapshotThread = new Thread(snapshot); asyncSnapshotThread.start(); waiter.await(); // wait for snapshot to run http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 36649ad..c1c65b5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -21,6 +21,8 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; +import org.apache.flink.runtime.concurrent.ApplyFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; import org.apache.flink.runtime.execution.ExecutionState; @@ -296,15 +298,42 @@ public class CheckpointCoordinator { checkNotNull(targetDirectory, "Savepoint target directory"); CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); - CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory, false); - if (result.isSuccess()) { - return result.getPendingCheckpoint().getCompletionFuture(); - } - else { - Throwable cause = new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()); - return FlinkCompletableFuture.completedExceptionally(cause); + // Create the unique savepoint directory + final String savepointDirectory = SavepointStore + .createSavepointDirectory(targetDirectory, job); + + CheckpointTriggerResult triggerResult = triggerCheckpoint( + timestamp, + props, + savepointDirectory, + false); + + Future result; + + if (triggerResult.isSuccess()) { + result = triggerResult.getPendingCheckpoint().getCompletionFuture(); + } else { + Throwable cause = new Exception("Failed to trigger savepoint: " + triggerResult.getFailureReason().message()); + result = FlinkCompletableFuture.completedExceptionally(cause); } + + // Make sure 
to remove the created base directory on Exceptions + result.exceptionallyAsync(new ApplyFunction() { + @Override + public Void apply(Throwable value) { + try { + SavepointStore.deleteSavepointDirectory(savepointDirectory); + } catch (Throwable t) { + LOG.warn("Failed to delete savepoint directory " + savepointDirectory + + " after failed savepoint.", t); + } + + return null; + } + }, executor); + + return result; } /** @@ -517,9 +546,16 @@ public class CheckpointCoordinator { } // end of lock scope + CheckpointOptions checkpointOptions; + if (!props.isSavepoint()) { + checkpointOptions = CheckpointOptions.forFullCheckpoint(); + } else { + checkpointOptions = CheckpointOptions.forSavepoint(targetDirectory); + } + // send the messages to the tasks that trigger their checkpoint for (Execution execution: executions) { - execution.triggerCheckpoint(checkpointID, timestamp); + execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions); } numUnsuccessfulCheckpointsTriggers.set(0); @@ -756,7 +792,7 @@ public class CheckpointCoordinator { triggerQueuedRequests(); } - + // record the time when this was completed, to calculate // the 'min delay between checkpoints' lastCheckpointCompletionNanos = System.nanoTime(); @@ -1030,7 +1066,7 @@ public class CheckpointCoordinator { final ExecutionAttemptID executionAttemptID, final long checkpointId, final StateObject stateObject) { - + if (stateObject != null) { executor.execute(new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java new file mode 100644 index 0000000..cb98d10 --- /dev/null +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointOptions.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.Serializable;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+
+/**
+ * Options for performing the checkpoint.
+ *
+ * <p>The {@link CheckpointProperties} are related and cover properties that
+ * are only relevant at the {@link CheckpointCoordinator}. These options are
+ * relevant at the {@link StatefulTask} instances running on task managers.
+ */
+public class CheckpointOptions implements Serializable {
+
+	private static final long serialVersionUID = 5010126558083292915L;
+
+	/** Type of the checkpoint. */
+	@Nonnull
+	private final CheckpointType checkpointType;
+
+	/** Target location for the checkpoint. */
+	@Nullable
+	private final String targetLocation;
+
+	private CheckpointOptions(
+			@Nonnull CheckpointType checkpointType,
+			String targetLocation) {
+		this.checkpointType = checkNotNull(checkpointType);
+		this.targetLocation = targetLocation;
+	}
+
+	/**
+	 * Returns the type of checkpoint to perform.
+	 *
+	 * @return Type of checkpoint to perform.
+	 */
+	@Nonnull
+	public CheckpointType getCheckpointType() {
+		return checkpointType;
+	}
+
+	/**
+	 * Returns a custom target location or null if none
+	 * was specified.
+	 *
+	 * @return A custom target location or null.
+	 */
+	@Nullable
+	public String getTargetLocation() {
+		return targetLocation;
+	}
+
+	@Override
+	public String toString() {
+		return "CheckpointOptions(" + checkpointType + ")";
+	}
+
+	// ------------------------------------------------------------------------
+
+	private static final CheckpointOptions FULL_CHECKPOINT = new CheckpointOptions(CheckpointType.FULL_CHECKPOINT, null);
+
+	public static CheckpointOptions forFullCheckpoint() {
+		return FULL_CHECKPOINT;
+	}
+
+	public static CheckpointOptions forSavepoint(String targetDirectory) {
+		checkNotNull(targetDirectory, "targetDirectory");
+		return new CheckpointOptions(CheckpointType.SAVEPOINT, targetDirectory);
+	}
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * The type of checkpoint to perform.
+	 */
+	public enum CheckpointType {
+
+		/** A full checkpoint. */
+		FULL_CHECKPOINT,
+
+		/** A savepoint. */
+		SAVEPOINT;
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 52f2a6a..53d888e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -159,7 +159,7 @@ public class CompletedCheckpoint implements Serializable {
 	void discard() throws Exception {
 		try {
 			if (externalPath != null) {
-				SavepointStore.removeSavepoint(externalPath);
+				SavepointStore.removeSavepointFile(externalPath);
 			}
 
 			StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 9f66314..908ff7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -214,7 +214,8 @@ public class PendingCheckpoint {
 					Savepoint savepoint = new SavepointV1(checkpointId, taskStates.values());
 					externalPath = SavepointStore.storeSavepoint(
 							targetDirectory,
-							savepoint);
+							savepoint
+					);
 				} catch (IOException e) {
 					LOG.error("Failed to persist checkpoint {}.",checkpointId, e);
 				}
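
As a rough illustration of how the `CheckpointOptions` factory methods and the coordinator-side branch in `CheckpointCoordinator` fit together, here is a minimal, self-contained sketch. `CheckpointOptionsSketch`, `choose` and `resolveWriteLocation` are hypothetical names that do not exist in Flink. The task-side method is an assumption based on the new `createSavepointStreamFactory` method, since the commit message is cut off at that point.

```java
import java.util.Objects;

/** Hypothetical stand-in for CheckpointOptions; names are illustrative only. */
final class CheckpointOptionsSketch {

	enum Type { FULL_CHECKPOINT, SAVEPOINT }

	// Full checkpoints carry no per-checkpoint data, so one instance is shared.
	private static final CheckpointOptionsSketch FULL =
			new CheckpointOptionsSketch(Type.FULL_CHECKPOINT, null);

	private final Type type;
	private final String targetLocation; // null for full checkpoints

	private CheckpointOptionsSketch(Type type, String targetLocation) {
		this.type = Objects.requireNonNull(type);
		this.targetLocation = targetLocation;
	}

	static CheckpointOptionsSketch forFullCheckpoint() {
		return FULL;
	}

	static CheckpointOptionsSketch forSavepoint(String targetDirectory) {
		Objects.requireNonNull(targetDirectory, "targetDirectory");
		return new CheckpointOptionsSketch(Type.SAVEPOINT, targetDirectory);
	}

	Type getType() {
		return type;
	}

	String getTargetLocation() {
		return targetLocation;
	}

	/** Coordinator side: the same branch as in the diff, driven by a plain boolean. */
	static CheckpointOptionsSketch choose(boolean isSavepoint, String targetDirectory) {
		return isSavepoint ? forSavepoint(targetDirectory) : forFullCheckpoint();
	}

	/** Task side (assumed): where the streams of this checkpoint would be written. */
	static String resolveWriteLocation(CheckpointOptionsSketch options, String checkpointDir) {
		switch (options.getType()) {
			case FULL_CHECKPOINT:
				return checkpointDir;
			case SAVEPOINT:
				return options.getTargetLocation();
			default:
				throw new IllegalStateException("Unknown type " + options.getType());
		}
	}

	public static void main(String[] args) {
		System.out.println(resolveWriteLocation(choose(false, null), "/checkpoints"));
		System.out.println(resolveWriteLocation(choose(true, "/savepoints/savepoint-abc"), "/checkpoints"));
	}
}
```

Sharing one instance for full checkpoints mirrors the `FULL_CHECKPOINT` constant in the diff. Savepoint options need a fresh instance because each one carries its own target directory.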

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 48cca20..0caf5b2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -18,8 +18,16 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
-import org.apache.flink.core.fs.FSDataInputStream;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -28,14 +36,8 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
- * A file system based savepoint store.
+ * Utilities for storing and loading savepoint meta data files.
  *
  * <p>Stored savepoints have the following format:
  * <pre>
@@ -52,50 +54,84 @@ public class SavepointStore {
 	/** Magic number for sanity checks against stored savepoints. */
 	public static final int MAGIC_NUMBER = 0x4960672d;
 
-	/** Prefix for savepoint files. */
-	private static final String prefix = "savepoint-";
+	private static final String META_DATA_FILE = "_metadata";
 
 	/**
-	 * Stores the savepoint.
+	 * Creates a savepoint directory.
 	 *
-	 * @param targetDirectory Target directory to store savepoint in
-	 * @param savepoint Savepoint to be stored
-	 * @param <T>       Savepoint type
-	 * @return Path of stored savepoint
-	 * @throws Exception Failures during store are forwarded
+	 * @param baseDirectory Base target directory for the savepoint
+	 * @param jobId Optional JobID the savepoint belongs to
+	 * @return The created savepoint directory
+	 * @throws IOException FileSystem operation failures are forwarded
 	 */
-	public static <T extends Savepoint> String storeSavepoint(
-			String targetDirectory,
-			T savepoint) throws IOException {
-
-		checkNotNull(targetDirectory, "Target directory");
-		checkNotNull(savepoint, "Savepoint");
+	public static String createSavepointDirectory(@Nonnull String baseDirectory, @Nullable JobID jobId) throws IOException {
+		String prefix;
+		if (jobId == null) {
+			prefix = "savepoint-";
+		} else {
+			prefix = String.format("savepoint-%s-", jobId.toString().substring(0, 6));
+		}
 
 		Exception latestException = null;
-		Path path = null;
-		FSDataOutputStream fdos = null;
+		Path savepointDirectory = null;
 
 		FileSystem fs = null;
 
 		// Try to create a FS output stream
 		for (int attempt = 0; attempt < 10; attempt++) {
-			path = new Path(targetDirectory, FileUtils.getRandomFilename(prefix));
+			Path path = new Path(baseDirectory, FileUtils.getRandomFilename(prefix));
 
 			if (fs == null) {
 				fs = FileSystem.get(path.toUri());
 			}
 
 			try {
-				fdos = fs.create(path, false);
-				break;
+				if (fs.mkdirs(path)) {
+					savepointDirectory = path;
+					break;
+				}
 			} catch (Exception e) {
 				latestException = e;
 			}
 		}
 
-		if (fdos == null) {
-			throw new IOException("Failed to create file output stream at " + path, latestException);
+		if (savepointDirectory == null) {
+			throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException);
+		} else {
+			return savepointDirectory.getPath();
 		}
+	}
+
+	/**
+	 * Deletes a savepoint directory.
+	 *
+	 * @param savepointDirectory Recursively deletes the given directory
+	 * @throws IOException FileSystem operation failures are forwarded
+	 */
+	public static void deleteSavepointDirectory(@Nonnull String savepointDirectory) throws IOException {
+		Path path = new Path(savepointDirectory);
+		FileSystem fs = FileSystem.get(path.toUri());
+		fs.delete(path, true);
+	}
+
+	/**
+	 * Stores the savepoint metadata file.
+	 *
+	 * @param <T>       Savepoint type
+	 * @param directory Target directory to store savepoint in
+	 * @param savepoint Savepoint to be stored
+	 * @return Path of stored savepoint
+	 * @throws Exception Failures during store are forwarded
+	 */
+	public static <T extends Savepoint> String storeSavepoint(String directory, T savepoint) throws IOException {
+		checkNotNull(directory, "Target directory");
+		checkNotNull(savepoint, "Savepoint");
+
+		Path basePath = new Path(directory);
+		FileSystem fs = FileSystem.get(basePath.toUri());
+
+		Path path = new Path(basePath, META_DATA_FILE);
+		FSDataOutputStream fdos = fs.create(path, false);
 
 		boolean success = false;
 		try (DataOutputStream dos = new DataOutputStream(fdos)) {
@@ -115,20 +151,41 @@ public class SavepointStore {
 			}
 		}
 
-		return path.toString();
+		return basePath.toString();
 	}
 
 	/**
 	 * Loads the savepoint at the specified path.
 	 *
-	 * @param path Path of savepoint to load
+	 * @param savepointFileOrDirectory Path to the parent savepoint directory or the meta data file.
 	 * @return The loaded savepoint
 	 * @throws Exception Failures during load are forwared
 	 */
-	public static Savepoint loadSavepoint(String path, ClassLoader userClassLoader) throws IOException {
-		Preconditions.checkNotNull(path, "Path");
+	public static Savepoint loadSavepoint(String savepointFileOrDirectory, ClassLoader userClassLoader) throws IOException {
+		Preconditions.checkNotNull(savepointFileOrDirectory, "Path");
+
+		Path path = new Path(savepointFileOrDirectory);
+
+		LOG.info("Loading savepoint from {}", path);
 
-		try (DataInputStream dis = new DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
+		FileSystem fs = FileSystem.get(path.toUri());
+
+		FileStatus status = fs.getFileStatus(path);
+
+		// If this is a directory, we need to find the meta data file
+		if (status.isDir()) {
+			Path candidatePath = new Path(path, META_DATA_FILE);
+			if (fs.exists(candidatePath)) {
+				path = candidatePath;
+				LOG.info("Using savepoint file in {}", path);
+			} else {
+				throw new IOException("Cannot find meta data file in directory " + path
+					+ ". Please try to load the savepoint directly from the meta data file "
+					+ "instead of the directory.");
+			}
+		}
+
+		try (DataInputStream dis = new DataInputViewStreamWrapper(fs.open(path))) {
 			int magicNumber = dis.readInt();
 
 			if (magicNumber == MAGIC_NUMBER) {
@@ -152,7 +209,7 @@ public class SavepointStore {
 	 * @param path Path of savepoint to remove
 	 * @throws Exception Failures during disposal are forwarded
 	 */
-	public static void removeSavepoint(String path) throws IOException {
+	public static void removeSavepointFile(String path) throws IOException {
 		Preconditions.checkNotNull(path, "Path");
 
 		try {
@@ -173,14 +230,4 @@ public class SavepointStore {
 		}
 	}
 
-	private static FSDataInputStream createFsInputStream(Path path) throws IOException {
-		FileSystem fs = FileSystem.get(path.toUri());
-
-		if (fs.exists(path)) {
-			return fs.open(path);
-		} else {
-			throw new IllegalArgumentException("Invalid path '" + path.toUri() + "'.");
-		}
-	}
-
 }
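
For readers who want to try the directory creation logic from `createSavepointDirectory` outside of Flink, here is a minimal sketch. It is an approximation under stated assumptions, not the committed code: it uses `java.nio.file` instead of Flink's `FileSystem` and `Path`, the class name `SavepointDirSketch` and the UUID-based suffix are hypothetical, and it retries only when the candidate name already exists (the code in the diff retries on any exception).

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;

public class SavepointDirSketch {

    /** Builds the directory name prefix; the job ID part is optional. */
    public static String prefixFor(String jobId) {
        // Assumption: a non-null job ID has at least six characters.
        return jobId == null
                ? "savepoint-"
                : String.format("savepoint-%s-", jobId.substring(0, 6));
    }

    /** Tries up to ten random names and returns the first directory that could be created. */
    public static Path createSavepointDirectory(Path baseDirectory, String jobId) throws IOException {
        String prefix = prefixFor(jobId);
        Files.createDirectories(baseDirectory);

        IOException latestException = null;
        for (int attempt = 0; attempt < 10; attempt++) {
            // Hypothetical random suffix; Flink uses FileUtils.getRandomFilename(prefix).
            String suffix = UUID.randomUUID().toString().substring(0, 12);
            Path candidate = baseDirectory.resolve(prefix + suffix);
            try {
                // createDirectory fails if the name is already taken, so a
                // successful return means this call owns the directory.
                return Files.createDirectory(candidate);
            } catch (FileAlreadyExistsException e) {
                latestException = e;
            }
        }
        throw new IOException("Failed to create savepoint directory at " + baseDirectory, latestException);
    }

    public static void main(String[] args) throws IOException {
        Path base = Files.createTempDirectory("savepoints");
        Path dir = createSavepointDirectory(base, "0123456789abcdef");
        System.out.println(dir.getFileName());
    }
}
```

The metadata file and all data files of one savepoint would then be written into the returned directory, which matches the single-directory layout described in the commit message.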

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index b3fe443..3191d76 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
@@ -675,14 +676,15 @@ public class Execution implements AccessExecution, Archiveable Type of the key by which state is keyed.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index bc4594a..a335e45 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
@@ -31,6 +32,7 @@ import java.io.IOException;
  */
 @PublicEvolving
 public abstract class AbstractStateBackend implements StateBackend, java.io.Serializable {
+
 	private static final long serialVersionUID = 4620415814639230247L;
 
 	@Override
@@ -39,6 +41,12 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
 			String operatorIdentifier) throws IOException;
 
 	@Override
+	public abstract CheckpointStreamFactory createSavepointStreamFactory(
+			JobID jobId,
+			String operatorIdentifier,
+			@Nullable String targetLocation) throws IOException;
+
+	@Override
 	public abstract  AbstractKeyedStateBackend createKeyedStateBackend(
 			Environment env,
 			JobID jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
index adf0727..8dcf49e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java
@@ -30,6 +30,7 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -154,7 +155,10 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 
 	@Override
 	public RunnableFuture snapshot(
-			long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
+			long checkpointId,
+			long timestamp,
+			CheckpointStreamFactory streamFactory,
+			CheckpointOptions checkpointOptions) throws Exception {
 
 		if (registeredStates.isEmpty()) {
 			return new DoneFuture<>(null);
@@ -346,4 +350,4 @@ public class DefaultOperatorStateBackend implements OperatorStateBackend {
 			return partitionOffsets;
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
index a4a6bc4..0d92b46 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/Snapshotable.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import java.util.Collection;
 import java.util.concurrent.RunnableFuture;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 
 /**
  * Interface for operations that can perform snapshots of their state.
@@ -37,12 +38,14 @@ public interface Snapshotable {
 	 * @param checkpointId  The ID of the checkpoint.
 	 * @param timestamp     The timestamp of the checkpoint.
 	 * @param streamFactory The factory that we can use for writing our state to streams.
+	 * @param checkpointOptions Options for how to perform this checkpoint.
 	 * @return A runnable future that will yield a {@link StateObject}.
 	 */
 	RunnableFuture snapshot(
 			long checkpointId,
 			long timestamp,
-			CheckpointStreamFactory streamFactory) throws Exception;
+			CheckpointStreamFactory streamFactory,
+			CheckpointOptions checkpointOptions) throws Exception;
 
 	/**
 	 * Restores state that was previously snapshotted from the provided parameters. Typically the parameters are state

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index 846df89..7961b5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 
+import javax.annotation.Nullable;
 import java.io.IOException;
 
 /**
@@ -95,6 +96,27 @@ public interface StateBackend extends java.io.Serializable {
 	 */
 	CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException;
 
+	/**
+	 * Creates a {@link CheckpointStreamFactory} that can be used to create streams
+	 * that should end up in a savepoint.
+	 *
+	 * <p>This is only called if the triggered checkpoint is a savepoint. Commonly
+	 * this will return the same factory as for regular checkpoints, but maybe
+	 * slightly adjusted.
+	 *
+	 * @param jobId The {@link JobID} of the job for which we are creating checkpoint streams.
+	 * @param operatorIdentifier An identifier of the operator for which we create streams.
+	 * @param targetLocation An optional custom location for the savepoint stream.
+	 *
+	 * @return The stream factory for savepoints.
+	 *
+	 * @throws IOException Failures during stream creation are forwarded.
+	 */
+	CheckpointStreamFactory createSavepointStreamFactory(
+			JobID jobId,
+			String operatorIdentifier,
+			@Nullable String targetLocation) throws IOException;
+
 	// ------------------------------------------------------------------------
 	//  Structure Backends
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 30b1da6..8455d84 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -94,18 +94,15 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 					MAX_FILE_STATE_THRESHOLD);
 		}
 		this.fileStateThreshold = fileStateSizeThreshold;
+
 		Path basePath = checkpointDataUri;
+		filesystem = basePath.getFileSystem();
 
-		Path dir = new Path(basePath, jobId.toString());
+		checkpointDirectory = createBasePath(filesystem, basePath, jobId);
 
 		if (LOG.isDebugEnabled()) {
-			LOG.debug("Initializing file stream factory to URI {}.", dir);
+			LOG.debug("Initialed file stream factory to URI {}.", checkpointDirectory);
 		}
-
-		filesystem = basePath.getFileSystem();
-		filesystem.mkdirs(dir);
-
-		checkpointDirectory = dir;
 	}
 
 	@Override
@@ -115,7 +112,7 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 	public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
 		checkFileSystemInitialized();
 
-		Path checkpointDir = createCheckpointDirPath(checkpointID);
+		Path checkpointDir = createCheckpointDirPath(checkpointDirectory, checkpointID);
 		int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
 		return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
 	}
@@ -130,7 +127,13 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
 		}
 	}
 
-	private Path createCheckpointDirPath(long checkpointID) {
+	protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException {
+		Path dir = new Path(checkpointDirectory, jobID.toString());
+		fs.mkdirs(dir);
+		return dir;
+	}
+
+	protected Path createCheckpointDirPath(Path checkpointDirectory, long checkpointID) {
 		return new Path(checkpointDirectory, "chk-" + checkpointID);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
new file mode 100644
index 0000000..7410d2d
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import java.io.IOException;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+
+/**
+ * A {@link CheckpointStreamFactory} that produces streams that write to a
+ * {@link FileSystem}.
+ *
+ * <p>The difference to the parent {@link FsCheckpointStreamFactory} is only
+ * in the created directory layout. All checkpoint files go to the checkpoint
+ * directory.
+ */
+public class FsSavepointStreamFactory extends FsCheckpointStreamFactory {
+
+	public FsSavepointStreamFactory(
+			Path checkpointDataUri,
+			JobID jobId,
+			int fileStateSizeThreshold) throws IOException {
+
+		super(checkpointDataUri, jobId, fileStateSizeThreshold);
+	}
+
+	@Override
+	protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException {
+		// No checkpoint specific directory required as the savepoint directory
+		// is already unique.
+		return checkpointDirectory;
+	}
+
+	@Override
+	protected Path createCheckpointDirPath(Path checkpointDirectory, long checkpointID) {
+		// No checkpoint specific directory required as the savepoint directory
+		// is already unique.
+		return checkpointDirectory;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index 281dbb0..b614d98 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -173,6 +173,15 @@ public class FsStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
+	public CheckpointStreamFactory createSavepointStreamFactory(
+			JobID jobId,
+			String operatorIdentifier,
+			String targetLocation) throws IOException {
+
+		return new FsSavepointStreamFactory(new Path(targetLocation), jobId, fileStateThreshold);
+	}
+
+	@Override
 	public AbstractKeyedStateBackend createKeyedStateBackend(
 			Environment env,
 			JobID jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
index 04e4fbc..4a5455a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
@@ -40,6 +40,7 @@ import org.apache.flink.migration.MigrationUtil;
 import org.apache.flink.migration.runtime.state.KvStateSnapshot;
 import org.apache.flink.migration.runtime.state.filesystem.AbstractFsStateSnapshot;
 import org.apache.flink.migration.runtime.state.memory.AbstractMemStateSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.ArrayListSerializer;
@@ -215,7 +216,8 @@ public class HeapKeyedStateBackend extends AbstractKeyedStateBackend {
 	public RunnableFuture snapshot(
 			long checkpointId,
 			long timestamp,
-			CheckpointStreamFactory streamFactory) throws Exception {
+			CheckpointStreamFactory streamFactory,
+			CheckpointOptions checkpointOptions) throws Exception {
 
 		if (stateTables.isEmpty()) {
 			return new DoneFuture<>(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index 58a86df..2cc1164 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -75,6 +75,15 @@ public class MemoryStateBackend extends AbstractStateBackend {
 	}
 
 	@Override
+	public CheckpointStreamFactory createSavepointStreamFactory(
+			JobID jobId,
+			String operatorIdentifier,
+			String targetLocation) throws IOException {
+
+		return new MemCheckpointStreamFactory(maxStateSize);
+	}
+
+	@Override
 	public AbstractKeyedStateBackend createKeyedStateBackend(
 			Environment env, JobID jobID,
 			String operatorIdentifier,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 2980376..8db1d5b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -475,13 +476,13 @@ public class TaskExecutor extends RpcEndpoint {
 	// ----------------------------------------------------------------------
 
 	@RpcMethod
-	public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) throws CheckpointException {
+	public Acknowledge triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp, CheckpointOptions checkpointOptions) throws CheckpointException {
 		log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);
 
 		final Task task = taskSlotTable.getTask(executionAttemptID);
 
 		if (task != null) {
-			task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp);
+			task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);
 
 			return Acknowledge.get();
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index ebd4c0c..36a3255 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
@@ -97,9 +98,10 @@ public interface TaskExecutorGateway extends RpcGateway {
 	 * @param executionAttemptID identifying the task
 	 * @param checkpointID unique id for the checkpoint
 	 * @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
+	 * @param checkpointOptions for performing the checkpoint
 	 * @return Future acknowledge if the checkpoint has been successfully triggered
 	 */
-	Future triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp);
+	Future triggerCheckpoint(ExecutionAttemptID executionAttemptID, long checkpointID, long checkpointTimestamp, CheckpointOptions checkpointOptions);
 
 	/**
 	 * Confirm a checkpoint for the given task. The checkpoint is identified by the checkpoint ID

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index acb423b..c9f17b8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -1117,8 +1118,13 @@ public class Task implements Runnable, TaskActions {
 	 * 
 	 * @param checkpointID The ID identifying the checkpoint.
 	 * @param checkpointTimestamp The timestamp associated with the checkpoint.
+	 * @param checkpointOptions Options for performing this checkpoint.
 	 */
-	public void triggerCheckpointBarrier(final long checkpointID, long checkpointTimestamp) {
+	public void triggerCheckpointBarrier(
+			final long checkpointID,
+			long checkpointTimestamp,
+			final CheckpointOptions checkpointOptions) {
+
 		final AbstractInvokable invokable = this.invokable;
 		final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);
 
@@ -1134,7 +1140,7 @@ public class Task implements Runnable, TaskActions {
 						// activate safety net for checkpointing thread
 						FileSystemSafetyNet.initializeSafetyNetForThread();
 						try {
-							boolean success = statefulTask.triggerCheckpoint(checkpointMetaData);
+							boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
 							if (!success) {
 								checkpointResponder.declineCheckpoint(
 										getJobID(), getExecutionId(), checkpointID,

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 8b08181..21749cb 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -837,7 +837,7 @@ class JobManager(
             savepoint.dispose()
 
             // Remove the header file
-            SavepointStore.removeSavepoint(savepointPath)
+            SavepointStore.removeSavepointFile(savepointPath)
 
             senderRef ! DisposeSavepointSuccess
           } catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e7a9174/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index a70454b..25d5366 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -501,12 +501,13 @@ class TaskManager(
         val taskExecutionId = message.getTaskExecutionId
         val checkpointId = message.getCheckpointId
         val timestamp = message.getTimestamp
+        val checkpointOptions = message.getCheckpointOptions
 
         log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")
 
         val task = runningTasks.get(taskExecutionId)
         if (task != null) {
-          task.triggerCheckpointBarrier(checkpointId, timestamp)
+          task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions)
         } else {
           log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
         }
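
The two overridable hooks added to `FsCheckpointStreamFactory` (`createBasePath` and `createCheckpointDirPath`) are what let `FsSavepointStreamFactory` change only the directory layout. The following is a rough standalone sketch of that idea, not Flink code: the class names `LayoutSketch`, `CheckpointLayout` and `SavepointLayout` are hypothetical, `java.nio.file.Path` stands in for Flink's `Path`, and no directories are actually created.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class LayoutSketch {

    /** Regular checkpoints: one sub folder per job and one per checkpoint. */
    public static class CheckpointLayout {
        protected final Path base;

        public CheckpointLayout(Path root, String jobId) {
            // As in the diff, the base path is derived through an overridable hook.
            this.base = createBasePath(root, jobId);
        }

        protected Path createBasePath(Path root, String jobId) {
            return root.resolve(jobId);
        }

        protected Path createCheckpointDirPath(Path base, long checkpointId) {
            return base.resolve("chk-" + checkpointId);
        }

        public Path directoryFor(long checkpointId) {
            return createCheckpointDirPath(base, checkpointId);
        }
    }

    /** Savepoints: the target directory is already unique, so no sub folders are added. */
    public static class SavepointLayout extends CheckpointLayout {
        public SavepointLayout(Path savepointDirectory, String jobId) {
            super(savepointDirectory, jobId);
        }

        @Override
        protected Path createBasePath(Path root, String jobId) {
            return root;
        }

        @Override
        protected Path createCheckpointDirPath(Path base, long checkpointId) {
            return base;
        }
    }

    public static void main(String[] args) {
        System.out.println(new CheckpointLayout(Paths.get("checkpoints"), "job1").directoryFor(7));
        System.out.println(new SavepointLayout(Paths.get("savepoint-abc"), "job1").directoryFor(7));
    }
}
```

With this shape, every stream of a savepoint resolves to the same directory regardless of checkpoint ID, while regular checkpoints keep their per-job and per-checkpoint sub folders.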