Repository: flink
Updated Branches:
refs/heads/master c607b9a22 -> 42328bd9b
[hotfix] [checkpoints] Rename JobSnapshottingSettings to JobCheckpointingSettings
Cleanup to consistently use
- checkpoint for the overall fault tolerance mechanism and procedure
- snapshot for an operators state snapshot that is part of a checkpoint
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f0d6769
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f0d6769
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f0d6769
Branch: refs/heads/master
Commit: 5f0d6769051be8db8ee12659e13d22e5c5fd2f2d
Parents: 4d8627c
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Mar 27 18:23:13 2017 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Apr 20 10:59:50 2017 +0200
----------------------------------------------------------------------
.../jobmanager/JMXJobManagerMetricTest.java | 4 +-
.../checkpoints/CheckpointConfigHandler.java | 4 +-
.../CheckpointConfigHandlerTest.java | 14 +-
.../checkpoint/CheckpointStatsTracker.java | 14 +-
.../executiongraph/AccessExecutionGraph.java | 6 +-
.../executiongraph/ArchivedExecutionGraph.java | 13 +-
.../runtime/executiongraph/ExecutionGraph.java | 7 +-
.../executiongraph/ExecutionGraphBuilder.java | 4 +-
.../apache/flink/runtime/jobgraph/JobGraph.java | 12 +-
.../tasks/JobCheckpointingSettings.java | 151 +++++++++++++++++++
.../jobgraph/tasks/JobSnapshottingSettings.java | 151 -------------------
.../checkpoint/CheckpointStatsTrackerTest.java | 16 +-
.../checkpoint/CoordinatorShutdownTest.java | 6 +-
.../ArchivedExecutionGraphTest.java | 4 +-
.../ExecutionGraphDeploymentTest.java | 4 +-
.../tasks/JobCheckpointingSettingsTest.java | 65 ++++++++
.../tasks/JobSnapshottingSettingsTest.java | 65 --------
.../jobmanager/JobManagerHARecoveryTest.java | 4 +-
.../runtime/jobmanager/JobManagerTest.java | 12 +-
.../flink/runtime/jobmanager/JobSubmitTest.java | 4 +-
.../runtime/jobmanager/JobManagerITCase.scala | 8 +-
.../api/graph/StreamingJobGraphGenerator.java | 4 +-
.../graph/StreamingJobGraphGeneratorTest.java | 4 +-
23 files changed, 287 insertions(+), 289 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 1fdac65..934a621 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.junit.Assert;
@@ -70,7 +70,7 @@ public class JMXJobManagerMetricTest {
sourceJobVertex.setInvokableClass(BlockingInvokable.class);
JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
- jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+ jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index 947b7c3..7914c29 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
@@ -69,7 +69,7 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
private static String createCheckpointConfigJson(AccessExecutionGraph graph) throws IOException {
StringWriter writer = new StringWriter();
JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
- JobSnapshottingSettings settings = graph.getJobSnapshottingSettings();
+ JobCheckpointingSettings settings = graph.getJobCheckpointingSettings();
if (settings == null) {
return "{}";
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
index 9d339f5..6e48973 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -50,7 +50,7 @@ public class CheckpointConfigHandlerTest {
AccessExecutionGraph graph = graphAndSettings.graph;
when(graph.getJobID()).thenReturn(new JobID());
- JobSnapshottingSettings settings = graphAndSettings.snapshottingSettings;
+ JobCheckpointingSettings settings = graphAndSettings.snapshottingSettings;
ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
@@ -90,7 +90,7 @@ public class CheckpointConfigHandlerTest {
GraphAndSettings graphAndSettings = createGraphAndSettings(false, true);
AccessExecutionGraph graph = graphAndSettings.graph;
- JobSnapshottingSettings settings = graphAndSettings.snapshottingSettings;
+ JobCheckpointingSettings settings = graphAndSettings.snapshottingSettings;
CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
String json = handler.handleRequest(graph, Collections.<String, String>emptyMap());
@@ -156,7 +156,7 @@ public class CheckpointConfigHandlerTest {
? ExternalizedCheckpointSettings.externalizeCheckpoints(true)
: ExternalizedCheckpointSettings.none();
- JobSnapshottingSettings settings = new JobSnapshottingSettings(
+ JobCheckpointingSettings settings = new JobCheckpointingSettings(
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
@@ -169,19 +169,19 @@ public class CheckpointConfigHandlerTest {
exactlyOnce);
AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
- when(graph.getJobSnapshottingSettings()).thenReturn(settings);
+ when(graph.getJobCheckpointingSettings()).thenReturn(settings);
return new GraphAndSettings(graph, settings, externalizedSetting);
}
private static class GraphAndSettings {
public final AccessExecutionGraph graph;
- public final JobSnapshottingSettings snapshottingSettings;
+ public final JobCheckpointingSettings snapshottingSettings;
public final ExternalizedCheckpointSettings externalizedSettings;
public GraphAndSettings(
AccessExecutionGraph graph,
- JobSnapshottingSettings snapshottingSettings,
+ JobCheckpointingSettings snapshottingSettings,
ExternalizedCheckpointSettings externalizedSettings) {
this.graph = graph;
this.snapshottingSettings = snapshottingSettings;
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
index c7efb7b..313fe13 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTracker.java
@@ -31,7 +31,7 @@ import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
/**
* Tracker for checkpoint statistics.
@@ -67,7 +67,7 @@ public class CheckpointStatsTracker {
private final int totalSubtaskCount;
/** Snapshotting settings created from the CheckpointConfig. */
- private final JobSnapshottingSettings jobSnapshottingSettings;
+ private final JobCheckpointingSettings jobCheckpointingSettings;
/** Checkpoint counts. */
private final CheckpointStatsCounts counts = new CheckpointStatsCounts();
@@ -104,19 +104,19 @@ public class CheckpointStatsTracker {
*
* @param numRememberedCheckpoints Maximum number of checkpoints to remember, including in progress ones.
* @param jobVertices Job vertices involved in the checkpoints.
- * @param jobSnapshottingSettings Snapshotting settings created from the CheckpointConfig.
+ * @param jobCheckpointingSettings Snapshotting settings created from the CheckpointConfig.
* @param metricGroup Metric group for exposed metrics
*/
public CheckpointStatsTracker(
int numRememberedCheckpoints,
List<ExecutionJobVertex> jobVertices,
- JobSnapshottingSettings jobSnapshottingSettings,
+ JobCheckpointingSettings jobCheckpointingSettings,
MetricGroup metricGroup) {
checkArgument(numRememberedCheckpoints >= 0, "Negative number of remembered checkpoints");
this.history = new CheckpointStatsHistory(numRememberedCheckpoints);
this.jobVertices = checkNotNull(jobVertices, "JobVertices");
- this.jobSnapshottingSettings = checkNotNull(jobSnapshottingSettings);
+ this.jobCheckpointingSettings = checkNotNull(jobCheckpointingSettings);
// Compute the total subtask count. We do this here in order to only
// do it once.
@@ -143,8 +143,8 @@ public class CheckpointStatsTracker {
*
* @return The job's snapshotting settings.
*/
- public JobSnapshottingSettings getSnapshottingSettings() {
- return jobSnapshottingSettings;
+ public JobCheckpointingSettings getSnapshottingSettings() {
+ return jobCheckpointingSettings;
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 18c2ec2..3b064c3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.util.SerializedValue;
import java.io.IOException;
@@ -116,12 +116,12 @@ public interface AccessExecutionGraph {
CheckpointCoordinator getCheckpointCoordinator();
/**
- * Returns the {@link JobSnapshottingSettings} or <code>null</code> if
+ * Returns the {@link JobCheckpointingSettings} or <code>null</code> if
* checkpointing is disabled.
*
* @return JobSnapshottingSettings for this execution graph
*/
- JobSnapshottingSettings getJobSnapshottingSettings();
+ JobCheckpointingSettings getJobCheckpointingSettings();
/**
* Returns a snapshot of the checkpoint statistics or <code>null</code> if
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
index 334b0d0..b9db1e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraph.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.util.SerializedValue;
import javax.annotation.Nullable;
@@ -81,7 +81,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
private final Map<String, SerializedValue<Object>> serializedUserAccumulators;
@Nullable
- private final JobSnapshottingSettings jobSnapshottingSettings;
+ private final JobCheckpointingSettings jobCheckpointingSettings;
@Nullable
private final CheckpointStatsSnapshot checkpointStatsSnapshot;
@@ -99,7 +99,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
Map<String, SerializedValue<Object>> serializedUserAccumulators,
ArchivedExecutionConfig executionConfig,
boolean isStoppable,
- @Nullable JobSnapshottingSettings jobSnapshottingSettings,
+ @Nullable JobCheckpointingSettings jobCheckpointingSettings,
@Nullable CheckpointStatsSnapshot checkpointStatsSnapshot) {
this.jobID = jobID;
@@ -114,7 +114,7 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
this.serializedUserAccumulators = serializedUserAccumulators;
this.archivedExecutionConfig = executionConfig;
this.isStoppable = isStoppable;
- this.jobSnapshottingSettings = jobSnapshottingSettings;
+ this.jobCheckpointingSettings = jobCheckpointingSettings;
this.checkpointStatsSnapshot = checkpointStatsSnapshot;
}
@@ -210,9 +210,8 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializabl
return null;
}
- @Override
- public JobSnapshottingSettings getJobSnapshottingSettings() {
- return jobSnapshottingSettings;
+ public JobCheckpointingSettings getJobCheckpointingSettings() {
+ return jobCheckpointingSettings;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index b21b72b..29b9806 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -55,7 +55,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
@@ -419,8 +419,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
return restartStrategy;
}
- @Override
- public JobSnapshottingSettings getJobSnapshottingSettings() {
+ public JobCheckpointingSettings getJobCheckpointingSettings() {
if (checkpointStatsTracker != null) {
return checkpointStatsTracker.getSnapshottingSettings();
} else {
@@ -1477,7 +1476,7 @@ public class ExecutionGraph implements AccessExecutionGraph, Archiveable<Archive
serializedUserAccumulators,
getArchivedExecutionConfig(),
isStoppable(),
- getJobSnapshottingSettings(),
+ getJobCheckpointingSettings(),
getCheckpointStatsSnapshot());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index a6455f5..a10c62e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -42,7 +42,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.util.DynamicCodeLoadingException;
@@ -159,7 +159,7 @@ public class ExecutionGraphBuilder {
}
// configure the state checkpointing
- JobSnapshottingSettings snapshotSettings = jobGraph.getSnapshotSettings();
+ JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
if (snapshotSettings != null) {
List<ExecutionJobVertex> triggerVertices =
idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
index f6377e5..2a8af37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java
@@ -28,7 +28,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.util.SerializedValue;
import scala.concurrent.duration.FiniteDuration;
@@ -92,7 +92,7 @@ public class JobGraph implements Serializable {
private SerializedValue<ExecutionConfig> serializedExecutionConfig;
/** The settings for the job checkpoints */
- private JobSnapshottingSettings snapshotSettings;
+ private JobCheckpointingSettings snapshotSettings;
/** Savepoint restore settings. */
private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none();
@@ -331,17 +331,17 @@ public class JobGraph implements Serializable {
*
* @param settings The snapshot settings, or null, to disable snapshotting.
*/
- public void setSnapshotSettings(JobSnapshottingSettings settings) {
+ public void setSnapshotSettings(JobCheckpointingSettings settings) {
this.snapshotSettings = settings;
}
/**
* Gets the settings for asynchronous snapshots. This method returns null, when
- * snapshotting is not enabled.
+ * checkpointing is not enabled.
*
- * @return The snapshot settings, or null, if snapshotting is not enabled.
+ * @return The snapshot settings, or null, if checkpointing is not enabled.
*/
- public JobSnapshottingSettings getSnapshotSettings() {
+ public JobCheckpointingSettings getCheckpointingSettings() {
return snapshotSettings;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
new file mode 100644
index 0000000..38130d4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettings.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.StateBackend;
+
+import javax.annotation.Nullable;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * The JobCheckpointingSettings are attached to a JobGraph and describe the settings
+ * for the asynchronous checkpoints of the JobGraph, such as interval, and which vertices
+ * need to participate.
+ */
+public class JobCheckpointingSettings implements java.io.Serializable {
+
+ private static final long serialVersionUID = -2593319571078198180L;
+
+ private final List<JobVertexID> verticesToTrigger;
+
+ private final List<JobVertexID> verticesToAcknowledge;
+
+ private final List<JobVertexID> verticesToConfirm;
+
+ private final long checkpointInterval;
+
+ private final long checkpointTimeout;
+
+ private final long minPauseBetweenCheckpoints;
+
+ private final int maxConcurrentCheckpoints;
+
+ /** Settings for externalized checkpoints. */
+ private final ExternalizedCheckpointSettings externalizedCheckpointSettings;
+
+ /** The default state backend, if configured by the user in the job */
+ @Nullable
+ private final StateBackend defaultStateBackend;
+
+ /**
+ * Flag indicating whether exactly once checkpoint mode has been configured.
+ * If <code>false</code>, at least once mode has been configured. This is
+ * not a necessary attribute, because the checkpointing mode is only relevant
+ * for the stream tasks, but we expose it here to forward it to the web runtime
+ * UI.
+ */
+ private final boolean isExactlyOnce;
+
+ public JobCheckpointingSettings(
+ List<JobVertexID> verticesToTrigger,
+ List<JobVertexID> verticesToAcknowledge,
+ List<JobVertexID> verticesToConfirm,
+ long checkpointInterval,
+ long checkpointTimeout,
+ long minPauseBetweenCheckpoints,
+ int maxConcurrentCheckpoints,
+ ExternalizedCheckpointSettings externalizedCheckpointSettings,
+ @Nullable StateBackend defaultStateBackend,
+ boolean isExactlyOnce) {
+
+ // sanity checks
+ if (checkpointInterval < 1 || checkpointTimeout < 1 ||
+ minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) {
+ throw new IllegalArgumentException();
+ }
+
+ this.verticesToTrigger = requireNonNull(verticesToTrigger);
+ this.verticesToAcknowledge = requireNonNull(verticesToAcknowledge);
+ this.verticesToConfirm = requireNonNull(verticesToConfirm);
+ this.checkpointInterval = checkpointInterval;
+ this.checkpointTimeout = checkpointTimeout;
+ this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+ this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+ this.externalizedCheckpointSettings = requireNonNull(externalizedCheckpointSettings);
+ this.defaultStateBackend = defaultStateBackend;
+ this.isExactlyOnce = isExactlyOnce;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ public List<JobVertexID> getVerticesToTrigger() {
+ return verticesToTrigger;
+ }
+
+ public List<JobVertexID> getVerticesToAcknowledge() {
+ return verticesToAcknowledge;
+ }
+
+ public List<JobVertexID> getVerticesToConfirm() {
+ return verticesToConfirm;
+ }
+
+ public long getCheckpointInterval() {
+ return checkpointInterval;
+ }
+
+ public long getCheckpointTimeout() {
+ return checkpointTimeout;
+ }
+
+ public long getMinPauseBetweenCheckpoints() {
+ return minPauseBetweenCheckpoints;
+ }
+
+ public int getMaxConcurrentCheckpoints() {
+ return maxConcurrentCheckpoints;
+ }
+
+ public ExternalizedCheckpointSettings getExternalizedCheckpointSettings() {
+ return externalizedCheckpointSettings;
+ }
+
+ @Nullable
+ public StateBackend getDefaultStateBackend() {
+ return defaultStateBackend;
+ }
+
+ public boolean isExactlyOnce() {
+ return isExactlyOnce;
+ }
+
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public String toString() {
+ return String.format("SnapshotSettings: interval=%d, timeout=%d, pause-between=%d, " +
+ "maxConcurrent=%d, trigger=%s, ack=%s, commit=%s",
+ checkpointInterval, checkpointTimeout,
+ minPauseBetweenCheckpoints, maxConcurrentCheckpoints,
+ verticesToTrigger, verticesToAcknowledge, verticesToConfirm);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
deleted file mode 100644
index 233aa88..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateBackend;
-
-import javax.annotation.Nullable;
-import java.util.List;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * The JobCheckpointingSettings are attached to a JobGraph and describe the settings
- * for the asynchronous checkpoints of the JobGraph, such as interval, and which vertices
- * need to participate.
- */
-public class JobSnapshottingSettings implements java.io.Serializable {
-
- private static final long serialVersionUID = -2593319571078198180L;
-
- private final List<JobVertexID> verticesToTrigger;
-
- private final List<JobVertexID> verticesToAcknowledge;
-
- private final List<JobVertexID> verticesToConfirm;
-
- private final long checkpointInterval;
-
- private final long checkpointTimeout;
-
- private final long minPauseBetweenCheckpoints;
-
- private final int maxConcurrentCheckpoints;
-
- /** Settings for externalized checkpoints. */
- private final ExternalizedCheckpointSettings externalizedCheckpointSettings;
-
- /** The default state backend, if configured by the user in the job */
- @Nullable
- private final StateBackend defaultStateBackend;
-
- /**
- * Flag indicating whether exactly once checkpoint mode has been configured.
- * If <code>false</code>, at least once mode has been configured. This is
- * not a necessary attribute, because the checkpointing mode is only relevant
- * for the stream tasks, but we expose it here to forward it to the web runtime
- * UI.
- */
- private final boolean isExactlyOnce;
-
- public JobSnapshottingSettings(
- List<JobVertexID> verticesToTrigger,
- List<JobVertexID> verticesToAcknowledge,
- List<JobVertexID> verticesToConfirm,
- long checkpointInterval,
- long checkpointTimeout,
- long minPauseBetweenCheckpoints,
- int maxConcurrentCheckpoints,
- ExternalizedCheckpointSettings externalizedCheckpointSettings,
- @Nullable StateBackend defaultStateBackend,
- boolean isExactlyOnce) {
-
- // sanity checks
- if (checkpointInterval < 1 || checkpointTimeout < 1 ||
- minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1) {
- throw new IllegalArgumentException();
- }
-
- this.verticesToTrigger = requireNonNull(verticesToTrigger);
- this.verticesToAcknowledge = requireNonNull(verticesToAcknowledge);
- this.verticesToConfirm = requireNonNull(verticesToConfirm);
- this.checkpointInterval = checkpointInterval;
- this.checkpointTimeout = checkpointTimeout;
- this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
- this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
- this.externalizedCheckpointSettings = requireNonNull(externalizedCheckpointSettings);
- this.defaultStateBackend = defaultStateBackend;
- this.isExactlyOnce = isExactlyOnce;
- }
-
- // --------------------------------------------------------------------------------------------
-
- public List<JobVertexID> getVerticesToTrigger() {
- return verticesToTrigger;
- }
-
- public List<JobVertexID> getVerticesToAcknowledge() {
- return verticesToAcknowledge;
- }
-
- public List<JobVertexID> getVerticesToConfirm() {
- return verticesToConfirm;
- }
-
- public long getCheckpointInterval() {
- return checkpointInterval;
- }
-
- public long getCheckpointTimeout() {
- return checkpointTimeout;
- }
-
- public long getMinPauseBetweenCheckpoints() {
- return minPauseBetweenCheckpoints;
- }
-
- public int getMaxConcurrentCheckpoints() {
- return maxConcurrentCheckpoints;
- }
-
- public ExternalizedCheckpointSettings getExternalizedCheckpointSettings() {
- return externalizedCheckpointSettings;
- }
-
- @Nullable
- public StateBackend getDefaultStateBackend() {
- return defaultStateBackend;
- }
-
- public boolean isExactlyOnce() {
- return isExactlyOnce;
- }
-
- // --------------------------------------------------------------------------------------------
-
- @Override
- public String toString() {
- return String.format("SnapshotSettings: interval=%d, timeout=%d, pause-between=%d, " +
- "maxConcurrent=%d, trigger=%s, ack=%s, commit=%s",
- checkpointInterval, checkpointTimeout,
- minPauseBetweenCheckpoints, maxConcurrentCheckpoints,
- verticesToTrigger, verticesToAcknowledge, verticesToConfirm);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
index aaf1774..d66d0be 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java
@@ -44,7 +44,7 @@ import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.junit.Test;
public class CheckpointStatsTrackerTest {
@@ -58,7 +58,7 @@ public class CheckpointStatsTrackerTest {
when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID());
when(jobVertex.getParallelism()).thenReturn(1);
- JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings(
+ JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(
Collections.singletonList(new JobVertexID()),
Collections.singletonList(new JobVertexID()),
Collections.singletonList(new JobVertexID()),
@@ -94,7 +94,7 @@ public class CheckpointStatsTrackerTest {
CheckpointStatsTracker tracker = new CheckpointStatsTracker(
0,
Collections.singletonList(jobVertex),
- mock(JobSnapshottingSettings.class),
+ mock(JobCheckpointingSettings.class),
new UnregisteredMetricsGroup());
PendingCheckpointStats pending = tracker.reportPendingCheckpoint(
@@ -142,7 +142,7 @@ public class CheckpointStatsTrackerTest {
CheckpointStatsTracker tracker = new CheckpointStatsTracker(
10,
Collections.singletonList(jobVertex),
- mock(JobSnapshottingSettings.class),
+ mock(JobCheckpointingSettings.class),
new UnregisteredMetricsGroup());
// Completed checkpoint
@@ -247,7 +247,7 @@ public class CheckpointStatsTrackerTest {
CheckpointStatsTracker tracker = new CheckpointStatsTracker(
10,
Collections.singletonList(jobVertex),
- mock(JobSnapshottingSettings.class),
+ mock(JobCheckpointingSettings.class),
new UnregisteredMetricsGroup());
CheckpointStatsSnapshot snapshot1 = tracker.createSnapshot();
@@ -293,7 +293,7 @@ public class CheckpointStatsTrackerTest {
new CheckpointStatsTracker(
0,
Collections.singletonList(jobVertex),
- mock(JobSnapshottingSettings.class),
+ mock(JobCheckpointingSettings.class),
metricGroup);
verify(metricGroup, times(1)).gauge(eq(CheckpointStatsTracker.NUMBER_OF_CHECKPOINTS_METRIC), any(Gauge.class));
@@ -409,7 +409,7 @@ public class CheckpointStatsTrackerTest {
CheckpointStatsTracker stats = new CheckpointStatsTracker(
0,
Collections.singletonList(jobVertex),
- mock(JobSnapshottingSettings.class),
+ mock(JobCheckpointingSettings.class),
metricGroup);
// Make sure to adjust this test if metrics are added/removed
@@ -522,7 +522,7 @@ public class CheckpointStatsTrackerTest {
return new CheckpointStatsTracker(
0,
Collections.singletonList(jobVertex),
- mock(JobSnapshottingSettings.class),
+ mock(JobCheckpointingSettings.class),
new UnregisteredMetricsGroup());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index 976da48..2e0bd76 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -65,7 +65,7 @@ public class CoordinatorShutdownTest {
List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
JobGraph testGraph = new JobGraph("test job", vertex);
- testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
+ testGraph.setSnapshotSettings(new JobCheckpointingSettings(vertexIdList, vertexIdList, vertexIdList,
5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true));
ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
@@ -124,7 +124,7 @@ public class CoordinatorShutdownTest {
List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
JobGraph testGraph = new JobGraph("test job", vertex);
- testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
+ testGraph.setSnapshotSettings(new JobCheckpointingSettings(vertexIdList, vertexIdList, vertexIdList,
5000, 60000, 0L, Integer.MAX_VALUE, ExternalizedCheckpointSettings.none(), null, true));
ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 077ab53..f96b624 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -41,7 +41,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.BeforeClass;
@@ -116,7 +116,7 @@ public class ArchivedExecutionGraphTest {
CheckpointStatsTracker statsTracker = new CheckpointStatsTracker(
0,
jobVertices,
- mock(JobSnapshottingSettings.class),
+ mock(JobCheckpointingSettings.class),
new UnregisteredMetricsGroup());
runtimeGraph.enableCheckpointing(
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 8d91b84..866f55c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -61,7 +61,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.operators.BatchTask;
@@ -559,7 +559,7 @@ public class ExecutionGraphDeploymentTest {
final JobID jobId = new JobID();
final JobGraph jobGraph = new JobGraph(jobId, "test");
- jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+ jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
Collections.<JobVertexID>emptyList(),
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
new file mode 100644
index 0000000..c3524fa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class JobCheckpointingSettingsTest {
+
+ /**
+ * Tests that the settings are actually serializable.
+ */
+ @Test
+ public void testIsJavaSerializable() throws Exception {
+ JobCheckpointingSettings settings = new JobCheckpointingSettings(
+ Arrays.asList(new JobVertexID(), new JobVertexID()),
+ Arrays.asList(new JobVertexID(), new JobVertexID()),
+ Arrays.asList(new JobVertexID(), new JobVertexID()),
+ 1231231,
+ 1231,
+ 112,
+ 12,
+ ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+ new MemoryStateBackend(),
+ false);
+
+ JobCheckpointingSettings copy = CommonTestUtils.createCopySerializable(settings);
+ assertEquals(settings.getVerticesToAcknowledge(), copy.getVerticesToAcknowledge());
+ assertEquals(settings.getVerticesToConfirm(), copy.getVerticesToConfirm());
+ assertEquals(settings.getVerticesToTrigger(), copy.getVerticesToTrigger());
+ assertEquals(settings.getCheckpointInterval(), copy.getCheckpointInterval());
+ assertEquals(settings.getCheckpointTimeout(), copy.getCheckpointTimeout());
+ assertEquals(settings.getMinPauseBetweenCheckpoints(), copy.getMinPauseBetweenCheckpoints());
+ assertEquals(settings.getMaxConcurrentCheckpoints(), copy.getMaxConcurrentCheckpoints());
+ assertEquals(settings.getExternalizedCheckpointSettings().externalizeCheckpoints(), copy.getExternalizedCheckpointSettings().externalizeCheckpoints());
+ assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(), copy.getExternalizedCheckpointSettings().deleteOnCancellation());
+ assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce());
+ assertNotNull(copy.getDefaultStateBackend());
+ assertTrue(copy.getDefaultStateBackend().getClass() == MemoryStateBackend.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
deleted file mode 100644
index 2508d5c..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettingsTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobgraph.tasks;
-
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.junit.Test;
-
-import java.util.Arrays;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-public class JobSnapshottingSettingsTest {
-
- /**
- * Tests that the settings are actually serializable.
- */
- @Test
- public void testIsJavaSerializable() throws Exception {
- JobSnapshottingSettings settings = new JobSnapshottingSettings(
- Arrays.asList(new JobVertexID(), new JobVertexID()),
- Arrays.asList(new JobVertexID(), new JobVertexID()),
- Arrays.asList(new JobVertexID(), new JobVertexID()),
- 1231231,
- 1231,
- 112,
- 12,
- ExternalizedCheckpointSettings.externalizeCheckpoints(true),
- new MemoryStateBackend(),
- false);
-
- JobSnapshottingSettings copy = CommonTestUtils.createCopySerializable(settings);
- assertEquals(settings.getVerticesToAcknowledge(), copy.getVerticesToAcknowledge());
- assertEquals(settings.getVerticesToConfirm(), copy.getVerticesToConfirm());
- assertEquals(settings.getVerticesToTrigger(), copy.getVerticesToTrigger());
- assertEquals(settings.getCheckpointInterval(), copy.getCheckpointInterval());
- assertEquals(settings.getCheckpointTimeout(), copy.getCheckpointTimeout());
- assertEquals(settings.getMinPauseBetweenCheckpoints(), copy.getMinPauseBetweenCheckpoints());
- assertEquals(settings.getMaxConcurrentCheckpoints(), copy.getMaxConcurrentCheckpoints());
- assertEquals(settings.getExternalizedCheckpointSettings().externalizeCheckpoints(), copy.getExternalizedCheckpointSettings().externalizeCheckpoints());
- assertEquals(settings.getExternalizedCheckpointSettings().deleteOnCancellation(), copy.getExternalizedCheckpointSettings().deleteOnCancellation());
- assertEquals(settings.isExactlyOnce(), copy.isExactlyOnce());
- assertNotNull(copy.getDefaultStateBackend());
- assertTrue(copy.getDefaultStateBackend().getClass() == MemoryStateBackend.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index dcf4722..6eacaac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -61,7 +61,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
@@ -220,7 +220,7 @@ public class JobManagerHARecoveryTest {
JobGraph jobGraph = new JobGraph("TestingJob", sourceJobVertex);
List<JobVertexID> vertexId = Collections.singletonList(sourceJobVertex.getID());
- jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+ jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
vertexId,
vertexId,
vertexId,
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 4dec84b..d7fc71d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -48,7 +48,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
@@ -821,7 +821,7 @@ public class JobManagerTest extends TestLogger {
JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
- JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings(
+ JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(
Collections.singletonList(sourceVertex.getID()),
Collections.singletonList(sourceVertex.getID()),
Collections.singletonList(sourceVertex.getID()),
@@ -947,7 +947,7 @@ public class JobManagerTest extends TestLogger {
JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
- JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings(
+ JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(
Collections.singletonList(sourceVertex.getID()),
Collections.singletonList(sourceVertex.getID()),
Collections.singletonList(sourceVertex.getID()),
@@ -1053,7 +1053,7 @@ public class JobManagerTest extends TestLogger {
JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
- JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings(
+ JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(
Collections.singletonList(sourceVertex.getID()),
Collections.singletonList(sourceVertex.getID()),
Collections.singletonList(sourceVertex.getID()),
@@ -1156,7 +1156,7 @@ public class JobManagerTest extends TestLogger {
JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
- JobSnapshottingSettings snapshottingSettings = new JobSnapshottingSettings(
+ JobCheckpointingSettings snapshottingSettings = new JobCheckpointingSettings(
Collections.singletonList(sourceVertex.getID()),
Collections.singletonList(sourceVertex.getID()),
Collections.singletonList(sourceVertex.getID()),
@@ -1203,7 +1203,7 @@ public class JobManagerTest extends TestLogger {
JobGraph newJobGraph = new JobGraph("NewTestingJob", newSourceVertex);
- JobSnapshottingSettings newSnapshottingSettings = new JobSnapshottingSettings(
+ JobCheckpointingSettings newSnapshottingSettings = new JobCheckpointingSettings(
Collections.singletonList(newSourceVertex.getID()),
Collections.singletonList(newSourceVertex.getID()),
Collections.singletonList(newSourceVertex.getID()),
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index ba5f973..bdff401 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -32,7 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -228,7 +228,7 @@ public class JobSubmitTest {
List<JobVertexID> vertexIdList = Collections.singletonList(jobVertex.getID());
JobGraph jg = new JobGraph("test job", jobVertex);
- jg.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
+ jg.setSnapshotSettings(new JobCheckpointingSettings(vertexIdList, vertexIdList, vertexIdList,
5000, 5000, 0L, 10, ExternalizedCheckpointSettings.none(), null, true));
return jg;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
index 5374d01..ce8517e 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala
@@ -27,7 +27,7 @@ import org.apache.flink.runtime.checkpoint.{CheckpointCoordinator, CompletedChec
import org.apache.flink.runtime.client.JobExecutionException
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture
import org.apache.flink.runtime.io.network.partition.ResultPartitionType
-import org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, JobSnapshottingSettings}
+import org.apache.flink.runtime.jobgraph.tasks.{ExternalizedCheckpointSettings, JobCheckpointingSettings}
import org.apache.flink.runtime.jobgraph.{DistributionPattern, JobGraph, JobVertex, ScheduleMode}
import org.apache.flink.runtime.jobmanager.Tasks._
import org.apache.flink.runtime.jobmanager.scheduler.{NoResourceAvailableException, SlotSharingGroup}
@@ -827,7 +827,7 @@ class JobManagerITCase(_system: ActorSystem)
val jobVertex = new JobVertex("Blocking vertex")
jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
val jobGraph = new JobGraph(jobVertex)
- jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+ jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
java.util.Collections.emptyList(),
java.util.Collections.emptyList(),
java.util.Collections.emptyList(),
@@ -887,7 +887,7 @@ class JobManagerITCase(_system: ActorSystem)
val jobVertex = new JobVertex("Blocking vertex")
jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
val jobGraph = new JobGraph(jobVertex)
- jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+ jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
java.util.Collections.emptyList(),
java.util.Collections.emptyList(),
java.util.Collections.emptyList(),
@@ -955,7 +955,7 @@ class JobManagerITCase(_system: ActorSystem)
val jobVertex = new JobVertex("Blocking vertex")
jobVertex.setInvokableClass(classOf[BlockingNoOpInvokable])
val jobGraph = new JobGraph(jobVertex)
- jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
+ jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
java.util.Collections.emptyList(),
java.util.Collections.emptyList(),
java.util.Collections.emptyList(),
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 794de5a..7d62273 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -36,7 +36,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
@@ -579,7 +579,7 @@ public class StreamingJobGraphGenerator {
"exactly-once or at-least-once.");
}
- JobSnapshottingSettings settings = new JobSnapshottingSettings(
+ JobCheckpointingSettings settings = new JobCheckpointingSettings(
triggerVertices, ackVertices, commitVertices, interval,
cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
cfg.getMaxConcurrentCheckpoints(),
http://git-wip-us.apache.org/repos/asf/flink/blob/5f0d6769/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index 6d2fcaa..2c71a07 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -26,7 +26,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.IterativeStream;
@@ -118,7 +118,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger {
StreamingJobGraphGenerator jobGraphGenerator = new StreamingJobGraphGenerator(streamGraph);
JobGraph jobGraph = jobGraphGenerator.createJobGraph();
- JobSnapshottingSettings snapshottingSettings = jobGraph.getSnapshotSettings();
+ JobCheckpointingSettings snapshottingSettings = jobGraph.getCheckpointingSettings();
assertEquals(Long.MAX_VALUE, snapshottingSettings.getCheckpointInterval());
}
|