flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject flink git commit: [FLINK-3107] [runtime] Defer start of checkpoint ID counter
Date Fri, 12 Feb 2016 22:17:49 GMT
Repository: flink
Updated Branches:
  refs/heads/master 8e55bbd41 -> 937963e33


[FLINK-3107] [runtime] Defer start of checkpoint ID counter


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/937963e3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/937963e3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/937963e3

Branch: refs/heads/master
Commit: 937963e339ddd57cc65e3da8af5398600e4d9ad2
Parents: 8e55bbd
Author: Ufuk Celebi <uce@apache.org>
Authored: Fri Feb 12 22:30:13 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Fri Feb 12 23:17:04 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/checkpoint/SavepointCoordinator.java |  1 +
 .../runtime/checkpoint/SavepointCoordinatorTest.java   | 13 ++++++++++++-
 2 files changed, 13 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/937963e3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
index 6ce6502..ea4b8ae 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SavepointCoordinator.java
@@ -230,6 +230,7 @@ public class SavepointCoordinator extends CheckpointCoordinator {
 
 			// Reset the checkpoint ID counter
 			long nextCheckpointId = checkpoint.getCheckpointID();
+			checkpointIdCounter.start();
 			checkpointIdCounter.setCount(nextCheckpointId + 1);
 			LOG.info("Reset the checkpoint ID to {}", nextCheckpointId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/937963e3/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
index 4f9ae60..6bbdf62 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/SavepointCoordinatorTest.java
@@ -175,6 +175,7 @@ public class SavepointCoordinatorTest {
 			}
 		}
 
+		MockCheckpointIdCounter idCounter = new MockCheckpointIdCounter();
 		StateStore<Savepoint> savepointStore = new HeapStateStore<>();
 
 		SavepointCoordinator coordinator = createSavepointCoordinator(
@@ -184,7 +185,7 @@ public class SavepointCoordinatorTest {
 				triggerVertices,
 				ackVertices,
 				new ExecutionVertex[] {},
-				new MockCheckpointIdCounter(),
+				idCounter,
 				savepointStore);
 
 		Future<String> savepointPathFuture = coordinator.triggerSavepoint(1231273123);
@@ -213,6 +214,9 @@ public class SavepointCoordinatorTest {
 		// Verify all promises removed
 		assertEquals(0, getSavepointPromises(coordinator).size());
 
+		// Verify checkpoint ID counter started
+		assertTrue(idCounter.isStarted());
+
 		coordinator.shutdown();
 	}
 
@@ -1083,15 +1087,18 @@ public class SavepointCoordinatorTest {
 
 	private static class MockCheckpointIdCounter implements CheckpointIDCounter {
 
+		private boolean started;
 		private long count;
 		private long lastReturnedCount;
 
 		@Override
 		public void start() throws Exception {
+			started = true;
 		}
 
 		@Override
 		public void stop() throws Exception {
+			started = false;
 		}
 
 		@Override
@@ -1108,5 +1115,9 @@ public class SavepointCoordinatorTest {
 		long getLastReturnedCount() {
 			return lastReturnedCount;
 		}
+
+		public boolean isStarted() {
+			return started;
+		}
 	}
 }


Mime
View raw message