flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/2] flink git commit: [FLINK-3051] [streaming] Add mechanisms to control the maximum number of concurrent checkpoints
Date Thu, 26 Nov 2015 17:48:46 GMT
[FLINK-3051] [streaming] Add mechanisms to control the maximum number of concurrent checkpoints

This closes #1408


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55fd5f32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55fd5f32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55fd5f32

Branch: refs/heads/master
Commit: 55fd5f32d7ef0292a01192ab08456fae49b91791
Parents: 4097666
Author: Stephan Ewen <sewen@apache.org>
Authored: Thu Nov 19 19:05:47 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Thu Nov 26 17:16:29 2015 +0100

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinator.java       | 402 ++++++++++++-------
 .../CheckpointCoordinatorDeActivator.java       |  11 +-
 .../runtime/checkpoint/PendingCheckpoint.java   |  10 +-
 .../runtime/executiongraph/ExecutionGraph.java  |  22 +-
 .../jobgraph/tasks/JobSnapshottingSettings.java |  55 ++-
 .../flink/runtime/jobmanager/JobManager.scala   |   2 +
 .../checkpoint/CheckpointCoordinatorTest.java   | 357 +++++++++++++++-
 .../checkpoint/CheckpointStateRestoreTest.java  |   6 +-
 .../checkpoint/CoordinatorShutdownTest.java     |   6 +-
 .../api/environment/CheckpointConfig.java       | 221 ++++++++++
 .../environment/StreamExecutionEnvironment.java |  61 +--
 .../flink/streaming/api/graph/StreamGraph.java  | 105 ++---
 .../api/graph/StreamGraphGenerator.java         |  11 -
 .../api/graph/StreamingJobGraphGenerator.java   |  36 +-
 .../api/scala/StreamExecutionEnvironment.scala  |   7 +
 15 files changed, 990 insertions(+), 322 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 09dd2d9..454b88a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import akka.actor.ActorSystem;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -34,6 +35,7 @@ import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,7 +48,6 @@ import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -63,11 +64,12 @@ import static com.google.common.base.Preconditions.checkNotNull;
  */
 public class CheckpointCoordinator {
 	
-	private static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
+	static final Logger LOG = LoggerFactory.getLogger(CheckpointCoordinator.class);
 	
 	/** The number of recent checkpoints whose IDs are remembered */
 	private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
 	
+	
 	/** Coordinator-wide lock to safeguard the checkpoint updates */
 	private final Object lock = new Object();
 	
@@ -83,35 +85,58 @@ public class CheckpointCoordinator {
 	/** Tasks who need to be sent a message when a checkpoint is confirmed */
 	private final ExecutionVertex[] tasksToCommitTo;
 
+	/** Map from checkpoint ID to the pending checkpoint */
 	private final Map<Long, PendingCheckpoint> pendingCheckpoints;
 
-	/**
-	 * Completed checkpoints. Implementations can be blocking. Make sure calls to methods
-	 * accessing this don't block the job manager actor and run asynchronously.
-	 */
+	/** Completed checkpoints. Implementations can be blocking. Make sure calls to methods
+	 * accessing this don't block the job manager actor and run asynchronously. */
 	private final CompletedCheckpointStore completedCheckpointStore;
 	
+	/** A list of recent checkpoint IDs, to identify late messages (vs invalid ones) */
 	private final ArrayDeque<Long> recentPendingCheckpoints;
 
-	/**
-	 * Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these
-	 * need to be ascending across job managers.
-	 */
+	/** Checkpoint ID counter to ensure ascending IDs. In case of job manager failures, these
+	 * need to be ascending across job managers. */
 	private final CheckpointIDCounter checkpointIdCounter;
 
-	private final AtomicInteger numUnsuccessfulCheckpointsTriggers = new AtomicInteger();
+	/** Class loader used to deserialize the state handles (as they may be user-defined) */
+	private final ClassLoader userClassLoader;
 
-	/** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */
-	private final Timer timer;
+	/** The base checkpoint interval. Actual trigger time may be affected by the
+	 * max concurrent checkpoints and minimum-pause values */
+	private final long baseInterval;
 	
+	/** The max time (in ms) that a checkpoint may take */
 	private final long checkpointTimeout;
+
+	/** The min time (in ms) to delay after a checkpoint could be triggered. Allows
+	 * enforcing a minimum processing time between checkpoint attempts */
+	private final long minPauseBetweenCheckpoints;
+	
+	/** The maximum number of checkpoints that may be in progress at the same time */
+	private final int maxConcurrentCheckpointAttempts;
 	
-	private TimerTask periodicScheduler;
+	/** The timer that handles the checkpoint timeouts and triggers periodic checkpoints */
+	private final Timer timer;
 	
+	/** Actor that receives status updates from the execution graph this coordinator works for */ 
 	private ActorGateway jobStatusListener;
+
+	/** The number of consecutive failed trigger attempts */ 
+	private int numUnsuccessfulCheckpointsTriggers;
+	
+	
+	private ScheduledTrigger currentPeriodicTrigger;
+
+	/** Flag whether a triggered checkpoint should immediately schedule the next checkpoint.
+	 * Non-volatile, because only accessed in synchronized scope */
+	private boolean periodicScheduling;
 	
-	private ClassLoader userClassLoader;
+	/** Flag whether a trigger request could not be handled immediately. Non-volatile, because only
+	 * accessed in synchronized scope */
+	private boolean triggerRequestQueued;
 	
+	/** Flag marking the coordinator as shut down (not accepting any messages any more) */
 	private volatile boolean shutdown;
 
 	/** Shutdown hook thread to clean up state handles. */
@@ -121,6 +146,7 @@ public class CheckpointCoordinator {
 
 	public CheckpointCoordinator(
 			JobID job,
+			long baseInterval,
 			long checkpointTimeout,
 			ExecutionVertex[] tasksToTrigger,
 			ExecutionVertex[] tasksToWaitFor,
@@ -130,11 +156,36 @@ public class CheckpointCoordinator {
 			CompletedCheckpointStore completedCheckpointStore,
 			RecoveryMode recoveryMode) throws Exception {
 		
+		this(job, baseInterval, checkpointTimeout, 0L, Integer.MAX_VALUE,
+				tasksToTrigger, tasksToWaitFor, tasksToCommitTo,
+				userClassLoader, checkpointIDCounter, completedCheckpointStore, recoveryMode);
+	}
+	
+	public CheckpointCoordinator(
+			JobID job,
+			long baseInterval,
+			long checkpointTimeout,
+			long minPauseBetweenCheckpoints,
+			int maxConcurrentCheckpointAttempts,
+			ExecutionVertex[] tasksToTrigger,
+			ExecutionVertex[] tasksToWaitFor,
+			ExecutionVertex[] tasksToCommitTo,
+			ClassLoader userClassLoader,
+			CheckpointIDCounter checkpointIDCounter,
+			CompletedCheckpointStore completedCheckpointStore,
+			RecoveryMode recoveryMode) throws Exception {
+		
 		// Sanity check
+		checkArgument(baseInterval > 0, "Checkpoint timeout must be larger than zero");
 		checkArgument(checkpointTimeout >= 1, "Checkpoint timeout must be larger than zero");
+		checkArgument(minPauseBetweenCheckpoints >= 0, "minPauseBetweenCheckpoints must be >= 0");
+		checkArgument(maxConcurrentCheckpointAttempts >= 1, "maxConcurrentCheckpointAttempts must be >= 1");
 		
 		this.job = checkNotNull(job);
+		this.baseInterval = baseInterval;
 		this.checkpointTimeout = checkpointTimeout;
+		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+		this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
 		this.tasksToTrigger = checkNotNull(tasksToTrigger);
 		this.tasksToWaitFor = checkNotNull(tasksToWaitFor);
 		this.tasksToCommitTo = checkNotNull(tasksToCommitTo);
@@ -143,10 +194,11 @@ public class CheckpointCoordinator {
 		this.recentPendingCheckpoints = new ArrayDeque<Long>(NUM_GHOST_CHECKPOINT_IDS);
 		this.userClassLoader = userClassLoader;
 		this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
+		
 		checkpointIDCounter.start();
 
 		this.timer = new Timer("Checkpoint Timer", true);
-
+		
 		if (recoveryMode == RecoveryMode.STANDALONE) {
 			// Add shutdown hook to clean up state handles when no checkpoint recovery is
 			// possible. In case of another configured recovery mode, the checkpoints need to be
@@ -158,7 +210,7 @@ public class CheckpointCoordinator {
 						CheckpointCoordinator.this.shutdown();
 					}
 					catch (Throwable t) {
-						LOG.error("Error during shutdown of checkpoint coordniator via " +
+						LOG.error("Error during shutdown of checkpoint coordinator via " +
 								"JVM shutdown hook: " + t.getMessage(), t);
 					}
 				}
@@ -197,7 +249,10 @@ public class CheckpointCoordinator {
 					shutdown = true;
 					LOG.info("Stopping checkpoint coordinator for job " + job);
 
-					// shut down the thread that handles the timeouts
+					periodicScheduling = false;
+					triggerRequestQueued = false;
+					
+					// shut down the thread that handles the timeouts and pending triggers
 					timer.cancel();
 
 					// make sure that the actor does not linger
@@ -206,17 +261,11 @@ public class CheckpointCoordinator {
 						jobStatusListener = null;
 					}
 
-					// the scheduling thread needs also to go away
-					if (periodicScheduler != null) {
-						periodicScheduler.cancel();
-						periodicScheduler = null;
-					}
-
 					checkpointIdCounter.stop();
 
 					// clear and discard all pending checkpoints
 					for (PendingCheckpoint pending : pendingCheckpoints.values()) {
-							pending.discard(userClassLoader, true);
+						pending.discard(userClassLoader);
 					}
 					pendingCheckpoints.clear();
 
@@ -235,7 +284,7 @@ public class CheckpointCoordinator {
 						// race, JVM is in shutdown already, we can safely ignore this
 					}
 					catch (Throwable t) {
-						LOG.warn("Error unregistering checkpoint cooordniator shutdown hook.", t);
+						LOG.warn("Error unregistering checkpoint coordinator shutdown hook.", t);
 					}
 				}
 			}
@@ -251,93 +300,136 @@ public class CheckpointCoordinator {
 	// --------------------------------------------------------------------------------------------
 	
 	/**
-	 * Triggers a new checkpoint and uses the current system time as the
-	 * checkpoint time.
-	 */
-	public void triggerCheckpoint() throws Exception {
-		triggerCheckpoint(System.currentTimeMillis());
-	}
-
-	/**
 	 * Triggers a new checkpoint and uses the given timestamp as the checkpoint
 	 * timestamp.
 	 * 
 	 * @param timestamp The timestamp for the checkpoint.
 	 */
-	public boolean triggerCheckpoint(final long timestamp) throws Exception {
-		if (shutdown) {
-			LOG.error("Cannot trigger checkpoint, checkpoint coordinator has been shutdown.");
-			return false;
-		}
-		
-		final long checkpointID = checkpointIdCounter.getAndIncrement();
-		LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
-		
-		try {
-			// first check if all tasks that we need to trigger are running.
-			// if not, abort the checkpoint
-			ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length];
-			for (int i = 0; i < tasksToTrigger.length; i++) {
-				Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
-				if (ee != null && ee.getState() == ExecutionState.RUNNING) {
-					triggerIDs[i] = ee.getAttemptId();
-				} else {
-					LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
-							tasksToTrigger[i].getSimpleName());
-					return false;
-				}
+	public boolean triggerCheckpoint(long timestamp) throws Exception {
+		// make some eager pre-checks
+		synchronized (lock) {
+			// abort if the coordinator has been shut down in the meantime
+			if (shutdown) {
+				return false;
+			}
+			
+			// sanity check: there should never be more than one trigger request queued
+			if (triggerRequestQueued) {
+				LOG.warn("Trying to trigger another checkpoint while one was queued already");
+				return false;
 			}
 
-			// next, check if all tasks that need to acknowledge the checkpoint are running.
-			// if not, abort the checkpoint
-			Map<ExecutionAttemptID, ExecutionVertex> ackTasks =
-								new HashMap<ExecutionAttemptID, ExecutionVertex>(tasksToWaitFor.length);
-
-			for (ExecutionVertex ev : tasksToWaitFor) {
-				Execution ee = ev.getCurrentExecutionAttempt();
-				if (ee != null) {
-					ackTasks.put(ee.getAttemptId(), ev);
-				} else {
-					LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
-							ev.getSimpleName());
-					return false;
+			// if too many checkpoints are currently in progress, we need to mark that a request is queued
+			if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
+				triggerRequestQueued = true;
+				if (currentPeriodicTrigger != null) {
+					currentPeriodicTrigger.cancel();
+					currentPeriodicTrigger = null;
 				}
+				return false;
 			}
-			
-			// register a new pending checkpoint. this makes sure we can properly receive acknowledgements
-			final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
+		}
 
-			// schedule the timer that will clean up the expired checkpoints
-			TimerTask canceller = new TimerTask() {
-				@Override
-				public void run() {
-					try {
-						synchronized (lock) {
-							// only do the work if the checkpoint is not discarded anyways
-							// note that checkpoint completion discards the pending checkpoint object
-							if (!checkpoint.isDiscarded()) {
-								LOG.info("Checkpoint " + checkpointID + " expired before completing.");
-								
-								checkpoint.discard(userClassLoader, true);
-								
-								pendingCheckpoints.remove(checkpointID);
-								rememberRecentCheckpointId(checkpointID);
-							}
+		// first check if all tasks that we need to trigger are running.
+		// if not, abort the checkpoint
+		ExecutionAttemptID[] triggerIDs = new ExecutionAttemptID[tasksToTrigger.length];
+		for (int i = 0; i < tasksToTrigger.length; i++) {
+			Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();
+			if (ee != null && ee.getState() == ExecutionState.RUNNING) {
+				triggerIDs[i] = ee.getAttemptId();
+			} else {
+				LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
+						tasksToTrigger[i].getSimpleName());
+				return false;
+			}
+		}
+	
+		// next, check if all tasks that need to acknowledge the checkpoint are running.
+		// if not, abort the checkpoint
+		Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new HashMap<>(tasksToWaitFor.length);
+
+		for (ExecutionVertex ev : tasksToWaitFor) {
+			Execution ee = ev.getCurrentExecutionAttempt();
+			if (ee != null) {
+				ackTasks.put(ee.getAttemptId(), ev);
+			} else {
+				LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.",
+						ev.getSimpleName());
+				return false;
+			}
+		}
+
+		// we will actually trigger this checkpoint!
+		
+		final long checkpointID;
+		try {
+			// this must happen outside the locked scope, because it communicates
+			// with external services (in HA mode) and may block for a while.
+			checkpointID = checkpointIdCounter.getAndIncrement();
+		}
+		catch (Throwable t) {
+			int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
+			LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+			return false;
+		}
+
+		LOG.info("Triggering checkpoint " + checkpointID + " @ " + timestamp);
+		
+		final PendingCheckpoint checkpoint = new PendingCheckpoint(job, checkpointID, timestamp, ackTasks);
+
+		// schedule the timer that will clean up the expired checkpoints
+		TimerTask canceller = new TimerTask() {
+			@Override
+			public void run() {
+				try {
+					synchronized (lock) {
+						// only do the work if the checkpoint is not discarded anyways
+						// note that checkpoint completion discards the pending checkpoint object
+						if (!checkpoint.isDiscarded()) {
+							LOG.info("Checkpoint " + checkpointID + " expired before completing.");
+
+							checkpoint.discard(userClassLoader);
+							pendingCheckpoints.remove(checkpointID);
+							rememberRecentCheckpointId(checkpointID);
+
+							triggerQueuedRequests();
 						}
 					}
-					catch (Throwable t) {
-						LOG.error("Exception while handling checkpoint timeout", t);
-					}
 				}
-			};
-			
+				catch (Throwable t) {
+					LOG.error("Exception while handling checkpoint timeout", t);
+				}
+			}
+		};
+		
+		try {
+			// re-acquire the lock
 			synchronized (lock) {
+				// since we released the lock in the meantime, we need to re-check
+				// that the conditions still hold. this is clumsy, but it allows us to
+				// release the lock in the meantime while calls to external services are
+				// blocking progress, and still gives us early checks that skip work
+				// if no checkpoint can happen anyways
 				if (shutdown) {
-					throw new IllegalStateException("Checkpoint coordinator has been shutdown.");
+					return false;
+				}
+				else if (triggerRequestQueued) {
+					LOG.warn("Trying to trigger another checkpoint while one was queued already");
+					return false;
+				}
+				else if (pendingCheckpoints.size() >= maxConcurrentCheckpointAttempts) {
+					triggerRequestQueued = true;
+					if (currentPeriodicTrigger != null) {
+						currentPeriodicTrigger.cancel();
+						currentPeriodicTrigger = null;
+					}
+					return false;
 				}
+				
 				pendingCheckpoints.put(checkpointID, checkpoint);
 				timer.schedule(canceller, checkpointTimeout);
 			}
+			// end of lock scope
 
 			// send the messages to the tasks that trigger their checkpoint
 			for (int i = 0; i < tasksToTrigger.length; i++) {
@@ -345,21 +437,21 @@ public class CheckpointCoordinator {
 				TriggerCheckpoint message = new TriggerCheckpoint(job, id, checkpointID, timestamp);
 				tasksToTrigger[i].sendMessageToCurrentExecution(message, id);
 			}
-			
-			numUnsuccessfulCheckpointsTriggers.set(0);
+
+			numUnsuccessfulCheckpointsTriggers = 0;
 			return true;
 		}
 		catch (Throwable t) {
-			int numUnsuccessful = numUnsuccessfulCheckpointsTriggers.incrementAndGet();
-			LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
-			
+			// guard the map against concurrent modifications
 			synchronized (lock) {
-				PendingCheckpoint checkpoint = pendingCheckpoints.remove(checkpointID);
-				if (checkpoint != null && !checkpoint.isDiscarded()) {
-					checkpoint.discard(userClassLoader, true);
-				}
+				pendingCheckpoints.remove(checkpointID);
 			}
 			
+			int numUnsuccessful = ++numUnsuccessfulCheckpointsTriggers;
+			LOG.warn("Failed to trigger checkpoint (" + numUnsuccessful + " consecutive failed attempts so far)", t);
+			if (!checkpoint.isDiscarded()) {
+				checkpoint.discard(userClassLoader);
+			}
 			return false;
 		}
 	}
@@ -401,6 +493,8 @@ public class CheckpointCoordinator {
 						rememberRecentCheckpointId(checkpointId);
 						
 						dropSubsumedCheckpoints(completed.getTimestamp());
+
+						triggerQueuedRequests();
 					}
 				}
 				else {
@@ -455,13 +549,38 @@ public class CheckpointCoordinator {
 			if (p.getCheckpointTimestamp() < timestamp) {
 				rememberRecentCheckpointId(p.getCheckpointId());
 
-				p.discard(userClassLoader, true);
+				p.discard(userClassLoader);
 
 				entries.remove();
 			}
 		}
 	}
 
+	/**
+	 * Triggers the queued request, if there is one.
+	 * 
+	 * <p>NOTE: The caller of this method must hold the lock when invoking the method!
+	 */
+	private void triggerQueuedRequests() throws Exception {
+		if (triggerRequestQueued) {
+			triggerRequestQueued = false;
+
+			// trigger the checkpoint from the trigger timer, to finish the work of this thread before
+			// starting with the next checkpoint
+			ScheduledTrigger trigger = new ScheduledTrigger();
+			if (periodicScheduling) {
+				if (currentPeriodicTrigger != null) {
+					currentPeriodicTrigger.cancel();
+				}
+				currentPeriodicTrigger = trigger; 
+				timer.scheduleAtFixedRate(trigger, 0L, baseInterval);
+			}
+			else {
+				timer.schedule(trigger, 0L);
+			}
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Checkpoint State Restoring
 	// --------------------------------------------------------------------------------------------
@@ -557,63 +676,74 @@ public class CheckpointCoordinator {
 	//  Periodic scheduling of checkpoints
 	// --------------------------------------------------------------------------------------------
 	
-	public void startPeriodicCheckpointScheduler(long interval) {
+	public void startCheckpointScheduler() {
 		synchronized (lock) {
 			if (shutdown) {
 				throw new IllegalArgumentException("Checkpoint coordinator is shut down");
 			}
 			
-			// cancel any previous scheduler
-			stopPeriodicCheckpointScheduler();
+			// make sure all prior timers are cancelled 
+			stopCheckpointScheduler();
 			
-			// start a new scheduler
-			periodicScheduler = new TimerTask() {
-				@Override
-				public void run() {
-					try {
-						triggerCheckpoint();
-					}
-					catch (Exception e) {
-						LOG.error("Exception while triggering checkpoint", e);
-					}
-				}
-			};
-			timer.scheduleAtFixedRate(periodicScheduler, interval, interval);
+			periodicScheduling = true;
+			currentPeriodicTrigger = new ScheduledTrigger();
+			timer.scheduleAtFixedRate(currentPeriodicTrigger, baseInterval, baseInterval);
 		}
 	}
 	
-	public void stopPeriodicCheckpointScheduler() {
+	public void stopCheckpointScheduler() {
 		synchronized (lock) {
-			if (periodicScheduler != null) {
-				periodicScheduler.cancel();
-				periodicScheduler = null;
+			triggerRequestQueued = false;
+			periodicScheduling = false;
+			
+			if (currentPeriodicTrigger != null) {
+				currentPeriodicTrigger.cancel();
+				currentPeriodicTrigger = null;
+			}
+			
+			for (PendingCheckpoint p : pendingCheckpoints.values()) {
+				p.discard(userClassLoader);
 			}
+			pendingCheckpoints.clear();
+			
+			numUnsuccessfulCheckpointsTriggers = 0;
 		}
 	}
 	
-	public ActorGateway createJobStatusListener(
-			ActorSystem actorSystem,
-			long checkpointInterval,
-			UUID leaderSessionID) {
+	// ------------------------------------------------------------------------
+	//  job status listener that schedules / cancels periodic checkpoints 
+	// ------------------------------------------------------------------------
+	
+	public ActorGateway createActivatorDeactivator(ActorSystem actorSystem, UUID leaderSessionID) {
+		
 		synchronized (lock) {
 			if (shutdown) {
 				throw new IllegalArgumentException("Checkpoint coordinator is shut down");
 			}
 
 			if (jobStatusListener == null) {
-				Props props = Props.create(
-						CheckpointCoordinatorDeActivator.class,
-						this,
-						checkpointInterval,
-						leaderSessionID);
+				Props props = Props.create(CheckpointCoordinatorDeActivator.class, this, leaderSessionID);
 
 				// wrap the ActorRef in a AkkaActorGateway to support message decoration
-				jobStatusListener = new AkkaActorGateway(
-						actorSystem.actorOf(props),
-						leaderSessionID);
+				jobStatusListener = new AkkaActorGateway(actorSystem.actorOf(props), leaderSessionID);
 			}
 
 			return jobStatusListener;
 		}
 	}
+	
+	// ------------------------------------------------------------------------
+	
+	private class ScheduledTrigger extends TimerTask {
+		
+		@Override
+		public void run() {
+			try {
+				triggerCheckpoint(System.currentTimeMillis());
+			}
+			catch (Exception e) {
+				LOG.error("Exception while triggering checkpoint", e);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
index 7e32b72..8bdab7f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorDeActivator.java
@@ -32,19 +32,15 @@ import java.util.UUID;
 public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {
 
 	private final CheckpointCoordinator coordinator;
-	private final long interval;
 	private final UUID leaderSessionID;
 	
 	public CheckpointCoordinatorDeActivator(
 			CheckpointCoordinator coordinator,
-			long interval,
 			UUID leaderSessionID) {
 
 		LOG.info("Create CheckpointCoordinatorDeActivator");
 
 		this.coordinator = Preconditions.checkNotNull(coordinator, "The checkpointCoordinator must not be null.");
-
-		this.interval = interval;
 		this.leaderSessionID = leaderSessionID;
 	}
 
@@ -55,11 +51,10 @@ public class CheckpointCoordinatorDeActivator extends FlinkUntypedActor {
 			
 			if (status == JobStatus.RUNNING) {
 				// start the checkpoint scheduler
-				coordinator.startPeriodicCheckpointScheduler(interval);
-			}
-			else {
+				coordinator.startCheckpointScheduler();
+			} else {
 				// anything else should stop the trigger for now
-				coordinator.stopPeriodicCheckpointScheduler();
+				coordinator.stopCheckpointScheduler();
 			}
 		}
 		

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 19c65d4..b94e5bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -117,7 +117,7 @@ public class PendingCheckpoint {
 			if (notYetAcknowledgedTasks.isEmpty()) {
 				CompletedCheckpoint completed =  new CompletedCheckpoint(jobId, checkpointId,
 						checkpointTimestamp, new ArrayList<StateForTask>(collectedStates));
-				discard(null, false);
+				dispose(null, false);
 				
 				return completed;
 			}
@@ -150,11 +150,15 @@ public class PendingCheckpoint {
 	/**
 	 * Discards the pending checkpoint, releasing all held resources.
 	 */
-	public void discard(ClassLoader userClassLoader, boolean discardStateHandle) {
+	public void discard(ClassLoader userClassLoader) {
+		dispose(userClassLoader, true);
+	}
+
+	private void dispose(ClassLoader userClassLoader, boolean releaseState) {
 		synchronized (lock) {
 			discarded = true;
 			numAcknowledgedTasks = -1;
-			if (discardStateHandle) {
+			if (releaseState) {
 				for (StateForTask state : collectedStates) {
 					state.discard(userClassLoader);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index d10aac1..9218fe4 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -182,9 +182,6 @@ public class ExecutionGraph implements Serializable {
 	 * from results than need to be materialized. */
 	private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;
 
-	/** Flag that indicate whether the executed dataflow should be periodically snapshotted */
-	private boolean snapshotCheckpointsEnabled;
-
 	/** Flag to indicate whether the Graph has been archived */
 	private boolean isArchived = false;
 		
@@ -341,9 +338,12 @@ public class ExecutionGraph implements Serializable {
 	public boolean isArchived() {
 		return isArchived;
 	}
+	
 	public void enableSnapshotCheckpointing(
 			long interval,
 			long checkpointTimeout,
+			long minPauseBetweenCheckpoints,
+			int maxConcurrentCheckpoints,
 			List<ExecutionJobVertex> verticesToTrigger,
 			List<ExecutionJobVertex> verticesToWaitFor,
 			List<ExecutionJobVertex> verticesToCommitTo,
@@ -368,11 +368,13 @@ public class ExecutionGraph implements Serializable {
 		// disable to make sure existing checkpoint coordinators are cleared
 		disableSnaphotCheckpointing();
 		
-		// create the coordinator that triggers and commits checkpoints and holds the state 
-		snapshotCheckpointsEnabled = true;
+		// create the coordinator that triggers and commits checkpoints and holds the state
 		checkpointCoordinator = new CheckpointCoordinator(
 				jobID,
+				interval,
 				checkpointTimeout,
+				minPauseBetweenCheckpoints,
+				maxConcurrentCheckpoints,
 				tasksToTrigger,
 				tasksToWaitFor,
 				tasksToCommitTo,
@@ -384,10 +386,7 @@ public class ExecutionGraph implements Serializable {
 		// the periodic checkpoint scheduler is activated and deactivated as a result of
 		// job status changes (running -> on, all other states -> off)
 		registerJobStatusListener(
-				checkpointCoordinator.createJobStatusListener(
-						actorSystem,
-						interval,
-						leaderSessionID));
+				checkpointCoordinator.createActivatorDeactivator(actorSystem, leaderSessionID));
 	}
 
 	/**
@@ -401,16 +400,11 @@ public class ExecutionGraph implements Serializable {
 			throw new IllegalStateException("Job must be in CREATED state");
 		}
 		
-		snapshotCheckpointsEnabled = false;
 		if (checkpointCoordinator != null) {
 			checkpointCoordinator.shutdown();
 			checkpointCoordinator = null;
 		}
 	}
-	
-	public boolean isSnapshotCheckpointsEnabled() {
-		return snapshotCheckpointsEnabled;
-	}
 
 	public CheckpointCoordinator getCheckpointCoordinator() {
 		return checkpointCoordinator;

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
index 86c9b60..d58be52 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
@@ -22,17 +22,17 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import java.util.List;
 
+import static java.util.Objects.requireNonNull;
+
 /**
- * The JobSnapshottingSettings are attached to a JobGraph and describe the settings
- * for the asynchronous snapshotting of the JobGraph, such as interval, and which vertices
+ * The JobSnapshottingSettings are attached to a JobGraph and describe the settings
+ * for the asynchronous checkpoints of the JobGraph, such as interval, and which vertices
  * need to participate.
  */
 public class JobSnapshottingSettings implements java.io.Serializable{
 	
 	private static final long serialVersionUID = -2593319571078198180L;
 
-	/** The default time in which pending checkpoints need to be acknowledged before timing out */
-	public static final long DEFAULT_SNAPSHOT_TIMEOUT = 10 * 60 * 1000; // 10 minutes
 	
 	private final List<JobVertexID> verticesToTrigger;
 
@@ -43,26 +43,32 @@ public class JobSnapshottingSettings implements java.io.Serializable{
 	private final long checkpointInterval;
 	
 	private final long checkpointTimeout;
-
-
-	public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger,
-									List<JobVertexID> verticesToAcknowledge,
-									List<JobVertexID> verticesToConfirm,
-									long checkpointInterval)
-	{
-		this(verticesToTrigger, verticesToAcknowledge, verticesToConfirm, checkpointInterval, DEFAULT_SNAPSHOT_TIMEOUT);
-	}
+	
+	private final long minPauseBetweenCheckpoints;
+	
+	private final int maxConcurrentCheckpoints;
+	
 	
 	public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger,
 									List<JobVertexID> verticesToAcknowledge,
 									List<JobVertexID> verticesToConfirm,
-									long checkpointInterval, long checkpointTimeout)
+									long checkpointInterval, long checkpointTimeout,
+									long minPauseBetweenCheckpoints, int maxConcurrentCheckpoints)
 	{
-		this.verticesToTrigger = verticesToTrigger;
-		this.verticesToAcknowledge = verticesToAcknowledge;
-		this.verticesToConfirm = verticesToConfirm;
+		// sanity checks
+		if (checkpointInterval < 1 || checkpointTimeout < 1 ||
+				minPauseBetweenCheckpoints < 0 || maxConcurrentCheckpoints < 1)
+		{
+			throw new IllegalArgumentException();
+		}
+		
+		this.verticesToTrigger = requireNonNull(verticesToTrigger);
+		this.verticesToAcknowledge = requireNonNull(verticesToAcknowledge);
+		this.verticesToConfirm = requireNonNull(verticesToConfirm);
 		this.checkpointInterval = checkpointInterval;
 		this.checkpointTimeout = checkpointTimeout;
+		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+		this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
 	}
 	
 	// --------------------------------------------------------------------------------------------
@@ -87,11 +93,22 @@ public class JobSnapshottingSettings implements java.io.Serializable{
 		return checkpointTimeout;
 	}
 
+	public long getMinPauseBetweenCheckpoints() {
+		return minPauseBetweenCheckpoints;
+	}
+
+	public int getMaxConcurrentCheckpoints() {
+		return maxConcurrentCheckpoints;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	
 	@Override
 	public String toString() {
-		return String.format("SnapshotSettings: interval=%d, timeout=%d, trigger=%s, ack=%s, commit=%s",
-				checkpointInterval, checkpointTimeout, verticesToTrigger, verticesToAcknowledge, verticesToConfirm);
+		return String.format("SnapshotSettings: interval=%d, timeout=%d, pause-between=%d, " +
+						"maxConcurrent=%d, trigger=%s, ack=%s, commit=%s",
+						checkpointInterval, checkpointTimeout,
+						minPauseBetweenCheckpoints, maxConcurrentCheckpoints,
+						verticesToTrigger, verticesToAcknowledge, verticesToConfirm);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index c4d0fbb..8cbb13a 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -913,6 +913,8 @@ class JobManager(
           executionGraph.enableSnapshotCheckpointing(
             snapshotSettings.getCheckpointInterval,
             snapshotSettings.getCheckpointTimeout,
+            snapshotSettings.getMinPauseBetweenCheckpoints,
+            snapshotSettings.getMaxConcurrentCheckpoints,
             triggerVertices,
             ackVertices,
             confirmVertices,

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index f6ee5c5..cd52fd4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -28,10 +28,17 @@ import org.apache.flink.runtime.jobmanager.RecoveryMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
+
 import org.junit.Test;
 
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.Serializable;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -39,6 +46,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -70,7 +78,7 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid, 600000,
+					jid, 600000, 600000,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] {}, cl,
@@ -116,7 +124,7 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid, 600000,
+					jid, 600000, 600000,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] {}, cl,
@@ -160,7 +168,7 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid, 600000,
+					jid, 600000, 600000,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] {}, cl, new StandaloneCheckpointIDCounter(), new
@@ -199,7 +207,7 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid, 600000,
+					jid, 600000, 600000,
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 },
 					new ExecutionVertex[] { vertex1, vertex2 }, cl,
@@ -343,7 +351,7 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid, 600000,
+					jid, 600000, 600000,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
 					new ExecutionVertex[] { commitVertex }, cl,
@@ -472,7 +480,7 @@ public class CheckpointCoordinatorTest {
 
 			// set up the coordinator and validate the initial state
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid, 600000,
+					jid, 600000, 600000,
 					new ExecutionVertex[] { triggerVertex1, triggerVertex2 },
 					new ExecutionVertex[] { ackVertex1, ackVertex2, ackVertex3 },
 					new ExecutionVertex[] { commitVertex }, cl,
@@ -587,7 +595,7 @@ public class CheckpointCoordinatorTest {
 			// the timeout for the checkpoint is a 200 milliseconds
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid, 200,
+					jid, 600000, 200,
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] { commitVertex }, cl,
@@ -649,7 +657,7 @@ public class CheckpointCoordinatorTest {
 			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
 
 			CheckpointCoordinator coord = new CheckpointCoordinator(
-					jid, 200000,
+					jid, 200000, 200000,
 					new ExecutionVertex[] { triggerVertex },
 					new ExecutionVertex[] { ackVertex1, ackVertex2 },
 					new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
@@ -680,14 +688,343 @@ public class CheckpointCoordinatorTest {
 		}
 	}
 
+	@Test
+	public void testPeriodicTriggering() {
+		try {
+			final JobID jid = new JobID();
+			final long start = System.currentTimeMillis();
+
+			// create some mock execution vertices and trigger some checkpoint
+
+			final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
+			final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
+			final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+			ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
+			ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID);
+			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+			final AtomicInteger numCalls = new AtomicInteger();
+			
+			doAnswer(new Answer<Void>() {
+				
+				private long lastId = -1;
+				private long lastTs = -1;
+				
+				@Override
+				public Void answer(InvocationOnMock invocation) throws Throwable {
+					TriggerCheckpoint message = (TriggerCheckpoint) invocation.getArguments()[0];
+					long id = message.getCheckpointId();
+					long ts = message.getTimestamp();
+					
+					assertTrue(id > lastId);
+					assertTrue(ts >= lastTs);
+					assertTrue(ts >= start);
+					
+					lastId = id;
+					lastTs = ts;
+					numCalls.incrementAndGet();
+					return null;
+				}
+			}).when(triggerVertex).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class));
+			
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+					jid,
+					10,		// periodic interval is 10 ms
+					200000,	// timeout is very long (200 s)
+					new ExecutionVertex[] { triggerVertex },
+					new ExecutionVertex[] { ackVertex },
+					new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
+					(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
+
+			
+			coord.startCheckpointScheduler();
+			
+			long timeout = System.currentTimeMillis() + 60000;
+			do {
+				Thread.sleep(20);
+			}
+			while (timeout > System.currentTimeMillis() && numCalls.get() < 5);
+			assertTrue(numCalls.get() >= 5);
+			
+			coord.stopCheckpointScheduler();
+			
+			
+			// for 400 ms, no further calls may come.
+			// there may be the case that one trigger was fired and about to
+			// acquire the lock, such that after cancelling it will still do
+			// the remainder of its work
+			int numCallsSoFar = numCalls.get();
+			Thread.sleep(400);
+			assertTrue(numCallsSoFar == numCalls.get() ||
+					numCallsSoFar+1 == numCalls.get());
+			
+			// start another sequence of periodic scheduling
+			numCalls.set(0);
+			coord.startCheckpointScheduler();
+
+			timeout = System.currentTimeMillis() + 60000;
+			do {
+				Thread.sleep(20);
+			}
+			while (timeout > System.currentTimeMillis() && numCalls.get() < 5);
+			assertTrue(numCalls.get() >= 5);
+			
+			coord.stopCheckpointScheduler();
+
+			// for 400 ms, no further calls may come
+			// there may be the case that one trigger was fired and about to
+			// acquire the lock, such that after cancelling it will still do
+			// the remainder of its work
+			numCallsSoFar = numCalls.get();
+			Thread.sleep(400);
+			assertTrue(numCallsSoFar == numCalls.get() ||
+					numCallsSoFar + 1 == numCalls.get());
+
+			coord.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testMaxConcurrentAttempts1() {
+		testMaxConcurrentAttemps(1);
+	}
+
+	@Test
+	public void testMaxConcurrentAttempts2() {
+		testMaxConcurrentAttemps(2);
+	}
+
+	@Test
+	public void testMaxConcurrentAttempts5() {
+		testMaxConcurrentAttemps(5);
+	}
+	
+	private void testMaxConcurrentAttemps(int maxConcurrentAttempts) {
+		try {
+			final JobID jid = new JobID();
+
+			// create some mock execution vertices and trigger some checkpoint
+			final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
+			final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
+			final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+			ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
+			ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID);
+			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+			final AtomicInteger numCalls = new AtomicInteger();
+
+			doAnswer(new Answer<Void>() {
+				@Override
+				public Void answer(InvocationOnMock invocation) throws Throwable {
+					numCalls.incrementAndGet();
+					return null;
+				}
+			}).when(triggerVertex).sendMessageToCurrentExecution(any(Serializable.class), any(ExecutionAttemptID.class));
+
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+					jid,
+					10,		// periodic interval is 10 ms
+					200000,	// timeout is very long (200 s)
+					0L,		// no extra delay
+					maxConcurrentAttempts,
+					new ExecutionVertex[] { triggerVertex },
+					new ExecutionVertex[] { ackVertex },
+					new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
+					(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
+
+
+			coord.startCheckpointScheduler();
+
+			// after a while, there should be exactly as many checkpoints
+			// as concurrently permitted
+			long now = System.currentTimeMillis();
+			long timeout = now + 60000;
+			long minDuration = now + 100;
+			do {
+				Thread.sleep(20);
+			}
+			while ((now = System.currentTimeMillis()) < minDuration ||
+					(numCalls.get() < maxConcurrentAttempts && now < timeout));
+			
+			assertEquals(maxConcurrentAttempts, numCalls.get());
+			
+			verify(triggerVertex, times(maxConcurrentAttempts))
+					.sendMessageToCurrentExecution(any(TriggerCheckpoint.class), eq(triggerAttemptID));
+			
+			// now, once we acknowledge one checkpoint, it should trigger the next one
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
+			
+			// this should have triggered a new checkpoint; re-read the clock each round so the wait is bounded
+			now = System.currentTimeMillis();
+			timeout = now + 60000;
+			do {
+				Thread.sleep(20);
+			}
+			while (numCalls.get() < maxConcurrentAttempts + 1 && System.currentTimeMillis() < timeout);
+
+			assertEquals(maxConcurrentAttempts + 1, numCalls.get());
+			
+			// no further checkpoints should happen
+			Thread.sleep(200);
+			assertEquals(maxConcurrentAttempts + 1, numCalls.get());
+			
+			coord.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testMaxConcurrentAttempsWithSubsumption() {
+		try {
+			final int maxConcurrentAttempts = 2;
+			final JobID jid = new JobID();
+
+			// create some mock execution vertices and trigger some checkpoint
+			final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
+			final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
+			final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+			ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
+			ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID);
+			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+					jid,
+					10,		// periodic interval is 10 ms
+					200000,	// timeout is very long (200 s)
+					0L,		// no extra delay
+					maxConcurrentAttempts, // max two concurrent checkpoints
+					new ExecutionVertex[] { triggerVertex },
+					new ExecutionVertex[] { ackVertex },
+					new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter
+					(), new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
+
+
+			coord.startCheckpointScheduler();
+
+			// after a while, there should be exactly as many checkpoints
+			// as concurrently permitted 
+			long now = System.currentTimeMillis();
+			long timeout = now + 60000;
+			long minDuration = now + 100;
+			do {
+				Thread.sleep(20);
+			}
+			while ((now = System.currentTimeMillis()) < minDuration ||
+					(coord.getNumberOfPendingCheckpoints() < maxConcurrentAttempts && now < timeout));
+			
+			// validate that the pending checkpoints are there
+			assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
+			assertNotNull(coord.getPendingCheckpoints().get(1L));
+			assertNotNull(coord.getPendingCheckpoints().get(2L));
+
+			// now we acknowledge the second checkpoint, which should subsume the first checkpoint
+			// and allow two more checkpoints to be triggered
+			// now, once we acknowledge one checkpoint, it should trigger the next one
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID, 2L));
+
+			// after a while, there should be the new checkpoints
+			final long newTimeout = System.currentTimeMillis() + 60000;
+			do {
+				Thread.sleep(20);
+			}
+			while (coord.getPendingCheckpoints().get(4L) == null && 
+					System.currentTimeMillis() < newTimeout);
+			
+			// do the final check
+			assertEquals(maxConcurrentAttempts, coord.getNumberOfPendingCheckpoints());
+			assertNotNull(coord.getPendingCheckpoints().get(3L));
+			assertNotNull(coord.getPendingCheckpoints().get(4L));
+			
+			coord.shutdown();
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testPeriodicSchedulingWithInactiveTasks() {
+		try {
+			final JobID jid = new JobID();
+
+			// create some mock execution vertices and trigger some checkpoint
+			final ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
+			final ExecutionAttemptID ackAttemptID = new ExecutionAttemptID();
+			final ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();
+
+			ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
+			ExecutionVertex ackVertex = mockExecutionVertex(ackAttemptID);
+			ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);
+
+			final AtomicReference<ExecutionState> currentState = new AtomicReference<>(ExecutionState.CREATED);
+			when(triggerVertex.getCurrentExecutionAttempt().getState()).thenAnswer(
+					new Answer<ExecutionState>() {
+						@Override
+						public ExecutionState answer(InvocationOnMock invocation){
+							return currentState.get();
+						}
+					});
+			
+			CheckpointCoordinator coord = new CheckpointCoordinator(
+					jid,
+					10,		// periodic interval is 10 ms
+					200000,	// timeout is very long (200 s)
+					0L,		// no extra delay
+					2, // max two concurrent checkpoints
+					new ExecutionVertex[] { triggerVertex },
+					new ExecutionVertex[] { ackVertex },
+					new ExecutionVertex[] { commitVertex }, cl, new StandaloneCheckpointIDCounter(),
+					new StandaloneCompletedCheckpointStore(2, cl), RecoveryMode.STANDALONE);
+			
+			coord.startCheckpointScheduler();
+
+			// no checkpoint should have started so far
+			Thread.sleep(200);
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			
+			// now move the state to RUNNING
+			currentState.set(ExecutionState.RUNNING);
+			
+			// the coordinator should start checkpointing now
+			final long timeout = System.currentTimeMillis() + 10000;
+			do {
+				Thread.sleep(20);
+			}
+			while (System.currentTimeMillis() < timeout &&
+					coord.getNumberOfPendingCheckpoints() == 0);
+			assertTrue(coord.getNumberOfPendingCheckpoints() > 0);
+			coord.shutdown(); // shut the coordinator down, as the sibling scheduler tests do
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+
 	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
 		return mockExecutionVertex(attemptID, ExecutionState.RUNNING);
 	}
 
-	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, ExecutionState state) {
+	private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, 
+														ExecutionState state, ExecutionState ... successiveStates) {
 		final Execution exec = mock(Execution.class);
 		when(exec.getAttemptId()).thenReturn(attemptID);
-		when(exec.getState()).thenReturn(state);
+		when(exec.getState()).thenReturn(state, successiveStates);
 
 		ExecutionVertex vertex = mock(ExecutionVertex.class);
 		when(vertex.getJobvertexId()).thenReturn(new JobVertexID());

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 7b2c2d4..bec04bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -80,7 +80,7 @@ public class CheckpointStateRestoreTest {
 			map.put(statelessId, stateless);
 
 
-			CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L,
+			CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L, 200000L,
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[0], cl,
@@ -151,7 +151,7 @@ public class CheckpointStateRestoreTest {
 			map.put(statelessId, stateless);
 
 
-			CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L,
+			CheckpointCoordinator coord = new CheckpointCoordinator(jid, 200000L, 200000L,
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
 					new ExecutionVertex[0], cl,
@@ -193,7 +193,7 @@ public class CheckpointStateRestoreTest {
 	@Test
 	public void testNoCheckpointAvailable() {
 		try {
-			CheckpointCoordinator coord = new CheckpointCoordinator(new JobID(), 200000L,
+			CheckpointCoordinator coord = new CheckpointCoordinator(new JobID(), 200000L, 200000L,
 					new ExecutionVertex[] { mock(ExecutionVertex.class) },
 					new ExecutionVertex[] { mock(ExecutionVertex.class) },
 					new ExecutionVertex[0], cl,

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
index f6e4ab8..1c666e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -61,7 +61,8 @@ public class CoordinatorShutdownTest {
 			List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
 			
 			JobGraph testGraph = new JobGraph("test job", vertex);
-			testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
+			testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 
+					5000, 60000, 0L, Integer.MAX_VALUE));
 			
 			ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 
@@ -112,7 +113,8 @@ public class CoordinatorShutdownTest {
 			List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
 
 			JobGraph testGraph = new JobGraph("test job", vertex);
-			testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
+			testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList,
+					5000, 60000, 0L, Integer.MAX_VALUE));
 			
 			ActorGateway jmGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
new file mode 100644
index 0000000..0320d6b
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Configuration that captures all checkpointing related settings.
+ */
+public class CheckpointConfig implements java.io.Serializable {
+
+	private static final long serialVersionUID = -750378776078908147L;
+
+	/** The default checkpoint mode: exactly once */
+	public static final CheckpointingMode DEFAULT_MODE = CheckpointingMode.EXACTLY_ONCE;
+
+	/** The default timeout of a checkpoint attempt: 10 minutes */
+	public static final long DEFAULT_TIMEOUT = 10 * 60 * 1000;
+
+	/** The default minimum pause to be made between checkpoints: none */
+	public static final long DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS = 0;
+
+	/** The default limit of concurrently happening checkpoints: one */
+	public static final int DEFAULT_MAX_CONCURRENT_CHECKPOINTS = 1;
+
+	// ------------------------------------------------------------------------
+
+	/** Checkpointing mode (exactly-once vs. at-least-once). */
+	private CheckpointingMode checkpointingMode = DEFAULT_MODE;
+
+	/** Periodic checkpoint triggering interval */
+	private long checkpointInterval = -1; // disabled
+
+	/** Maximum time checkpoint may take before being discarded */
+	private long checkpointTimeout = DEFAULT_TIMEOUT;
+
+	/** Minimal pause between checkpointing attempts */
+	private long minPauseBetweenCheckpoints = DEFAULT_MIN_PAUSE_BETWEEN_CHECKPOINTS;
+
+	/** Maximum number of checkpoint attempts in progress at the same time */
+	private int maxConcurrentCheckpoints = DEFAULT_MAX_CONCURRENT_CHECKPOINTS;
+
+	/** Flag to force checkpointing in iterative jobs */
+	private boolean forceCheckpointing;
+
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Checks whether checkpointing is enabled.
+	 * 
+	 * @return True if checkpointing is enabled, false otherwise.
+	 */
+	public boolean isCheckpointingEnabled() {
+		return checkpointInterval > 0;
+	}
+	
+	/**
+	 * Gets the checkpointing mode (exactly-once vs. at-least-once).
+	 * 
+	 * @return The checkpointing mode.
+	 */
+	public CheckpointingMode getCheckpointingMode() {
+		return checkpointingMode;
+	}
+
+	/**
+	 * Sets the checkpointing mode (exactly-once vs. at-least-once).
+	 * 
+	 * @param checkpointingMode The checkpointing mode.
+	 */
+	public void setCheckpointingMode(CheckpointingMode checkpointingMode) {
+		this.checkpointingMode = requireNonNull(checkpointingMode);
+	}
+
+	/**
+	 * Gets the interval in which checkpoints are periodically scheduled.
+	 * 
+	 * <p>This setting defines the base interval. Checkpoint triggering may be delayed by the settings
+	 * {@link #getMaxConcurrentCheckpoints()} and {@link #getMinPauseBetweenCheckpoints()}.
+	 * 
+	 * @return The checkpoint interval, in milliseconds.
+	 */
+	public long getCheckpointInterval() {
+		return checkpointInterval;
+	}
+
+	/**
+	 * Sets the interval in which checkpoints are periodically scheduled.
+	 *
+	 * <p>This setting defines the base interval. Checkpoint triggering may be delayed by the settings
+	 * {@link #setMaxConcurrentCheckpoints(int)} and {@link #setMinPauseBetweenCheckpoints(long)}.
+	 *
+	 * @param checkpointInterval The checkpoint interval, in milliseconds.
+	 */
+	public void setCheckpointInterval(long checkpointInterval) {
+		if (checkpointInterval <= 0) {
+			throw new IllegalArgumentException("Checkpoint interval must be larger than zero");
+		}
+		this.checkpointInterval = checkpointInterval;
+	}
+
+	/**
+	 * Gets the maximum time that a checkpoint may take before being discarded.
+	 * 
+	 * @return The checkpoint timeout, in milliseconds.
+	 */
+	public long getCheckpointTimeout() {
+		return checkpointTimeout;
+	}
+
+	/**
+	 * Sets the maximum time that a checkpoint may take before being discarded.
+	 * 
+	 * @param checkpointTimeout The checkpoint timeout, in milliseconds. Must be larger than zero.
+	 */
+	public void setCheckpointTimeout(long checkpointTimeout) {
+		if (checkpointTimeout <= 0) {
+			throw new IllegalArgumentException("Checkpoint timeout must be larger than zero");
+		}
+		this.checkpointTimeout = checkpointTimeout;
+	}
+
+	/**
+	 * Gets the minimal pause between checkpointing attempts. This setting defines how soon the
+	 * checkpoint coordinator may trigger another checkpoint after it becomes possible to trigger
+	 * another checkpoint with respect to the maximum number of concurrent checkpoints
+	 * (see {@link #getMaxConcurrentCheckpoints()}).
+	 *
+	 * @return The minimal pause before the next checkpoint is triggered.
+	 */
+	public long getMinPauseBetweenCheckpoints() {
+		return minPauseBetweenCheckpoints;
+	}
+
+//	/**
+//	 * Sets the minimal pause between checkpointing attempts. This setting defines how soon the
+//	 * checkpoint coordinator may trigger another checkpoint after it becomes possible to trigger
+//	 * another checkpoint with respect to the maximum number of concurrent checkpoints
+//	 * (see {@link #setMaxConcurrentCheckpoints(int)}).
+//	 * 
+//	 * <p>If the maximum number of concurrent checkpoints is set to one, this setting makes effectively sure
+//	 * that a minimum amount of time passes where no checkpoint is in progress at all.
+//	 * 
+//	 * @param minPauseBetweenCheckpoints The minimal pause before the next checkpoint is triggered.
+//	 */
+//	public void setMinPauseBetweenCheckpoints(long minPauseBetweenCheckpoints) {
+//		if (minPauseBetweenCheckpoints < 0) {
+//			throw new IllegalArgumentException("Pause value must be zero or positive");
+//		}
+//		this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+//	}
+
+	/**
+	 * Gets the maximum number of checkpoint attempts that may be in progress at the same time. If this
+	 * value is <i>n</i>, then no checkpoints will be triggered while <i>n</i> checkpoint attempts are
+	 * currently in flight. For the next checkpoint to be triggered, one checkpoint attempt would need
+	 * to finish or expire.
+	 * 
+	 * @return The maximum number of concurrent checkpoint attempts.
+	 */
+	public int getMaxConcurrentCheckpoints() {
+		return maxConcurrentCheckpoints;
+	}
+
+	/**
+	 * Sets the maximum number of checkpoint attempts that may be in progress at the same time. If this
+	 * value is <i>n</i>, then no checkpoints will be triggered while <i>n</i> checkpoint attempts are
+	 * currently in flight. For the next checkpoint to be triggered, one checkpoint attempt would need
+	 * to finish or expire.
+	 * 
+	 * @param maxConcurrentCheckpoints The maximum number of concurrent checkpoint attempts.
+	 */
+	public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
+		if (maxConcurrentCheckpoints < 1) {
+			throw new IllegalArgumentException("The maximum number of concurrent attempts must be at least one.");
+		}
+		this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+	}
+
+	/**
+	 * Checks whether checkpointing is forced, despite currently non-checkpointable iteration feedback.
+	 * 
+	 * @return True, if checkpointing is forced, false otherwise.
+	 * 
+	 * @deprecated This will be removed once iterations properly participate in checkpointing.
+	 */
+	@Deprecated
+	public boolean isForceCheckpointing() {
+		return forceCheckpointing;
+	}
+
+	/**
+	 * Sets whether checkpointing is forced, despite currently non-checkpointable iteration feedback.
+	 * 
+	 * @param forceCheckpointing The flag to force checkpointing. 
+	 * 
+	 * @deprecated This will be removed once iterations properly participate in checkpointing.
+	 */
+	@Deprecated
+	public void setForceCheckpointing(boolean forceCheckpointing) {
+		this.forceCheckpointing = forceCheckpointing;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 72722bf..cb5fce5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -114,17 +114,14 @@ public abstract class StreamExecutionEnvironment {
 	/** The execution configuration for this environment */
 	private final ExecutionConfig config = new ExecutionConfig();
 	
+	/** Settings that control the checkpointing behavior */ 
+	private final CheckpointConfig checkpointCfg = new CheckpointConfig();
+	
 	protected final List<StreamTransformation<?>> transformations = new ArrayList<>();
 	
 	private long bufferTimeout = DEFAULT_NETWORK_BUFFER_TIMEOUT;
 	
 	protected boolean isChainingEnabled = true;
-
-	protected long checkpointInterval = -1; // disabled
-
-	protected CheckpointingMode checkpointingMode;
-
-	protected boolean forceCheckpointing = false;
 	
 	/** The state backend used for storing k/v state and state snapshots */
 	private StateBackend<?> defaultStateBackend;
@@ -239,7 +236,17 @@ public abstract class StreamExecutionEnvironment {
 	// ------------------------------------------------------------------------
 	//  Checkpointing Settings
 	// ------------------------------------------------------------------------
-	
+
+	/**
+	 * Gets the checkpoint config, which defines values like checkpoint interval, delay between
+	 * checkpoints, etc.
+	 * 
+	 * @return The checkpoint config.
+	 */
+	public CheckpointConfig getCheckpointConfig() {
+		return checkpointCfg;
+	}
+
 	/**
 	 * Enables checkpointing for the streaming job. The distributed state of the streaming
 	 * dataflow will be periodically snapshotted. In case of a failure, the streaming
@@ -257,7 +264,8 @@ public abstract class StreamExecutionEnvironment {
 	 * @param interval Time interval between state checkpoints in milliseconds.
 	 */
 	public StreamExecutionEnvironment enableCheckpointing(long interval) {
-		return enableCheckpointing(interval, CheckpointingMode.EXACTLY_ONCE);
+		checkpointCfg.setCheckpointInterval(interval);
+		return this;
 	}
 
 	/**
@@ -280,15 +288,8 @@ public abstract class StreamExecutionEnvironment {
 	 *             The checkpointing mode, selecting between "exactly once" and "at least once" guaranteed.
 	 */
 	public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) {
-		if (mode == null) {
-			throw new NullPointerException("checkpoint mode must not be null");
-		}
-		if (interval <= 0) {
-			throw new IllegalArgumentException("the checkpoint interval must be positive");
-		}
-
-		this.checkpointInterval = interval;
-		this.checkpointingMode = mode;
+		checkpointCfg.setCheckpointingMode(mode);
+		checkpointCfg.setCheckpointInterval(interval);
 		return this;
 	}
 	
@@ -312,10 +313,11 @@ public abstract class StreamExecutionEnvironment {
 	 *            If true checkpointing will be enabled for iterative jobs as well.
 	 */
 	@Deprecated
+	@SuppressWarnings("deprecation")
 	public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode, boolean force) {
-		this.enableCheckpointing(interval, mode);
-
-		this.forceCheckpointing = force;
+		checkpointCfg.setCheckpointingMode(mode);
+		checkpointCfg.setCheckpointInterval(interval);
+		checkpointCfg.setForceCheckpointing(force);
 		return this;
 	}
 
@@ -337,32 +339,39 @@ public abstract class StreamExecutionEnvironment {
 	 */
 	@Deprecated
 	public StreamExecutionEnvironment enableCheckpointing() {
-		enableCheckpointing(500, CheckpointingMode.EXACTLY_ONCE);
+		checkpointCfg.setCheckpointInterval(500);
 		return this;
 	}
 
 	/**
 	 * Returns the checkpointing interval or -1 if checkpointing is disabled.
+	 * 
+	 * <p>Shorthand for {@code getCheckpointConfig().getCheckpointInterval()}.
 	 *
 	 * @return The checkpointing interval or -1
 	 */
 	public long getCheckpointInterval() {
-		return checkpointInterval;
+		return checkpointCfg.getCheckpointInterval();
 	}
 
-
 	/**
 	 * Returns whether checkpointing is force-enabled.
 	 */
+	@Deprecated
+	@SuppressWarnings("deprecation")
 	public boolean isForceCheckpointing() {
-		return forceCheckpointing;
+		return checkpointCfg.isForceCheckpointing();
 	}
 
 	/**
-	 * Returns the {@link CheckpointingMode}.
+	 * Returns the checkpointing mode (exactly-once vs. at-least-once).
+	 * 
+	 * <p>Shorthand for {@code getCheckpointConfig().getCheckpointingMode()}.
+	 * 
+	 * @return The checkpointing mode.
 	 */
 	public CheckpointingMode getCheckpointingMode() {
-		return checkpointingMode;
+		return checkpointCfg.getCheckpointingMode();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 7d8f9f9..bc3acb7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -23,15 +23,12 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -43,8 +40,8 @@ import org.apache.flink.api.java.typeutils.MissingTypeInfo;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -59,7 +56,6 @@ import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
 import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
-import org.apache.sling.commons.json.JSONException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -70,9 +66,6 @@ import org.slf4j.LoggerFactory;
  * 
  */
 public class StreamGraph extends StreamingPlan {
-
-	/** The default interval for checkpoints, in milliseconds */
-	public static final int DEFAULT_CHECKPOINTING_INTERVAL_MS = 5000;
 	
 	private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
 
@@ -80,11 +73,9 @@ public class StreamGraph extends StreamingPlan {
 
 	private final StreamExecutionEnvironment environemnt;
 	private final ExecutionConfig executionConfig;
-
-	private CheckpointingMode checkpointingMode;
-	private boolean checkpointingEnabled = false;
-	private long checkpointingInterval = DEFAULT_CHECKPOINTING_INTERVAL_MS;
-	private boolean chaining = true;
+	private final CheckpointConfig checkpointConfig;
+	
+	private boolean chaining;
 
 	private Map<Integer, StreamNode> streamNodes;
 	private Set<Integer> sources;
@@ -97,12 +88,11 @@ public class StreamGraph extends StreamingPlan {
 	private StateBackend<?> stateBackend;
 	private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
 
-	private boolean forceCheckpoint = false;
 
 	public StreamGraph(StreamExecutionEnvironment environment) {
-
 		this.environemnt = environment;
-		executionConfig = environment.getConfig();
+		this.executionConfig = environment.getConfig();
+		this.checkpointConfig = environment.getCheckpointConfig();
 
 		// create an empty new stream graph.
 		clear();
@@ -112,19 +102,23 @@ public class StreamGraph extends StreamingPlan {
 	 * Remove all registered nodes etc.
 	 */
 	public void clear() {
-		streamNodes = Maps.newHashMap();
-		virtualSelectNodes = Maps.newHashMap();
-		virtuaPartitionNodes = Maps.newHashMap();
-		vertexIDtoBrokerID = Maps.newHashMap();
-		vertexIDtoLoopTimeout = Maps.newHashMap();
-		iterationSourceSinkPairs = Sets.newHashSet();
-		sources = Sets.newHashSet();
-		sinks = Sets.newHashSet();
+		streamNodes = new HashMap<>();
+		virtualSelectNodes = new HashMap<>();
+		virtuaPartitionNodes = new HashMap<>();
+		vertexIDtoBrokerID = new HashMap<>();
+		vertexIDtoLoopTimeout  = new HashMap<>();
+		iterationSourceSinkPairs = new HashSet<>();
+		sources = new HashSet<>();
+		sinks = new HashSet<>();
 	}
 
-	protected ExecutionConfig getExecutionConfig() {
+	public ExecutionConfig getExecutionConfig() {
 		return executionConfig;
 	}
+	
+	public CheckpointConfig getCheckpointConfig() {
+		return checkpointConfig;
+	}
 
 	public String getJobName() {
 		return jobName;
@@ -138,18 +132,6 @@ public class StreamGraph extends StreamingPlan {
 		this.chaining = chaining;
 	}
 
-	public void setCheckpointingEnabled(boolean checkpointingEnabled) {
-		this.checkpointingEnabled = checkpointingEnabled;
-	}
-
-	public void setCheckpointingInterval(long checkpointingInterval) {
-		this.checkpointingInterval = checkpointingInterval;
-	}
-
-	public void forceCheckpoint() {
-		this.forceCheckpoint = true;
-	}
-
 	public void setStateBackend(StateBackend<?> backend) {
 		this.stateBackend = backend;
 	}
@@ -158,27 +140,11 @@ public class StreamGraph extends StreamingPlan {
 		return this.stateBackend;
 	}
 
-	public long getCheckpointingInterval() {
-		return checkpointingInterval;
-	}
-
 	// Checkpointing
 	
 	public boolean isChainingEnabled() {
 		return chaining;
 	}
-
-	public boolean isCheckpointingEnabled() {
-		return checkpointingEnabled;
-	}
-
-	public CheckpointingMode getCheckpointingMode() {
-		return checkpointingMode;
-	}
-
-	public void setCheckpointingMode(CheckpointingMode checkpointingMode) {
-		this.checkpointingMode = checkpointingMode;
-	}
 	
 
 	public boolean isIterative() {
@@ -322,7 +288,7 @@ public class StreamGraph extends StreamingPlan {
 				downStreamVertexID,
 				typeNumber,
 				null,
-				Lists.<String>newArrayList());
+				new ArrayList<String>());
 
 	}
 
@@ -463,10 +429,7 @@ public class StreamGraph extends StreamingPlan {
 	}
 
 	public StreamEdge getStreamEdge(int sourceId, int targetId) {
-		Iterator<StreamEdge> outIterator = getStreamNode(sourceId).getOutEdges().iterator();
-		while (outIterator.hasNext()) {
-			StreamEdge edge = outIterator.next();
-
+		for (StreamEdge edge : getStreamNode(sourceId).getOutEdges()) {
 			if (edge.getTargetId() == targetId) {
 				return edge;
 			}
@@ -505,8 +468,7 @@ public class StreamGraph extends StreamingPlan {
 		return vertexIDtoLoopTimeout.get(vertexID);
 	}
 
-	public  Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int loopId, int sourceId, int sinkId, long timeout, int parallelism) {
-
+	public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int loopId, int sourceId, int sinkId, long timeout, int parallelism) {
 		StreamNode source = this.addNode(sourceId,
 				StreamIterationHead.class,
 				null,
@@ -537,15 +499,12 @@ public class StreamGraph extends StreamingPlan {
 		return iterationSourceSinkPairs;
 	}
 
-	protected void removeEdge(StreamEdge edge) {
-
+	private void removeEdge(StreamEdge edge) {
 		edge.getSourceVertex().getOutEdges().remove(edge);
 		edge.getTargetVertex().getInEdges().remove(edge);
-
 	}
 
-	protected void removeVertex(StreamNode toRemove) {
-
+	private void removeVertex(StreamNode toRemove) {
 		Set<StreamEdge> edgesToRemove = new HashSet<StreamEdge>();
 
 		edgesToRemove.addAll(toRemove.getInEdges());
@@ -560,9 +519,10 @@ public class StreamGraph extends StreamingPlan {
 	/**
 	 * Gets the assembled {@link JobGraph}.
 	 */
+	@SuppressWarnings("deprecation")
 	public JobGraph getJobGraph() {
 		// temporarily forbid checkpointing for iterative jobs
-		if (isIterative() && isCheckpointingEnabled() && !forceCheckpoint) {
+		if (isIterative() && checkpointConfig.isCheckpointingEnabled() && !checkpointConfig.isForceCheckpointing()) {
 			throw new UnsupportedOperationException(
 					"Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
 							+ "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
@@ -576,16 +536,12 @@ public class StreamGraph extends StreamingPlan {
 
 	@Override
 	public String getStreamingPlanAsJSON() {
-
 		try {
 			return new JSONGenerator(this).getJSON();
-		} catch (JSONException e) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("JSON plan creation failed: {}", e);
-			}
-			return "";
 		}
-
+		catch (Exception e) {
+			throw new RuntimeException("JSON plan creation failed", e);
+		}
 	}
 
 	@Override
@@ -606,5 +562,4 @@ public class StreamGraph extends StreamingPlan {
 	public static enum ResourceStrategy {
 		DEFAULT, ISOLATE, NEWGROUP
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index 8bd0e48..4bd7a73 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -98,17 +98,6 @@ public class StreamGraphGenerator {
 	private StreamGraphGenerator(StreamExecutionEnvironment env) {
 		this.streamGraph = new StreamGraph(env);
 		this.streamGraph.setChaining(env.isChainingEnabled());
-		
-		if (env.getCheckpointInterval() > 0) {
-			this.streamGraph.setCheckpointingEnabled(true);
-			this.streamGraph.setCheckpointingInterval(env.getCheckpointInterval());
-			this.streamGraph.setCheckpointingMode(env.getCheckpointingMode());
-		}
-		this.streamGraph.setStateBackend(env.getStateBackend());
-		if (env.isForceCheckpointing()) {
-			this.streamGraph.forceCheckpoint();
-		}
-		
 		this.env = env;
 		this.alreadyTransformed = new HashMap<>();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 613d381..515e362 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.graph;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -27,10 +26,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang3.StringUtils;
+
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -45,6 +44,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
@@ -52,6 +52,7 @@ import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
 import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -274,18 +275,20 @@ public class StreamingJobGraphGenerator {
 		config.setNonChainedOutputs(nonChainableOutputs);
 		config.setChainedOutputs(chainableOutputs);
 
-		config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
-		if (streamGraph.isCheckpointingEnabled()) {
-			config.setCheckpointMode(streamGraph.getCheckpointingMode());
+		final CheckpointConfig ceckpointCfg = streamGraph.getCheckpointConfig();
+		
+		config.setCheckpointingEnabled(ceckpointCfg.isCheckpointingEnabled());
+		if (ceckpointCfg.isCheckpointingEnabled()) {
+			config.setCheckpointMode(ceckpointCfg.getCheckpointingMode());
 			config.setStateBackend(streamGraph.getStateBackend());
-		} else {
-			// the at least once input handler is slightly cheaper (in the absence of checkpoints),
+		}
+		else {
+			// the "at-least-once" input handler is slightly cheaper (in the absence of checkpoints),
 			// so we use that one if checkpointing is not enabled
 			config.setCheckpointMode(CheckpointingMode.AT_LEAST_ONCE);
 		}
-		config.setStatePartitioner((KeySelector<?, Serializable>) vertex.getStatePartitioner());
+		config.setStatePartitioner(vertex.getStatePartitioner());
 		config.setStateKeySerializer(vertex.getStateKeySerializer());
-
 		
 		Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
 
@@ -385,8 +388,10 @@ public class StreamingJobGraphGenerator {
 	}
 	
 	private void configureCheckpointing() {
-		if (streamGraph.isCheckpointingEnabled()) {
-			long interval = streamGraph.getCheckpointingInterval();
+		CheckpointConfig cfg = streamGraph.getCheckpointConfig();
+		
+		if (cfg.isCheckpointingEnabled()) {
+			long interval = cfg.getCheckpointInterval();
 			if (interval < 1) {
 				throw new IllegalArgumentException("The checkpoint interval must be positive");
 			}
@@ -400,10 +405,9 @@ public class StreamingJobGraphGenerator {
 			List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(jobVertices.size());
 
 			// collect the vertices that receive "commit checkpoint" messages
-			// currently, these are all certices
+			// currently, these are all vertices
 			List<JobVertexID> commitVertices = new ArrayList<JobVertexID>();
 			
-			
 			for (JobVertex vertex : jobVertices.values()) {
 				if (vertex.isInputVertex()) {
 					triggerVertices.add(vertex.getID());
@@ -414,7 +418,9 @@ public class StreamingJobGraphGenerator {
 			}
 
 			JobSnapshottingSettings settings = new JobSnapshottingSettings(
-					triggerVertices, ackVertices, commitVertices, interval);
+					triggerVertices, ackVertices, commitVertices, interval,
+					cfg.getCheckpointTimeout(), cfg.getMinPauseBetweenCheckpoints(),
+					cfg.getMaxConcurrentCheckpoints());
 			jobGraph.setSnapshotSettings(settings);
 
 			// if the user enabled checkpointing, the default number of exec retries is infinitive.

http://git-wip-us.apache.org/repos/asf/flink/blob/55fd5f32/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 14f23e1..69147f6 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -96,6 +96,13 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   // ------------------------------------------------------------------------
   //  Checkpointing Settings
   // ------------------------------------------------------------------------
+  
+  /**
+   * Gets the checkpoint config, which defines values like checkpoint interval, delay between
+   * checkpoints, etc.
+   */
+  def getCheckpointConfig = javaEnv.getCheckpointConfig()
+  
   /**
    * Enables checkpointing for the streaming job. The distributed state of the streaming
    * dataflow will be periodically snapshotted. In case of a failure, the streaming


Mime
View raw message