flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [09/22] flink git commit: [FLINK-8531] [checkpoints] (part 2) Add CheckpointType to CheckpointProperties
Date Thu, 01 Feb 2018 15:46:41 GMT
[FLINK-8531] [checkpoints] (part 2) Add CheckpointType to CheckpointProperties


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/35c7d93e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/35c7d93e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/35c7d93e

Branch: refs/heads/master
Commit: 35c7d93ee85aa8689e804b713affa65b46af1acc
Parents: 99495c9
Author: Stephan Ewen <sewen@apache.org>
Authored: Fri Jan 19 15:18:57 2018 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Feb 1 13:54:54 2018 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointProperties.java        | 52 +++++++++++++-------
 .../checkpoint/CompletedCheckpointTest.java     |  8 +--
 .../checkpoint/PendingCheckpointTest.java       | 10 ++--
 .../checkpoint/RestoredCheckpointStatsTest.java |  2 +-
 4 files changed, 43 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/35c7d93e/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 8d6346c..07780c2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -18,27 +18,31 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * The configuration of a checkpoint, such as whether
+ * The configuration of a checkpoint. This describes whether
  * <ul>
- *     <li>The checkpoint should be persisted</li>
- *     <li>The checkpoint must be full, or may be incremental (not yet implemented)</li>
- *     <li>The checkpoint format must be the common (cross backend) format,
- *     or may be state-backend specific (not yet implemented)</li>
- *     <li>when the checkpoint should be garbage collected</li>
+ *     <li>The checkpoint is s regular checkpoint or a savepoint.</li>
+ *     <li>When the checkpoint should be garbage collected.</li>
  * </ul>
  */
 public class CheckpointProperties implements Serializable {
 
-	private static final long serialVersionUID = -8835900655844879470L;
+	private static final long serialVersionUID = 2L;
 
-	private final boolean forced;
+	/** Type - checkpoint / savepoint. */
+	private final CheckpointType checkpointType;
 
-	private final boolean savepoint;
+	/** This has a misleading name and actually means whether the snapshot must be triggered,
+	 * or whether it may be rejected by the checkpoint coordinator if too many checkpoints are
+	 * currently in progress. */
+	private final boolean forced;
 
 	private final boolean discardSubsumed;
 	private final boolean discardFinished;
@@ -46,9 +50,10 @@ public class CheckpointProperties implements Serializable {
 	private final boolean discardFailed;
 	private final boolean discardSuspended;
 
+	@VisibleForTesting
 	CheckpointProperties(
 			boolean forced,
-			boolean savepoint,
+			CheckpointType checkpointType,
 			boolean discardSubsumed,
 			boolean discardFinished,
 			boolean discardCancelled,
@@ -56,7 +61,7 @@ public class CheckpointProperties implements Serializable {
 			boolean discardSuspended) {
 
 		this.forced = forced;
-		this.savepoint = savepoint;
+		this.checkpointType = checkNotNull(checkpointType);
 		this.discardSubsumed = discardSubsumed;
 		this.discardFinished = discardFinished;
 		this.discardCancelled = discardCancelled;
@@ -159,12 +164,19 @@ public class CheckpointProperties implements Serializable {
 	}
 
 	/**
+	 * Gets the type of the checkpoint (checkpoint / savepoint).
+	 */
+	public CheckpointType getCheckpointType() {
+		return checkpointType;
+	}
+
+	/**
 	 * Returns whether the checkpoint properties describe a standard savepoint.
 	 *
 	 * @return <code>true</code> if the properties describe a savepoint, <code>false</code>
otherwise.
 	 */
 	public boolean isSavepoint() {
-		return savepoint;
+		return checkpointType == CheckpointType.SAVEPOINT;
 	}
 
 	// ------------------------------------------------------------------------
@@ -181,7 +193,7 @@ public class CheckpointProperties implements Serializable {
 
 		CheckpointProperties that = (CheckpointProperties) o;
 		return forced == that.forced &&
-				savepoint == that.savepoint &&
+				checkpointType == that.checkpointType &&
 				discardSubsumed == that.discardSubsumed &&
 				discardFinished == that.discardFinished &&
 				discardCancelled == that.discardCancelled &&
@@ -192,7 +204,7 @@ public class CheckpointProperties implements Serializable {
 	@Override
 	public int hashCode() {
 		int result = (forced ? 1 : 0);
-		result = 31 * result + (savepoint ? 1 : 0);
+		result = 31 * result + checkpointType.hashCode();
 		result = 31 * result + (discardSubsumed ? 1 : 0);
 		result = 31 * result + (discardFinished ? 1 : 0);
 		result = 31 * result + (discardCancelled ? 1 : 0);
@@ -205,7 +217,7 @@ public class CheckpointProperties implements Serializable {
 	public String toString() {
 		return "CheckpointProperties{" +
 				"forced=" + forced +
-				", savepoint=" + savepoint +
+				", checkpointType=" + checkpointType +
 				", discardSubsumed=" + discardSubsumed +
 				", discardFinished=" + discardFinished +
 				", discardCancelled=" + discardCancelled +
@@ -215,10 +227,12 @@ public class CheckpointProperties implements Serializable {
 	}
 
 	// ------------------------------------------------------------------------
+	//  Factories and pre-configured properties
+	// ------------------------------------------------------------------------
 
 	private static final CheckpointProperties SAVEPOINT = new CheckpointProperties(
 			true,
-			true,
+			CheckpointType.SAVEPOINT,
 			false,
 			false,
 			false,
@@ -227,7 +241,7 @@ public class CheckpointProperties implements Serializable {
 
 	private static final CheckpointProperties CHECKPOINT_NEVER_RETAINED = new CheckpointProperties(
 			false,
-			false,
+			CheckpointType.CHECKPOINT,
 			true,
 			true,  // Delete on success
 			true,  // Delete on cancellation
@@ -236,7 +250,7 @@ public class CheckpointProperties implements Serializable {
 
 	private static final CheckpointProperties CHECKPOINT_RETAINED_ON_FAILURE = new CheckpointProperties(
 			false,
-			false,
+			CheckpointType.CHECKPOINT,
 			true,
 			true,  // Delete on success
 			true,  // Delete on cancellation
@@ -245,7 +259,7 @@ public class CheckpointProperties implements Serializable {
 
 	private static final CheckpointProperties CHECKPOINT_RETAINED_ON_CANCELLATION = new CheckpointProperties(
 			false,
-			false,
+			CheckpointType.CHECKPOINT,
 			true,
 			true,   // Delete on success
 			false,  // Retain on cancellation

http://git-wip-us.apache.org/repos/asf/flink/blob/35c7d93e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 98b6647..1bdb947 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -85,8 +85,8 @@ public class CompletedCheckpointTest {
 		StreamStateHandle metadataHandle = mock(StreamStateHandle.class);
 
 		boolean discardSubsumed = true;
-		CheckpointProperties props = new CheckpointProperties(false, false, discardSubsumed, true,
true, true, true);
-		
+		CheckpointProperties props = new CheckpointProperties(false, CheckpointType.CHECKPOINT,
discardSubsumed, true, true, true, true);
+
 		CompletedCheckpoint checkpoint = new CompletedCheckpoint(
 				new JobID(), 0, 0, 1,
 				operatorStates,
@@ -124,7 +124,7 @@ public class CompletedCheckpointTest {
 			StreamStateHandle metadataHandle = mock(StreamStateHandle.class);
 
 			// Keep
-			CheckpointProperties props = new CheckpointProperties(false, true, false, false, false,
false, false);
+			CheckpointProperties props = new CheckpointProperties(false, CheckpointType.SAVEPOINT,
false, false, false, false, false);
 			CompletedCheckpoint checkpoint = new CompletedCheckpoint(
 					new JobID(), 0, 0, 1,
 					new HashMap<>(operatorStates),
@@ -141,7 +141,7 @@ public class CompletedCheckpointTest {
 			verify(metadataHandle, times(0)).discardState();
 
 			// Discard
-			props = new CheckpointProperties(false, false, true, true, true, true, true);
+			props = new CheckpointProperties(false, CheckpointType.CHECKPOINT, true, true, true, true,
true);
 			checkpoint = new CompletedCheckpoint(
 					new JobID(), 0, 0, 1,
 					new HashMap<>(operatorStates),

http://git-wip-us.apache.org/repos/asf/flink/blob/35c7d93e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 7b6992b..284a4b1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -86,7 +86,7 @@ public class PendingCheckpointTest {
 	@Test
 	public void testCanBeSubsumed() throws Exception {
 		// Forced checkpoints cannot be subsumed
-		CheckpointProperties forced = new CheckpointProperties(true, true, false, false, false,
false, false);
+		CheckpointProperties forced = new CheckpointProperties(true, CheckpointType.SAVEPOINT,
false, false, false, false, false);
 		PendingCheckpoint pending = createPendingCheckpoint(forced);
 		assertFalse(pending.canBeSubsumed());
 
@@ -98,7 +98,7 @@ public class PendingCheckpointTest {
 		}
 
 		// Non-forced checkpoints can be subsumed
-		CheckpointProperties subsumed = new CheckpointProperties(false, true, false, false, false,
false, false);
+		CheckpointProperties subsumed = new CheckpointProperties(false, CheckpointType.SAVEPOINT,
false, false, false, false, false);
 		pending = createPendingCheckpoint(subsumed);
 		assertTrue(pending.canBeSubsumed());
 	}
@@ -109,7 +109,7 @@ public class PendingCheckpointTest {
 	 */
 	@Test
 	public void testCompletionFuture() throws Exception {
-		CheckpointProperties props = new CheckpointProperties(false, true, false, false, false,
false, false);
+		CheckpointProperties props = new CheckpointProperties(false, CheckpointType.SAVEPOINT,
false, false, false, false, false);
 
 		// Abort declined
 		PendingCheckpoint pending = createPendingCheckpoint(props);
@@ -164,7 +164,7 @@ public class PendingCheckpointTest {
 	@Test
 	@SuppressWarnings("unchecked")
 	public void testAbortDiscardsState() throws Exception {
-		CheckpointProperties props = new CheckpointProperties(false, true, false, false, false,
false, false);
+		CheckpointProperties props = new CheckpointProperties(false, CheckpointType.SAVEPOINT,
false, false, false, false, false);
 		QueueExecutor executor = new QueueExecutor();
 
 		OperatorState state = mock(OperatorState.class);
@@ -307,7 +307,7 @@ public class PendingCheckpointTest {
 
 	@Test
 	public void testSetCanceller() throws Exception {
-		final CheckpointProperties props = new CheckpointProperties(false, false, true, true, true,
true, true);
+		final CheckpointProperties props = new CheckpointProperties(false, CheckpointType.CHECKPOINT,
true, true, true, true, true);
 
 		PendingCheckpoint aborted = createPendingCheckpoint(props);
 		aborted.abortDeclined();

http://git-wip-us.apache.org/repos/asf/flink/blob/35c7d93e/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
index 85b1516..97b6a91 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/RestoredCheckpointStatsTest.java
@@ -31,7 +31,7 @@ public class RestoredCheckpointStatsTest {
 	public void testSimpleAccess() throws Exception {
 		long checkpointId = Integer.MAX_VALUE + 1L;
 		long triggerTimestamp = Integer.MAX_VALUE + 1L;
-		CheckpointProperties props = new CheckpointProperties(true, true, false, false, true, false,
true);
+		CheckpointProperties props = new CheckpointProperties(true, CheckpointType.SAVEPOINT, false,
false, true, false, true);
 		long restoreTimestamp = Integer.MAX_VALUE + 1L;
 		String externalPath = "external-path";
 


Mime
View raw message