Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 593C8200BA0 for ; Fri, 14 Oct 2016 10:06:03 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 57876160AFA; Fri, 14 Oct 2016 08:06:03 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CEB1A160AFB for ; Fri, 14 Oct 2016 10:06:00 +0200 (CEST) Received: (qmail 7825 invoked by uid 500); 14 Oct 2016 08:05:59 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 7573 invoked by uid 99); 14 Oct 2016 08:05:59 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Oct 2016 08:05:59 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 90425E055E; Fri, 14 Oct 2016 08:05:59 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: uce@apache.org To: commits@flink.apache.org Date: Fri, 14 Oct 2016 08:06:01 -0000 Message-Id: <14929f95f1ec4c749862ea594f554b5b@git.apache.org> In-Reply-To: <821dd60721664ebdaa93d59582db9292@git.apache.org> References: <821dd60721664ebdaa93d59582db9292@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/7] flink git commit: [FLINK-4512] [FLIP-10] Add option to persist periodic checkpoints archived-at: Fri, 14 Oct 2016 08:06:03 -0000 http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactory.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactory.java deleted file mode 100644 index d2bf40e..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactory.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.checkpoint.savepoint; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.IllegalConfigurationException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * Factory for {@link SavepointStore} instances. - */ -public class SavepointStoreFactory { - - public static final String SAVEPOINT_BACKEND_KEY = "savepoints.state.backend"; - public static final String SAVEPOINT_DIRECTORY_KEY = "savepoints.state.backend.fs.dir"; - public static final String DEFAULT_SAVEPOINT_BACKEND = "jobmanager"; - - public static final Logger LOG = LoggerFactory.getLogger(SavepointStoreFactory.class); - - /** - * Creates a {@link SavepointStore} from the specified Configuration. - * - *

You can configure a savepoint-specific backend for the savepoints. If - * you don't configure anything, the regular checkpoint backend will be - * used. - * - *

The default and fallback backend is the job manager, which loses the - * savepoint after the job manager shuts down. - * - * @param config The configuration to parse the savepoint backend configuration. - * @return A savepoint store. - */ - public static SavepointStore createFromConfig(Configuration config) throws Exception { - - // Try a the savepoint-specific configuration first. - String savepointBackend = config.getString(SAVEPOINT_BACKEND_KEY, DEFAULT_SAVEPOINT_BACKEND); - - if (savepointBackend == null) { - LOG.info("No savepoint state backend configured. " + - "Using job manager savepoint state backend."); - return createJobManagerSavepointStore(); - } else if (savepointBackend.equals("jobmanager")) { - LOG.info("Using job manager savepoint state backend."); - return createJobManagerSavepointStore(); - } else if (savepointBackend.equals("filesystem")) { - String rootPath = config.getString(SAVEPOINT_DIRECTORY_KEY, null); - - if (rootPath == null) { - throw new IllegalConfigurationException("Using filesystem as savepoint state backend, " + - "but did not specify directory. Please set the " + - "following configuration key: '" + SAVEPOINT_DIRECTORY_KEY + - "' (e.g. " + SAVEPOINT_DIRECTORY_KEY + ": hdfs:///flink/savepoints/). " + - "Falling back to job manager savepoint backend."); - } else { - LOG.info("Using filesystem savepoint backend (root path: {}).", rootPath); - - return createFileSystemSavepointStore(rootPath); - } - } else { - throw new IllegalConfigurationException("Unexpected savepoint backend " + - "configuration '" + savepointBackend + "'. " + - "Falling back to job manager savepoint state backend."); - } - } - - // ------------------------------------------------------------------------ - // Savepoint stores - // ------------------------------------------------------------------------ - - private static SavepointStore createJobManagerSavepointStore() { - return new HeapSavepointStore(); - } - - private static SavepointStore createFileSystemSavepointStore(String rootPath) throws IOException { - return new FsSavepointStore(rootPath, "savepoint-"); - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java index c474311..b1d7ff2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java @@ -47,6 +47,13 @@ public interface JobCheckpointStats { */ long getCount(); + /** + * Returns the most recent external path of a checkpoint. + * + * @return External checkpoint path or null if none available. + */ + String getExternalPath(); + // ------------------------------------------------------------------------ // Duration // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java index 2217fd4..db8a0e0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java @@ -130,6 +130,7 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker { metrics.gauge("lastCheckpointSize", new CheckpointSizeGauge()); metrics.gauge("lastCheckpointDuration", new CheckpointDurationGauge()); + metrics.gauge("lastCheckpointExternalPath", new CheckpointExternalPathGauge()); } @Override @@ -278,6 +279,7 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker { // Need to clone in order to have a consistent snapshot. // We can safely update it afterwards. (List) history.clone(), + latestCompletedCheckpoint.getExternalPath(), overallCount, overallMinDuration, overallMaxDuration, @@ -349,6 +351,7 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker { // General private final List recentHistory; private final long count; + private final String externalPath; // Duration private final long minDuration; @@ -362,6 +365,7 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker { public JobCheckpointStatsSnapshot( List recentHistory, + String externalPath, long count, long minDuration, long maxDuration, @@ -372,6 +376,7 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker { this.recentHistory = recentHistory; this.count = count; + this.externalPath = externalPath; this.minDuration = minDuration; this.maxDuration = maxDuration; @@ -393,6 +398,11 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker { } @Override + public String getExternalPath() { + return externalPath; + } + + @Override public long getMinDuration() { return minDuration; } @@ -440,4 +450,17 @@ public class SimpleCheckpointStatsTracker implements CheckpointStatsTracker { return latestCompletedCheckpoint == null ? -1 : latestCompletedCheckpoint.getDuration(); } } + + private class CheckpointExternalPathGauge implements Gauge { + + @Override + public String getValue() { + CompletedCheckpoint checkpoint = latestCompletedCheckpoint; + if (checkpoint != null && checkpoint.getExternalPath() != null) { + return checkpoint.getExternalPath(); + } else { + return "n/a"; + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index cf98ca6..10f0e88 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -35,7 +35,6 @@ import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.SuppressRestartsException; @@ -49,6 +48,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.ScheduleMode; +import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; import org.apache.flink.runtime.query.KvStateLocationRegistry; import org.apache.flink.runtime.taskmanager.TaskExecutionState; @@ -353,12 +353,13 @@ public class ExecutionGraph { long checkpointTimeout, long minPauseBetweenCheckpoints, int maxConcurrentCheckpoints, + ExternalizedCheckpointSettings externalizeSettings, List verticesToTrigger, List verticesToWaitFor, List verticesToCommitTo, CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, - SavepointStore savepointStore, + String checkpointDir, CheckpointStatsTracker statsTracker) { // simple sanity checks @@ -389,12 +390,13 @@ public class ExecutionGraph { checkpointTimeout, minPauseBetweenCheckpoints, maxConcurrentCheckpoints, + externalizeSettings, tasksToTrigger, tasksToWaitFor, tasksToCommitTo, checkpointIDCounter, checkpointStore, - savepointStore, + checkpointDir, checkpointStatsTracker); // the periodic checkpoint scheduler is activated and deactivated as a result of @@ -414,7 +416,7 @@ public class ExecutionGraph { } if (checkpointCoordinator != null) { - checkpointCoordinator.suspend(); + checkpointCoordinator.shutdown(state); checkpointCoordinator = null; checkpointStatsTracker = null; } @@ -1076,15 +1078,8 @@ public class ExecutionGraph { CheckpointCoordinator coord = this.checkpointCoordinator; this.checkpointCoordinator = null; if (coord != null) { - if (state.isGloballyTerminalState()) { - coord.shutdown(); - } else { - coord.suspend(); - } + coord.shutdown(state); } - - // We don't clean the checkpoint stats tracker, because we want - // it to be available after the job has terminated. } catch (Exception e) { LOG.error("Error while cleaning up after execution", e); } http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index 1c6eb8d..dcd6a5d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore; import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker; @@ -40,9 +39,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; - import org.slf4j.Logger; - import scala.concurrent.ExecutionContext; import scala.concurrent.ExecutionContext$; import scala.concurrent.duration.FiniteDuration; @@ -71,7 +68,6 @@ public class ExecutionGraphBuilder { Executor executor, ClassLoader classLoader, CheckpointRecoveryFactory recoveryFactory, - SavepointStore savepointStore, Time timeout, RestartStrategy restartStrategy, MetricGroup metrics, @@ -82,7 +78,7 @@ public class ExecutionGraphBuilder { final ExecutionContext executionContext = ExecutionContext$.MODULE$.fromExecutor(executor); return buildGraph(prior, jobGraph, jobManagerConfig, executionContext, - classLoader, recoveryFactory, savepointStore, timeout, restartStrategy, + classLoader, recoveryFactory, timeout, restartStrategy, metrics, parallelismForAutoMax, log); } @@ -98,7 +94,6 @@ public class ExecutionGraphBuilder { ExecutionContext executionContext, ClassLoader classLoader, CheckpointRecoveryFactory recoveryFactory, - SavepointStore savepointStore, Time timeout, RestartStrategy restartStrategy, MetricGroup metrics, @@ -183,7 +178,6 @@ public class ExecutionGraphBuilder { // configure the state checkpointing JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings(); if (snapshotSettings != null) { - List triggerVertices = idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph); @@ -220,17 +214,22 @@ public class ExecutionGraphBuilder { checkpointStatsTracker = new SimpleCheckpointStatsTracker(historySize, ackVertices, metrics); } + /** The default directory for externalized checkpoints. */ + String externalizedCheckpointsDir = jobManagerConfig.getString( + ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, null); + executionGraph.enableSnapshotCheckpointing( snapshotSettings.getCheckpointInterval(), snapshotSettings.getCheckpointTimeout(), snapshotSettings.getMinPauseBetweenCheckpoints(), snapshotSettings.getMaxConcurrentCheckpoints(), + snapshotSettings.getExternalizedCheckpointSettings(), triggerVertices, ackVertices, confirmVertices, checkpointIdCounter, completedCheckpoints, - savepointStore, + externalizedCheckpointsDir, checkpointStatsTracker); } http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java new file mode 100644 index 0000000..779fc76 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.tasks; + +import org.apache.flink.annotation.Internal; + +/** + * Grouped settings for externalized checkpoints. + */ +@Internal +public class ExternalizedCheckpointSettings implements java.io.Serializable { + + private static final ExternalizedCheckpointSettings NONE = new ExternalizedCheckpointSettings(false, false); + + /** Flag indicating whether checkpoints should be externalized. */ + private final boolean externalizeCheckpoints; + + /** Flag indicating whether externalized checkpoints should delete on cancellation. */ + private final boolean deleteOnCancellation; + + private ExternalizedCheckpointSettings(boolean externalizeCheckpoints, boolean deleteOnCancellation) { + this.externalizeCheckpoints = externalizeCheckpoints; + this.deleteOnCancellation = deleteOnCancellation; + } + + /** + * Returns true if checkpoints should be externalized. + * + * @return true if checkpoints should be externalized. + */ + public boolean externalizeCheckpoints() { + return externalizeCheckpoints; + } + + /** + * Returns true if externalized checkpoints should be deleted on cancellation. + * + * @return true if externalized checkpoints should be deleted on cancellation. + */ + public boolean deleteOnCancellation() { + return deleteOnCancellation; + } + + public static ExternalizedCheckpointSettings externalizeCheckpoints(boolean deleteOnCancellation) { + return new ExternalizedCheckpointSettings(true, deleteOnCancellation); + } + + public static ExternalizedCheckpointSettings none() { + return NONE; + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java index ab701b5..36a5c5d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java @@ -32,7 +32,6 @@ import static java.util.Objects.requireNonNull; public class JobSnapshottingSettings implements java.io.Serializable{ private static final long serialVersionUID = -2593319571078198180L; - private final List verticesToTrigger; @@ -48,19 +47,25 @@ public class JobSnapshottingSettings implements java.io.Serializable{ private final int maxConcurrentCheckpoints; + /** Settings for externalized checkpoints. */ + private final ExternalizedCheckpointSettings externalizedCheckpointSettings; + /** Path to savepoint to reset state back to (optional, can be null) */ private String savepointPath; - public JobSnapshottingSettings(List verticesToTrigger, - List verticesToAcknowledge, - List verticesToConfirm, - long checkpointInterval, long checkpointTimeout, - long minPauseBetweenCheckpoints, int maxConcurrentCheckpoints) - { + public JobSnapshottingSettings( + List verticesToTrigger, + List verticesToAcknowledge, + List verticesToConfirm, + long checkpointInterval, + long checkpointTimeout, + long minPauseBetweenCheckpoints, + int maxConcurrentCheckpoints, + ExternalizedCheckpointSettings externalizedCheckpointSettings) { + // sanity checks if (checkpointInterval < 1 || checkpointTimeout < 1 || - minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) - { + minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) { throw new IllegalArgumentException(); } @@ -71,6 +76,7 @@ public class JobSnapshottingSettings implements java.io.Serializable{ this.checkpointTimeout = checkpointTimeout; this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints; this.maxConcurrentCheckpoints = maxConcurrentCheckpoints; + this.externalizedCheckpointSettings = requireNonNull(externalizedCheckpointSettings); } // -------------------------------------------------------------------------------------------- @@ -103,6 +109,10 @@ public class JobSnapshottingSettings implements java.io.Serializable{ return maxConcurrentCheckpoints; } + public ExternalizedCheckpointSettings getExternalizedCheckpointSettings() { + return externalizedCheckpointSettings; + } + /** * Sets the savepoint path. * http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java index e125e10..67fc397 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java @@ -218,15 +218,13 @@ public class ZooKeeperUtils { * @param configuration {@link Configuration} object * @param jobId ID of job to create the instance for * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints to retain - * @param userClassLoader User code class loader * @return {@link ZooKeeperCompletedCheckpointStore} instance */ public static CompletedCheckpointStore createCompletedCheckpoints( CuratorFramework client, Configuration configuration, JobID jobId, - int maxNumberOfCheckpointsToRetain, - ClassLoader userClassLoader) throws Exception { + int maxNumberOfCheckpointsToRetain) throws Exception { checkNotNull(configuration, "Configuration"); @@ -244,7 +242,6 @@ public class ZooKeeperUtils { return new ZooKeeperCompletedCheckpointStore( maxNumberOfCheckpointsToRetain, - userClassLoader, client, checkpointsPath, stateStorage); http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala index 485a21e..f426254 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala @@ -24,7 +24,6 @@ import akka.actor.ActorRef import org.apache.flink.api.common.JobID import org.apache.flink.configuration.Configuration import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory @@ -69,7 +68,6 @@ abstract class ContaineredJobManager( leaderElectionService: LeaderElectionService, submittedJobGraphs : SubmittedJobGraphStore, checkpointRecoveryFactory : CheckpointRecoveryFactory, - savepointStore: SavepointStore, jobRecoveryTimeout: FiniteDuration, metricsRegistry: Option[FlinkMetricRegistry]) extends JobManager( @@ -84,7 +82,6 @@ abstract class ContaineredJobManager( leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory, - savepointStore, jobRecoveryTimeout, metricsRegistry) { http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index be820ae..450e810 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -19,8 +19,8 @@ package org.apache.flink.runtime.jobmanager import java.io.{File, IOException} -import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, UnknownHostException} import java.lang.management.ManagementFactory +import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, UnknownHostException} import java.util.UUID import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException} import javax.management.ObjectName @@ -34,23 +34,23 @@ import org.apache.flink.api.common.time.Time import org.apache.flink.configuration.{ConfigConstants, Configuration, GlobalConfiguration, HighAvailabilityOptions} import org.apache.flink.core.fs.FileSystem import org.apache.flink.core.io.InputSplitAssigner -import org.apache.flink.metrics.{Gauge, MetricGroup} import org.apache.flink.metrics.groups.UnregisteredMetricsGroup +import org.apache.flink.metrics.{Gauge, MetricGroup} import org.apache.flink.runtime.accumulators.AccumulatorSnapshot import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour} import org.apache.flink.runtime.blob.BlobServer import org.apache.flink.runtime.checkpoint._ -import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStore, SavepointStoreFactory} -import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, DisabledCheckpointStatsTracker, SimpleCheckpointStatsTracker} +import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, SavepointStore} import org.apache.flink.runtime.client._ -import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID +import org.apache.flink.runtime.concurrent.BiFunction +import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory -import org.apache.flink.runtime.executiongraph.{ExecutionGraphBuilder, ExecutionGraph, ExecutionJobVertex, StatusListenerMessenger} +import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionGraphBuilder, ExecutionJobVertex, StatusListenerMessenger} import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager} import org.apache.flink.runtime.io.network.PartitionState import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus} @@ -66,15 +66,14 @@ import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendSta import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, AccumulatorResultStringsFound, AccumulatorResultsErroneous, AccumulatorResultsFound, RequestAccumulatorResults, RequestAccumulatorResultsStringified} import org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, AcknowledgeCheckpoint, DeclineCheckpoint} -import org.apache.flink.runtime.messages.webmonitor.InfoMessage -import org.apache.flink.runtime.messages.webmonitor._ -import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, MetricRegistry => FlinkMetricRegistry} +import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _} import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup +import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry, MetricRegistryConfiguration} import org.apache.flink.runtime.process.ProcessReaper -import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation} import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, NotifyKvStateRegistered, NotifyKvStateUnregistered} -import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration} +import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation} import org.apache.flink.runtime.security.SecurityContext +import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, SecurityConfiguration} import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util._ import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils} @@ -128,7 +127,6 @@ class JobManager( protected val leaderElectionService: LeaderElectionService, protected val submittedJobGraphs : SubmittedJobGraphStore, protected val checkpointRecoveryFactory : CheckpointRecoveryFactory, - protected val savepointStore: SavepointStore, protected val jobRecoveryTimeout: FiniteDuration, protected val metricsRegistry: Option[FlinkMetricRegistry]) extends FlinkActor @@ -178,6 +176,13 @@ class JobManager( val webMonitorPort : Int = flinkConfiguration.getInteger( ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1) + /** The default directory for savepoints. */ + val defaultSavepointDir: String = ConfigurationUtil.getStringWithDeprecatedKeys( + flinkConfiguration, + ConfigConstants.SAVEPOINT_DIRECTORY_KEY, + null, + ConfigConstants.SAVEPOINT_FS_DIRECTORY_KEY) + /** The resource manager actor responsible for allocating and managing task manager resources. */ var currentResourceManager: Option[ActorRef] = None @@ -242,14 +247,6 @@ class JobManager( } try { - savepointStore.shutdown() - } catch { - case e: Exception => - log.error("Could not shut down savepoint store.", e) - throw new RuntimeException("Could not stop the savepoint store store.", e) - } - - try { // revoke leadership and stop leader election service leaderElectionService.stop() } catch { @@ -695,7 +692,7 @@ class JobManager( case kvStateMsg : KvStateMessage => handleKvStateMessage(kvStateMsg) - case TriggerSavepoint(jobId) => + case TriggerSavepoint(jobId, savepointDirectory) => currentJobs.get(jobId) match { case Some((graph, _)) => val checkpointCoordinator = graph.getCheckpointCoordinator() @@ -703,31 +700,46 @@ class JobManager( if (checkpointCoordinator != null) { // Immutable copy for the future val senderRef = sender() - - future { - try { - // Do this async, because checkpoint coordinator operations can - // contain blocking calls to the state backend or ZooKeeper. - val savepointFuture = checkpointCoordinator.triggerSavepoint( - System.currentTimeMillis()) - - savepointFuture.onComplete { - // Success, respond with the savepoint path - case scala.util.Success(savepointPath) => - senderRef ! TriggerSavepointSuccess(jobId, savepointPath) - - // Failure, respond with the cause - case scala.util.Failure(t) => - senderRef ! TriggerSavepointFailure( - jobId, - new Exception("Failed to complete savepoint", t)) - }(context.dispatcher) - } catch { - case e: Exception => - senderRef ! TriggerSavepointFailure(jobId, new Exception( - "Failed to trigger savepoint", e)) + try { + val targetDirectory : String = savepointDirectory.getOrElse( + flinkConfiguration.getString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, null)) + + if (targetDirectory == null) { + throw new IllegalStateException("No savepoint directory configured. " + + "You can either specify a directory when triggering this savepoint or " + + "configure a cluster-wide default via key '" + + ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.") } - }(context.dispatcher) + + // Do this async, because checkpoint coordinator operations can + // contain blocking calls to the state backend or ZooKeeper. + val savepointFuture = checkpointCoordinator.triggerSavepoint( + System.currentTimeMillis(), + targetDirectory) + + savepointFuture.handleAsync[Void]( + new BiFunction[CompletedCheckpoint, Throwable, Void] { + override def apply(success: CompletedCheckpoint, cause: Throwable): Void = { + if (success != null) { + if (success.getExternalPath != null) { + senderRef ! TriggerSavepointSuccess(jobId, success.getExternalPath) + } else { + senderRef ! TriggerSavepointFailure( + jobId, new Exception("Savepoint has not been persisted.")) + } + } else { + senderRef ! TriggerSavepointFailure( + jobId, new Exception("Failed to complete savepoint", cause)) + } + null + } + }, + context.dispatcher) + } catch { + case e: Exception => + senderRef ! TriggerSavepointFailure(jobId, new Exception( + "Failed to trigger savepoint", e)) + } } else { sender() ! TriggerSavepointFailure(jobId, new IllegalStateException( "Checkpointing disabled. You can enable it via the execution environment of " + @@ -744,12 +756,15 @@ class JobManager( try { log.info(s"Disposing savepoint at '$savepointPath'.") - val savepoint = savepointStore.loadSavepoint(savepointPath) + val savepoint = SavepointStore.loadSavepoint(savepointPath) log.debug(s"$savepoint") - // Dispose the savepoint - savepointStore.disposeSavepoint(savepointPath) + // Dispose checkpoint state + savepoint.dispose() + + // Remove the header file + SavepointStore.removeSavepoint(savepointPath) senderRef ! DisposeSavepointSuccess } catch { @@ -1150,7 +1165,6 @@ class JobManager( executionContext, userCodeLoader, checkpointRecoveryFactory, - savepointStore, Time.of(timeout.length, timeout.unit), restartStrategy, jobMetrics, @@ -1218,7 +1232,7 @@ class JobManager( // load the savepoint as a checkpoint into the system val savepoint: CompletedCheckpoint = SavepointLoader.loadAndValidateSavepoint( - jobId, executionGraph.getAllVertices, savepointStore, savepointPath) + jobId, executionGraph.getAllVertices, savepointPath) executionGraph.getCheckpointCoordinator.getCheckpointStore .addCheckpoint(savepoint) @@ -2420,7 +2434,6 @@ object JobManager { LeaderElectionService, SubmittedJobGraphStore, CheckpointRecoveryFactory, - SavepointStore, FiniteDuration, // timeout for job recovery Option[FlinkMetricRegistry] ) = { @@ -2497,8 +2510,6 @@ object JobManager { new ZooKeeperCheckpointRecoveryFactory(client, configuration)) } - val savepointStore = SavepointStoreFactory.createFromConfig(configuration) - val jobRecoveryTimeoutStr = configuration.getValue(HighAvailabilityOptions.HA_JOB_DELAY) val jobRecoveryTimeout = if (jobRecoveryTimeoutStr == null || jobRecoveryTimeoutStr.isEmpty) { @@ -2531,7 +2542,6 @@ object JobManager { leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory, - savepointStore, jobRecoveryTimeout, metricRegistry) } @@ -2595,8 +2605,7 @@ object JobManager { leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory, - savepointStore, - jobRecoveryTimeout, + jobRecoveryTimeout, metricsRegistry) = createJobManagerComponents( configuration, None) @@ -2622,7 +2631,6 @@ object JobManager { leaderElectionService, submittedJobGraphs, checkpointRecoveryFactory, - savepointStore, jobRecoveryTimeout, metricsRegistry) @@ -2657,7 +2665,6 @@ object JobManager { leaderElectionService: LeaderElectionService, submittedJobGraphStore: SubmittedJobGraphStore, checkpointRecoveryFactory: CheckpointRecoveryFactory, - savepointStore: SavepointStore, jobRecoveryTimeout: FiniteDuration, metricsRegistry: Option[FlinkMetricRegistry]): Props = { @@ -2674,7 +2681,6 @@ object JobManager { leaderElectionService, submittedJobGraphStore, checkpointRecoveryFactory, - savepointStore, jobRecoveryTimeout, metricsRegistry) } http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index 5e2b547..fd45cda 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -467,8 +467,11 @@ object JobManagerMessages { * of triggering and acknowledging checkpoints. * * @param jobId The JobID of the job to trigger the savepoint for. + * @param savepointDirectory Optional target directory */ - case class TriggerSavepoint(jobId: JobID) extends RequiresLeaderSessionID + case class TriggerSavepoint( + jobId: JobID, + savepointDirectory : Option[String] = Option.empty) extends RequiresLeaderSessionID /** * Response after a successful savepoint trigger containing the savepoint path. http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala index 27c9dd9..2f453a3 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala @@ -25,7 +25,6 @@ import org.apache.flink.api.common.JobID import org.apache.flink.api.common.io.FileOutputFormat import org.apache.flink.configuration.{ConfigConstants, Configuration} import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory -import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager import org.apache.flink.runtime.clusterframework.types.{ResourceID, ResourceIDRetrievable} @@ -119,7 +118,6 @@ class LocalFlinkMiniCluster( leaderElectionService, submittedJobGraphStore, checkpointRecoveryFactory, - savepointStore, jobRecoveryTimeout, metricsRegistry) = JobManager.createJobManagerComponents(config, createLeaderElectionService()) @@ -143,7 +141,6 @@ class LocalFlinkMiniCluster( leaderElectionService, submittedJobGraphStore, checkpointRecoveryFactory, - savepointStore, jobRecoveryTimeout, metricsRegistry), jobManagerName) @@ -248,7 +245,6 @@ class LocalFlinkMiniCluster( leaderElectionService: LeaderElectionService, submittedJobGraphStore: SubmittedJobGraphStore, checkpointRecoveryFactory: CheckpointRecoveryFactory, - savepointStore: SavepointStore, jobRecoveryTimeout: FiniteDuration, metricsRegistry: Option[MetricRegistry]): Props = { @@ -265,7 +261,6 @@ class LocalFlinkMiniCluster( leaderElectionService, submittedJobGraphStore, checkpointRecoveryFactory, - savepointStore, jobRecoveryTimeout, metricsRegistry) } http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 728c7d5..7b0e819 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -23,14 +23,16 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore; import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; @@ -50,11 +52,12 @@ import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import scala.concurrent.ExecutionContext; -import scala.concurrent.Future; import java.io.IOException; import java.io.Serializable; @@ -92,7 +95,8 @@ import static org.mockito.Mockito.when; */ public class CheckpointCoordinatorTest { - private static final ClassLoader cl = Thread.currentThread().getContextClassLoader(); + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); @Test public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() { @@ -115,13 +119,15 @@ public class CheckpointCoordinatorTest { jid, 600000, 600000, - 0, Integer.MAX_VALUE, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] {}, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(1), + null, new DisabledCheckpointStatsTracker()); // nothing should be happening @@ -135,7 +141,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); - coord.shutdown(); + coord.shutdown(JobStatus.FINISHED); } catch (Exception e) { e.printStackTrace(); @@ -168,12 +174,13 @@ public class CheckpointCoordinatorTest { 600000, 0, Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] {}, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(1), + null, new DisabledCheckpointStatsTracker()); // nothing should be happening @@ -187,7 +194,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); - coord.shutdown(); + coord.shutdown(JobStatus.FINISHED); } catch (Exception e) { e.printStackTrace(); @@ -218,12 +225,13 @@ public class CheckpointCoordinatorTest { 600000, 0, Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] {}, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(1), + null, new DisabledCheckpointStatsTracker()); // nothing should be happening @@ -237,7 +245,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); - coord.shutdown(); + coord.shutdown(JobStatus.FINISHED); } catch (Exception e) { e.printStackTrace(); @@ -269,12 +277,13 @@ public class CheckpointCoordinatorTest { 600000, 0, Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(1), + null, new DisabledCheckpointStatsTracker()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -363,7 +372,7 @@ public class CheckpointCoordinatorTest { long checkpointIdNew2 = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); assertEquals(checkpointIdNew2, checkpointIdNew); - coord.shutdown(); + coord.shutdown(JobStatus.FINISHED); } catch (Exception e) { e.printStackTrace(); @@ -395,12 +404,13 @@ public class CheckpointCoordinatorTest { 600000, 0, Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(1), + null, new DisabledCheckpointStatsTracker()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -488,7 +498,7 @@ public class CheckpointCoordinatorTest { coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID2, checkpoint1Id, checkpoint1.getCheckpointTimestamp())); assertTrue(checkpoint1.isDiscarded()); - coord.shutdown(); + coord.shutdown(JobStatus.FINISHED); } catch (Exception e) { e.printStackTrace(); @@ -515,12 +525,13 @@ public class CheckpointCoordinatorTest { 600000, 0, Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(1), + null, new DisabledCheckpointStatsTracker()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -626,7 +637,7 @@ public class CheckpointCoordinatorTest { verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2)); } - coord.shutdown(); + coord.shutdown(JobStatus.FINISHED); } catch (Exception e) { e.printStackTrace(); @@ -668,12 +679,13 @@ public class CheckpointCoordinatorTest { 600000, 0, Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 }, new ExecutionVertex[] { commitVertex }, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(2), + null, new DisabledCheckpointStatsTracker()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -766,7 +778,7 @@ public class CheckpointCoordinatorTest { assertEquals(jid, sc2.getJobId()); assertTrue(sc2.getTaskStates().isEmpty()); - coord.shutdown(); + coord.shutdown(JobStatus.FINISHED); } catch (Exception e) { e.printStackTrace(); @@ -807,12 +819,13 @@ public class CheckpointCoordinatorTest { 600000, 0, Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { triggerVertex1, triggerVertex2 }, new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 }, new ExecutionVertex[] { commitVertex }, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(10, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(10), + null, new DisabledCheckpointStatsTracker()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -893,7 +906,7 @@ public class CheckpointCoordinatorTest { // send the last remaining ack for the first checkpoint. This should not do anything coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, new CheckpointMetaData(checkpointId1, 0L))); - coord.shutdown(); + coord.shutdown(JobStatus.FINISHED); } catch (Exception e) { e.printStackTrace(); @@ -932,12 +945,13 @@ public class CheckpointCoordinatorTest { 200, 0, Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] { commitVertex }, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(2), + null, new DisabledCheckpointStatsTracker()); // trigger a checkpoint, partially acknowledged @@ -968,7 +982,7 @@ public class CheckpointCoordinatorTest { verify(commitVertex, times(0)) .sendMessageToCurrentExecution(any(NotifyCheckpointComplete.class), any(ExecutionAttemptID.class)); - coord.shutdown(); + coord.shutdown(JobStatus.FINISHED); } catch (Exception e) { e.printStackTrace(); @@ -1000,12 +1014,13 @@ public class CheckpointCoordinatorTest { 200000, 0, Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex1, ackVertex2 }, new ExecutionVertex[] { commitVertex }, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(2), + null, new DisabledCheckpointStatsTracker()); assertTrue(coord.triggerCheckpoint(timestamp)); @@ -1027,7 +1042,7 @@ public class CheckpointCoordinatorTest { // unknown ack vertex coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointMetaData)); - coord.shutdown(); + coord.shutdown(JobStatus.FINISHED); } catch (Exception e) { e.printStackTrace(); @@ -1081,12 +1096,13 @@ public class CheckpointCoordinatorTest { 200000, // timeout is very long (200 s) 0, Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex }, new ExecutionVertex[] { commitVertex }, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(2), + null, new DisabledCheckpointStatsTracker()); @@ -1133,7 +1149,7 @@ public class CheckpointCoordinatorTest { assertTrue(numCallsSoFar == numCalls.get() || numCallsSoFar + 1 == numCalls.get()); - coord.shutdown(); + coord.shutdown(JobStatus.FINISHED); } catch (Exception e) { e.printStackTrace(); @@ -1172,12 +1188,13 @@ public class CheckpointCoordinatorTest { 200000, // timeout is very long (200 s) 500, // 500ms delay between checkpoints 10, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(2), + null, new DisabledCheckpointStatsTracker()); coord.startCheckpointScheduler(); @@ -1215,7 +1232,7 @@ public class CheckpointCoordinatorTest { coord.stopCheckpointScheduler(); - coord.shutdown(); + coord.shutdown(JobStatus.FINISHED); } catch (Exception e) { e.printStackTrace(); @@ -1256,20 +1273,22 @@ public class CheckpointCoordinatorTest { 600000, 0, Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(1), + null, new DisabledCheckpointStatsTracker()); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); // trigger the first checkpoint. this should succeed - Future savepointFuture = coord.triggerSavepoint(timestamp); - assertFalse(savepointFuture.isCompleted()); + String savepointDir = tmpFolder.newFolder().getAbsolutePath(); + Future savepointFuture = coord.triggerSavepoint(timestamp, savepointDir); + assertFalse(savepointFuture.isDone()); // validate that we have a pending savepoint assertEquals(1, coord.getNumberOfPendingCheckpoints()); @@ -1287,7 +1306,7 @@ public class CheckpointCoordinatorTest { assertFalse(pending.isDiscarded()); assertFalse(pending.isFullyAcknowledged()); assertFalse(pending.canBeSubsumed()); - assertTrue(pending instanceof PendingSavepoint); + assertTrue(pending instanceof PendingCheckpoint); CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, 0L); @@ -1297,13 +1316,13 @@ public class CheckpointCoordinatorTest { assertEquals(1, pending.getNumberOfNonAcknowledgedTasks()); assertFalse(pending.isDiscarded()); assertFalse(pending.isFullyAcknowledged()); - assertFalse(savepointFuture.isCompleted()); + assertFalse(savepointFuture.isDone()); // acknowledge the same task again (should not matter) coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointMetaData)); assertFalse(pending.isDiscarded()); assertFalse(pending.isFullyAcknowledged()); - assertFalse(savepointFuture.isCompleted()); + assertFalse(savepointFuture.isDone()); // acknowledge the other task. coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaData)); @@ -1311,7 +1330,7 @@ public class CheckpointCoordinatorTest { // the checkpoint is internally converted to a successful checkpoint and the // pending checkpoint object is disposed assertTrue(pending.isDiscarded()); - assertTrue(savepointFuture.isCompleted()); + assertTrue(savepointFuture.isDone()); // the now we should have a completed checkpoint assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -1335,9 +1354,8 @@ public class CheckpointCoordinatorTest { // trigger another checkpoint and see that this one replaces the other checkpoint // --------------- final long timestampNew = timestamp + 7; - savepointFuture = coord.triggerSavepoint(timestampNew); - assertFalse(savepointFuture.isCompleted()); - + savepointFuture = coord.triggerSavepoint(timestampNew, savepointDir); + assertFalse(savepointFuture.isDone()); long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); CheckpointMetaData checkpointMetaDataNew = new CheckpointMetaData(checkpointIdNew, 0L); @@ -1352,7 +1370,7 @@ public class CheckpointCoordinatorTest { assertEquals(timestampNew, successNew.getTimestamp()); assertEquals(checkpointIdNew, successNew.getCheckpointID()); assertTrue(successNew.getTaskStates().isEmpty()); - assertTrue(savepointFuture.isCompleted()); + assertTrue(savepointFuture.isDone()); // validate that the relevant tasks got a confirmation message { @@ -1367,7 +1385,7 @@ public class CheckpointCoordinatorTest { verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2)); } - coord.shutdown(); + coord.shutdown(JobStatus.FINISHED); } /** @@ -1396,16 +1414,19 @@ public class CheckpointCoordinatorTest { 600000, 0, Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, new ExecutionVertex[] { vertex1, vertex2 }, counter, - new StandaloneCompletedCheckpointStore(10, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(10), + null, new DisabledCheckpointStatsTracker()); + String savepointDir = tmpFolder.newFolder().getAbsolutePath(); + // Trigger savepoint and checkpoint - Future savepointFuture1 = coord.triggerSavepoint(timestamp); + Future savepointFuture1 = coord.triggerSavepoint(timestamp, savepointDir); long savepointId1 = counter.getLast(); CheckpointMetaData checkpointMetaDataS1 = new CheckpointMetaData(savepointId1, 0L); assertEquals(1, coord.getNumberOfPendingCheckpoints()); @@ -1427,12 +1448,12 @@ public class CheckpointCoordinatorTest { assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints()); assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded()); - assertFalse(savepointFuture1.isCompleted()); + assertFalse(savepointFuture1.isDone()); assertTrue(coord.triggerCheckpoint(timestamp + 3)); assertEquals(2, coord.getNumberOfPendingCheckpoints()); - Future savepointFuture2 = coord.triggerSavepoint(timestamp + 4); + Future savepointFuture2 = coord.triggerSavepoint(timestamp + 4, savepointDir); long savepointId2 = counter.getLast(); CheckpointMetaData checkpointMetaDataS2 = new CheckpointMetaData(savepointId2, 0L); assertEquals(3, coord.getNumberOfPendingCheckpoints()); @@ -1445,8 +1466,8 @@ public class CheckpointCoordinatorTest { assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints()); assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded()); - assertFalse(savepointFuture1.isCompleted()); - assertTrue(savepointFuture2.isCompleted()); + assertFalse(savepointFuture1.isDone()); + assertTrue(savepointFuture2.isDone()); // Ack first savepoint coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointMetaDataS1)); @@ -1454,7 +1475,7 @@ public class CheckpointCoordinatorTest { assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(3, coord.getNumberOfRetainedSuccessfulCheckpoints()); - assertTrue(savepointFuture1.isCompleted()); + assertTrue(savepointFuture1.isDone()); } private void testMaxConcurrentAttempts(int maxConcurrentAttempts) { @@ -1486,12 +1507,13 @@ public class CheckpointCoordinatorTest { 200000, // timeout is very long (200 s) 0L, // no extra delay maxConcurrentAttempts, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex }, new ExecutionVertex[] { commitVertex }, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(2), + null, new DisabledCheckpointStatsTracker()); coord.startCheckpointScheduler(); @@ -1529,7 +1551,7 @@ public class CheckpointCoordinatorTest { Thread.sleep(200); assertEquals(maxConcurrentAttempts + 1, numCalls.get()); - coord.shutdown(); + coord.shutdown(JobStatus.FINISHED); } catch (Exception e) { e.printStackTrace(); @@ -1558,12 +1580,13 @@ public class CheckpointCoordinatorTest { 200000, // timeout is very long (200 s) 0L, // no extra delay maxConcurrentAttempts, // max two concurrent checkpoints + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex }, new ExecutionVertex[] { commitVertex }, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(2), + null, new DisabledCheckpointStatsTracker()); coord.startCheckpointScheduler(); @@ -1602,7 +1625,7 @@ public class CheckpointCoordinatorTest { assertNotNull(coord.getPendingCheckpoints().get(3L)); assertNotNull(coord.getPendingCheckpoints().get(4L)); - coord.shutdown(); + coord.shutdown(JobStatus.FINISHED); } catch (Exception e) { e.printStackTrace(); @@ -1639,12 +1662,13 @@ public class CheckpointCoordinatorTest { 200000, // timeout is very long (200 s) 0L, // no extra delay 2, // max two concurrent checkpoints + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { triggerVertex }, new ExecutionVertex[] { ackVertex }, new ExecutionVertex[] { commitVertex }, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(2), + null, new DisabledCheckpointStatsTracker()); coord.startCheckpointScheduler(); @@ -1690,26 +1714,29 @@ public class CheckpointCoordinatorTest { 200000, 0L, 1, // max one checkpoint at a time => should not affect savepoints + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, checkpointIDCounter, - new StandaloneCompletedCheckpointStore(2, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(2), + null, new DisabledCheckpointStatsTracker()); - List> savepointFutures = new ArrayList<>(); + List> savepointFutures = new ArrayList<>(); int numSavepoints = 5; + String savepointDir = tmpFolder.newFolder().getAbsolutePath(); + // Trigger savepoints for (int i = 0; i < numSavepoints; i++) { - savepointFutures.add(coord.triggerSavepoint(i)); + savepointFutures.add(coord.triggerSavepoint(i, savepointDir)); } // After triggering multiple savepoints, all should in progress - for (Future savepointFuture : savepointFutures) { - assertFalse(savepointFuture.isCompleted()); + for (Future savepointFuture : savepointFutures) { + assertFalse(savepointFuture.isDone()); } // ACK all savepoints @@ -1719,8 +1746,8 @@ public class CheckpointCoordinatorTest { } // After ACKs, all should be completed - for (Future savepointFuture : savepointFutures) { - assertTrue(savepointFuture.isCompleted()); + for (Future savepointFuture : savepointFutures) { + assertTrue(savepointFuture.isDone()); } } @@ -1740,19 +1767,22 @@ public class CheckpointCoordinatorTest { 200000, 100000000L, // very long min delay => should not affect savepoints 1, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, new ExecutionVertex[] { vertex1 }, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(2, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(2), + null, new DisabledCheckpointStatsTracker()); - Future savepoint0 = coord.triggerSavepoint(0); - assertFalse("Did not trigger savepoint", savepoint0.isCompleted()); + String savepointDir = tmpFolder.newFolder().getAbsolutePath(); + + Future savepoint0 = coord.triggerSavepoint(0, savepointDir); + assertFalse("Did not trigger savepoint", savepoint0.isDone()); - Future savepoint1 = coord.triggerSavepoint(1); - assertFalse("Did not trigger savepoint", savepoint1.isCompleted()); + Future savepoint1 = coord.triggerSavepoint(1, savepointDir); + assertFalse("Did not trigger savepoint", savepoint1.isDone()); } // ------------------------------------------------------------------------ @@ -1796,18 +1826,19 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, + jid, + 600000, + 600000, 0, Integer.MAX_VALUE, - arrayExecutionVertices, - arrayExecutionVertices, - arrayExecutionVertices, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - new HeapSavepointStore(), - new DisabledCheckpointStatsTracker()); + ExternalizedCheckpointSettings.none(), + arrayExecutionVertices, + arrayExecutionVertices, + arrayExecutionVertices, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker()); // trigger the checkpoint coord.triggerCheckpoint(timestamp); @@ -1900,18 +1931,19 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - arrayExecutionVertices, - arrayExecutionVertices, - arrayExecutionVertices, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - new HeapSavepointStore(), - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + arrayExecutionVertices, + arrayExecutionVertices, + arrayExecutionVertices, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker()); // trigger the checkpoint coord.triggerCheckpoint(timestamp); @@ -2014,18 +2046,19 @@ public class CheckpointCoordinatorTest { // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( - jid, - 600000, - 600000, - 0, - Integer.MAX_VALUE, - arrayExecutionVertices, - arrayExecutionVertices, - arrayExecutionVertices, - new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - new HeapSavepointStore(), - new DisabledCheckpointStatsTracker()); + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + arrayExecutionVertices, + arrayExecutionVertices, + arrayExecutionVertices, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker()); // trigger the checkpoint coord.triggerCheckpoint(timestamp); @@ -2141,12 +2174,13 @@ public class CheckpointCoordinatorTest { 600000, 0, Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), arrayExecutionVertices, arrayExecutionVertices, arrayExecutionVertices, new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(1), + null, new DisabledCheckpointStatsTracker()); // trigger the checkpoint @@ -2235,34 +2269,57 @@ public class CheckpointCoordinatorTest { comparePartitionableState(originalPartitionableStates, actualPartitionableStates); } - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - static void sendAckMessageToCoordinator( - CheckpointCoordinator coord, - long checkpointId, JobID jid, - ExecutionJobVertex jobVertex, - JobVertexID jobVertexID, - List keyGroupPartitions) throws Exception { + /** + * Tests that the externalized checkpoint configuration is respected. + */ + @Test + public void testExternalizedCheckpoints() throws Exception { + try { + final JobID jid = new JobID(); + final long timestamp = System.currentTimeMillis(); - for (int index = 0; index < jobVertex.getParallelism(); index++) { - ChainedStateHandle state = generateStateForVertex(jobVertexID, index); - List keyGroupState = generateKeyGroupState( - jobVertexID, - keyGroupPartitions.get(index)); + // create some mock Execution vertices that receive the checkpoint trigger messages + final ExecutionAttemptID attemptID1 = new ExecutionAttemptID(); + ExecutionVertex vertex1 = mockExecutionVertex(attemptID1); - CheckpointStateHandles checkpointStateHandles = new CheckpointStateHandles(state, null, keyGroupState); - AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint( + // set up the coordinator and validate the initial state + CheckpointCoordinator coord = new CheckpointCoordinator( jid, - jobVertex.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(), - new CheckpointMetaData(checkpointId, 0L), - checkpointStateHandles); + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.externalizeCheckpoints(true), + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + "fake-directory", + new DisabledCheckpointStatsTracker()); - coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); + assertTrue(coord.triggerCheckpoint(timestamp)); + + for (PendingCheckpoint checkpoint : coord.getPendingCheckpoints().values()) { + CheckpointProperties props = checkpoint.getProps(); + CheckpointProperties expected = CheckpointProperties.forExternalizedCheckpoint(true); + + assertEquals(expected, props); + } + + // the now we should have a completed checkpoint + coord.shutdown(JobStatus.FINISHED); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); } } + // ------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------ + public static List generateKeyGroupState( JobVertexID jobVertexID, KeyGroupRange keyGroupPartition) throws IOException { @@ -2720,4 +2777,96 @@ public class CheckpointCoordinatorTest { Assert.assertEquals(expected, actual); } + @Test + public void testDeclineCheckpointRespectsProperties() throws Exception { + final JobID jid = new JobID(); + final long timestamp = System.currentTimeMillis(); + + // create some mock Execution vertices that receive the checkpoint trigger messages + final ExecutionAttemptID attemptID1 = new ExecutionAttemptID(); + ExecutionVertex vertex1 = mockExecutionVertex(attemptID1); + + // set up the coordinator and validate the initial state + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new ExecutionVertex[] { vertex1 }, + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(1), + null, + new DisabledCheckpointStatsTracker()); + + assertEquals(0, coord.getNumberOfPendingCheckpoints()); + assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + + // trigger the first checkpoint. this should succeed + CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); + String targetDirectory = "xjasdkjakshdmmmxna"; + + CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(timestamp, props, targetDirectory); + assertEquals(true, triggerResult.isSuccess()); + + // validate that we have a pending checkpoint + assertEquals(1, coord.getNumberOfPendingCheckpoints()); + assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + + long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); + PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId); + + assertNotNull(checkpoint); + assertEquals(checkpointId, checkpoint.getCheckpointId()); + assertEquals(timestamp, checkpoint.getCheckpointTimestamp()); + assertEquals(jid, checkpoint.getJobId()); + assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks()); + assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks()); + assertEquals(0, checkpoint.getTaskStates().size()); + assertFalse(checkpoint.isDiscarded()); + assertFalse(checkpoint.isFullyAcknowledged()); + assertEquals(props, checkpoint.getProps()); + assertEquals(targetDirectory, checkpoint.getTargetDirectory()); + + { + // check that the vertices received the trigger checkpoint message + TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointId, timestamp); + verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1)); + } + + // decline checkpoint, this should cancel the checkpoint and re-trigger with correct properties + coord.receiveDeclineMessage(new DeclineCheckpoint(jid, attemptID1, checkpointId, checkpoint.getCheckpointTimestamp())); + assertTrue(checkpoint.isDiscarded()); + + // validate that we have a new pending checkpoint + assertEquals(1, coord.getNumberOfPendingCheckpoints()); + assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); + + long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey(); + PendingCheckpoint checkpointNew = coord.getPendingCheckpoints().get(checkpointIdNew); + + assertNotNull(checkpointNew); + assertEquals(checkpointIdNew, checkpointNew.getCheckpointId()); + assertEquals(jid, checkpointNew.getJobId()); + assertEquals(1, checkpointNew.getNumberOfNonAcknowledgedTasks()); + assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks()); + assertEquals(0, checkpointNew.getTaskStates().size()); + assertFalse(checkpointNew.isDiscarded()); + assertFalse(checkpointNew.isFullyAcknowledged()); + assertNotEquals(checkpoint.getCheckpointId(), checkpointNew.getCheckpointId()); + // Respect the properties and target directory from the initial trigger + assertEquals(props, checkpointNew.getProps()); + assertEquals(targetDirectory, checkpointNew.getTargetDirectory()); + + // check that the vertices received the new trigger checkpoint message + { + TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointIdNew, checkpointNew.getCheckpointTimestamp()); + verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1)); + } + + coord.shutdown(JobStatus.FINISHED); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java index 49b5fe7..9ece607 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; @@ -76,7 +77,7 @@ public abstract class CheckpointIDCounterTest extends TestLogger { CuratorFramework client = ZooKeeper.getClient(); assertNotNull(client.checkExists().forPath("/checkpoint-id-counter")); - counter.shutdown(); + counter.shutdown(JobStatus.FINISHED); assertNull(client.checkExists().forPath("/checkpoint-id-counter")); } @@ -91,7 +92,7 @@ public abstract class CheckpointIDCounterTest extends TestLogger { CuratorFramework client = ZooKeeper.getClient(); assertNotNull(client.checkExists().forPath("/checkpoint-id-counter")); - counter.suspend(); + counter.shutdown(JobStatus.SUSPENDED); assertNotNull(client.checkExists().forPath("/checkpoint-id-counter")); } @@ -120,7 +121,7 @@ public abstract class CheckpointIDCounterTest extends TestLogger { assertEquals(4, counter.getAndIncrement()); } finally { - counter.shutdown(); + counter.shutdown(JobStatus.FINISHED); } } @@ -183,7 +184,7 @@ public abstract class CheckpointIDCounterTest extends TestLogger { executor.shutdown(); } - counter.shutdown(); + counter.shutdown(JobStatus.FINISHED); } } @@ -200,7 +201,7 @@ public abstract class CheckpointIDCounterTest extends TestLogger { assertEquals(1337, counter.getAndIncrement()); assertEquals(1338, counter.getAndIncrement()); - counter.shutdown(); + counter.shutdown(JobStatus.FINISHED); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java index 5772fae..c996886 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java @@ -23,15 +23,66 @@ import org.junit.Test; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +/** + * Tests for the default checkpoint properties. + */ public class CheckpointPropertiesTest { + /** + * Tests the default checkpoint properties. + */ @Test public void testCheckpointProperties() { - assertFalse(CheckpointProperties.forStandardCheckpoint().isSavepoint()); + CheckpointProperties props = CheckpointProperties.forStandardCheckpoint(); + + assertFalse(props.forceCheckpoint()); + assertFalse(props.externalizeCheckpoint()); + assertTrue(props.discardOnSubsumed()); + assertTrue(props.discardOnJobFinished()); + assertTrue(props.discardOnJobCancelled()); + assertTrue(props.discardOnJobFailed()); + assertTrue(props.discardOnJobSuspended()); } + /** + * Tests the external checkpoints properties. + */ + @Test + public void testPersistentCheckpointProperties() { + CheckpointProperties props = CheckpointProperties.forExternalizedCheckpoint(true); + + assertFalse(props.forceCheckpoint()); + assertTrue(props.externalizeCheckpoint()); + assertTrue(props.discardOnSubsumed()); + assertTrue(props.discardOnJobFinished()); + assertTrue(props.discardOnJobCancelled()); + assertFalse(props.discardOnJobFailed()); + assertTrue(props.discardOnJobSuspended()); + + props = CheckpointProperties.forExternalizedCheckpoint(false); + + assertFalse(props.forceCheckpoint()); + assertTrue(props.externalizeCheckpoint()); + assertTrue(props.discardOnSubsumed()); + assertTrue(props.discardOnJobFinished()); + assertFalse(props.discardOnJobCancelled()); + assertFalse(props.discardOnJobFailed()); + assertTrue(props.discardOnJobSuspended()); + } + + /** + * Tests the default (manually triggered) savepoint properties. + */ @Test public void testSavepointProperties() { - assertTrue(CheckpointProperties.forStandardSavepoint().isSavepoint()); + CheckpointProperties props = CheckpointProperties.forStandardSavepoint(); + + assertTrue(props.forceCheckpoint()); + assertTrue(props.externalizeCheckpoint()); + assertFalse(props.discardOnSubsumed()); + assertFalse(props.discardOnJobFinished()); + assertFalse(props.discardOnJobCancelled()); + assertFalse(props.discardOnJobFailed()); + assertFalse(props.discardOnJobSuspended()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index c20c604..b4dcab5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore; import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.Execution; @@ -27,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.ChainedStateHandle; import org.apache.flink.runtime.state.CheckpointStateHandles; @@ -58,8 +58,6 @@ import static org.mockito.Mockito.when; */ public class CheckpointStateRestoreTest { - private static final ClassLoader cl = Thread.currentThread().getContextClassLoader(); - @Test public void testSetState() { try { @@ -101,12 +99,13 @@ public class CheckpointStateRestoreTest { 200000L, 0, Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, new ExecutionVertex[0], new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(1), + null, new DisabledCheckpointStatsTracker()); // create ourselves a checkpoint with state @@ -199,12 +198,13 @@ public class CheckpointStateRestoreTest { 200000L, 0, Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 }, new ExecutionVertex[0], new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(1), + null, new DisabledCheckpointStatsTracker()); // create ourselves a checkpoint with state @@ -252,12 +252,13 @@ public class CheckpointStateRestoreTest { 200000L, 0, Integer.MAX_VALUE, + ExternalizedCheckpointSettings.none(), new ExecutionVertex[] { mock(ExecutionVertex.class) }, new ExecutionVertex[] { mock(ExecutionVertex.class) }, new ExecutionVertex[0], new StandaloneCheckpointIDCounter(), - new StandaloneCompletedCheckpointStore(1, cl), - new HeapSavepointStore(), + new StandaloneCompletedCheckpointStore(1), + null, new DisabledCheckpointStatsTracker()); try {