flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fhue...@apache.org
Subject [24/50] [abbrv] flink git commit: [FLINK-3107] [runtime] Start checkpoint ID counter with periodic scheduler
Date Fri, 12 Feb 2016 11:29:49 GMT
[FLINK-3107] [runtime] Start checkpoint ID counter with periodic scheduler

Problem: The job manager enables checkpoints during submission of streaming
programs. This can lead to a call to `ZooKeeperCheckpointIDCounter.start()`,
which communicates with ZooKeeper. This can block the job manager actor.

Solution: Start the counter in the `CheckpointCoordinatorDeActivator`.

This closes #1610.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8df0bbac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8df0bbac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8df0bbac

Branch: refs/heads/tableOnCalcite
Commit: 8df0bbacb8712342471c12cbf765a0a92b70abc9
Parents: 6968a57
Author: Ufuk Celebi <uce@apache.org>
Authored: Tue Feb 9 16:06:46 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Feb 10 19:51:59 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/checkpoint/CheckpointCoordinator.java | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8df0bbac/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 9963a20..b0e23d6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -197,9 +197,9 @@ public class CheckpointCoordinator {
 		this.completedCheckpointStore = checkNotNull(completedCheckpointStore);
 		this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
 		this.userClassLoader = userClassLoader;
-		this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
 
-		checkpointIDCounter.start();
+		// Started with the periodic scheduler
+		this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
 
 		this.timer = new Timer("Checkpoint Timer", true);
 
@@ -862,6 +862,14 @@ public class CheckpointCoordinator {
 			// make sure all prior timers are cancelled
 			stopCheckpointScheduler();
 
+			try {
+				// Multiple start calls are OK
+				checkpointIdCounter.start();
+			} catch (Exception e) {
+				String msg = "Failed to start checkpoint ID counter: " + e.getMessage();
+				throw new RuntimeException(msg, e);
+			}
+
 			periodicScheduling = true;
 			currentPeriodicTrigger = new ScheduledTrigger();
 			timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);


Mime
View raw message