flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [01/10] flink git commit: [FLINK-1953] [runtime] Integrate new snapshot checkpoint coordinator with jobgraph and execution graph
Date Tue, 12 May 2015 21:03:12 GMT
Repository: flink
Updated Branches:
  refs/heads/master ff750e61a -> e79ff4ebf


http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 1321336..5ab0150 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -35,6 +35,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.messages.accumulators.ReportAccumulatorResult;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
 
 import java.io.IOException;
 import java.util.Map;
@@ -224,7 +227,25 @@ public class RuntimeEnvironment implements Environment {
 	}
 
 	@Override
-	public ActorRef getJobManager() {
-		return jobManagerActor;
+	public void acknowledgeCheckpoint(long checkpointId) {
+		acknowledgeCheckpoint(checkpointId, null);
+	}
+
+	@Override
+	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
+		// try to create a serialized version of the state handle
+		SerializedValue<StateHandle<?>> serializedState;
+		if (state == null) {
+			serializedState = null;
+		} else {
+			try {
+				serializedState = new SerializedValue<StateHandle<?>>(state);
+			} catch (Exception e) {
+				throw new RuntimeException("Failed to serialize state handle during checkpoint confirmation", e);
+			}
+		}
+		
+		AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(jobId, executionId, checkpointId, serializedState);
+		jobManagerActor.tell(message, ActorRef.noSender());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index f12344b..1578e4b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -46,6 +46,8 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
 import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.messages.TaskMessages;
@@ -53,6 +55,8 @@ import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState;
 import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
 import org.apache.flink.runtime.state.StateHandle;
 
+import org.apache.flink.runtime.state.StateUtils;
+import org.apache.flink.runtime.util.SerializedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -139,9 +143,6 @@ public class Task implements Runnable {
 	/** The name of the class that holds the invokable code */
 	private final String nameOfInvokableClass;
 
-	/** The handle to the state that the operator was initialized with */
-	private final StateHandle operatorState;
-
 	/** The memory manager to be used by this task */
 	private final MemoryManager memoryManager;
 
@@ -171,10 +172,13 @@ public class Task implements Runnable {
 	/** The timeout for all ask operations on actors */
 	private final Timeout actorAskTimeout;
 
+	/** The library cache, from which the task can request its required JAR files */
 	private final LibraryCacheManager libraryCache;
 	
+	/** The cache for user-defined files that the invokable requires */
 	private final FileCache fileCache;
 	
+	/** The gateway to the network stack, which handles inputs and produced results */
 	private final NetworkEnvironment network;
 
 	/** The thread that executes the task */
@@ -182,10 +186,12 @@ public class Task implements Runnable {
 	
 
 	// ------------------------------------------------------------------------
-	//  Fields that control the task execution
+	//  Fields that control the task execution. All these fields are volatile
+	//  (which means that they introduce memory barriers), to establish
+	//  proper happens-before semantics on parallel modification
 	// ------------------------------------------------------------------------
 
-	private final AtomicBoolean invokableHasBeenCanceled = new AtomicBoolean(false);
+	private final AtomicBoolean invokableHasBeenCanceled;
 	
 	/** The invokable of this task, if initialized */
 	private volatile AbstractInvokable invokable;
@@ -196,6 +202,10 @@ public class Task implements Runnable {
 	/** The observed exception, in case the task execution failed */
 	private volatile Throwable failureCause;
 
+	/** The handle to the state that the operator was initialized with. Will be set to null after the
+	 * initialization, to be memory friendly */
+	private volatile SerializedValue<StateHandle<?>> operatorState;
+
 	
 	/**
 	 * <p><b>IMPORTANT:</b> This constructor may not start any work that would need to 
@@ -227,7 +237,7 @@ public class Task implements Runnable {
 		this.taskConfiguration = checkNotNull(tdd.getTaskConfiguration());
 		this.requiredJarFiles = checkNotNull(tdd.getRequiredJarFiles());
 		this.nameOfInvokableClass = checkNotNull(tdd.getInvokableClassName());
-		this.operatorState = tdd.getOperatorStates();
+		this.operatorState = tdd.getOperatorState();
 
 		this.memoryManager = checkNotNull(memManager);
 		this.ioManager = checkNotNull(ioManager);
@@ -286,7 +296,9 @@ public class Task implements Runnable {
 		}
 		
 		// finally, create the executing thread, but do not start it
-		executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask); 
+		executingThread = new Thread(TASK_THREADS_GROUP, this, taskNameWithSubtask);
+		
+		invokableHasBeenCanceled = new AtomicBoolean(false);
 	}
 
 	// ------------------------------------------------------------------------
@@ -423,9 +435,7 @@ public class Task implements Runnable {
 
 		// all resource acquisitions and registrations from here on
 		// need to be undone in the end
-
 		Map<String, Future<Path>> distributedCacheEntries = new HashMap<String, Future<Path>>();
-
 		AbstractInvokable invokable = null;
 
 		try {
@@ -500,15 +510,32 @@ public class Task implements Runnable {
 			// the very last thing before the actual execution starts running is to inject
 			// the state into the task. the state is non-empty if this is an execution
 			// of a task that failed but had backed-up state from a checkpoint
+
+			// get our private reference onto the stack (be safe against concurrent changes) 
+			SerializedValue<StateHandle<?>> operatorState = this.operatorState;
+			
 			if (operatorState != null) {
 				if (invokable instanceof OperatorStateCarrier) {
-					((OperatorStateCarrier) invokable).injectState(operatorState);
+					try {
+						StateHandle<?> state = operatorState.deserializeValue(userCodeClassLoader);
+						OperatorStateCarrier<?> op = (OperatorStateCarrier<?>) invokable;
+						StateUtils.setOperatorState(op, state);
+					}
+					catch (Exception e) {
+						throw new Exception("Failed to deserialize state handle and setup initial operator state");
+					}
 				}
 				else {
 					throw new IllegalStateException("Found operator state for a non-stateful task invokable");
 				}
 			}
 
+			// be memory and GC friendly - since the code stays in invoke() for a potentially long time,
+			// we clear the reference to the state handle
+			//noinspection UnusedAssignment
+			operatorState = null;
+			this.operatorState = null;
+
 			// ----------------------------------------------------------------
 			//  actual task core work
 			// ----------------------------------------------------------------
@@ -568,7 +595,7 @@ public class Task implements Runnable {
 			// ----------------------------------------------------------------
 
 			try {
-				// transition into our final state. we should be either in RUNNING, CANCELING, or FAILED 
+				// transition into our final state. we should be either in DEPLOYING, RUNNING, CANCELING, or FAILED
 				// loop for multiple retries during concurrent state changes via calls to cancel() or
 				// to failExternally()
 				while (true) {
@@ -708,6 +735,14 @@ public class Task implements Runnable {
 	//  Canceling / Failing the task from the outside
 	// ----------------------------------------------------------------------------------------------------------------
 
+	/**
+	 * Cancels the task execution. If the task is already in a terminal state
+	 * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing.
+	 * Otherwise it sets the state to CANCELING, and, if the invokable code is running,
+	 * starts an asynchronous thread that aborts that code.
+	 * 
+	 * <p>This method never blocks.</p>
+	 */
 	public void cancelExecution() {
 		LOG.info("Attempting to cancel task " + taskNameWithSubtask);
 		if (cancelOrFailAndCancelInvokable(ExecutionState.CANCELING)) {
@@ -716,7 +751,13 @@ public class Task implements Runnable {
 	}
 
 	/**
-	 * Sets the tasks to be cancelled and reports a failure back to the master.
+	 * Marks task execution failed for an external reason (a reason other than the task code itself
+	 * throwing an exception). If the task is already in a terminal state
+	 * (such as FINISHED, CANCELED, FAILED), or if the task is already canceling this does nothing.
+	 * Otherwise it sets the state to FAILED, and, if the invokable code is running,
+	 * starts an asynchronous thread that aborts that code.
+	 *
+	 * <p>This method never blocks.</p>
 	 */
 	public void failExternally(Throwable cause) {
 		LOG.info("Attempting to fail task externally " + taskNameWithSubtask);
@@ -751,6 +792,8 @@ public class Task implements Runnable {
 						LOG.info("Triggering cancellation of task code {} ({}).", taskNameWithSubtask, executionId);
 
 						// because the canceling may block on user code, we cancel from a separate thread
+						// we do not reuse the async call handler, because that one may be blocked, in which
+						// case the canceling could not continue
 						Runnable canceler = new TaskCanceler(LOG, invokable, executingThread, taskNameWithSubtask);
 						Thread cancelThread = new Thread(executingThread.getThreadGroup(), canceler,
 								"Canceler for " + taskNameWithSubtask);
@@ -798,15 +841,42 @@ public class Task implements Runnable {
 	//  Notifications on the invokable
 	// ------------------------------------------------------------------------
 
-	public void triggerCheckpointBarrier(final long checkpointID) {
-		AbstractInvokable invokabe = this.invokable;
+	/**
+	 * Calls the invokable to trigger a checkpoint, if the invokable implements the interface
+	 * {@link org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator}.
+	 * 
+	 * @param checkpointID The ID identifying the checkpoint.
+	 * @param checkpointTimestamp The timestamp associated with the checkpoint.   
+	 */
+	public void triggerCheckpointBarrier(final long checkpointID, final long checkpointTimestamp) {
+		AbstractInvokable invokable = this.invokable;
 		
-		if (executionState == ExecutionState.RUNNING && invokabe != null) {
-			if (invokabe instanceof BarrierTransceiver) {
-				final BarrierTransceiver barrierTransceiver = (BarrierTransceiver) invokabe;
+		if (executionState == ExecutionState.RUNNING && invokable != null) {
+			if (invokable instanceof CheckpointedOperator) {
+				
+				// build a local closure 
+				final CheckpointedOperator checkpointer = (CheckpointedOperator) invokable;
+				final Logger logger = LOG;
+				final String taskName = taskNameWithSubtask;
+				
+				Runnable runnable = new Runnable() {
+					@Override
+					public void run() {
+						try {
+							checkpointer.triggerCheckpoint(checkpointID, checkpointTimestamp);
+						}
+						catch (Throwable t) {
+							logger.error("Error while triggering checkpoint for " + taskName, t);
+						}
+					}
+				};
+				executeAsyncCallRunnable(runnable, "Checkpoint Trigger");
+			}
+			else if (invokable instanceof BarrierTransceiver) {
+				final BarrierTransceiver barrierTransceiver = (BarrierTransceiver) invokable;
 				final Logger logger = LOG;
 				
-				Thread caller = new Thread("Barrier emitter") {
+				Runnable runnable = new Runnable() {
 					@Override
 					public void run() {
 						try {
@@ -817,8 +887,7 @@ public class Task implements Runnable {
 						}
 					}
 				};
-				caller.setDaemon(true);
-				caller.start();
+				executeAsyncCallRunnable(runnable, "Checkpoint Trigger");
 			}
 			else {
 				LOG.error("Task received a checkpoint request, but is not a checkpointing task - "
@@ -826,10 +895,49 @@ public class Task implements Runnable {
 			}
 		}
 		else {
-			LOG.debug("Ignoring request to trigger a checkpoint barrier");
+			LOG.debug("Ignoring request to trigger a checkpoint for non-running task.");
 		}
 	}
 	
+	public void confirmCheckpoint(final long checkpointID, final long checkpointTimestamp) {
+		AbstractInvokable invokable = this.invokable;
+
+		if (executionState == ExecutionState.RUNNING && invokable != null) {
+			if (invokable instanceof CheckpointCommittingOperator) {
+
+				// build a local closure 
+				final CheckpointCommittingOperator checkpointer = (CheckpointCommittingOperator) invokable;
+				final Logger logger = LOG;
+				final String taskName = taskNameWithSubtask;
+
+				Runnable runnable = new Runnable() {
+					@Override
+					public void run() {
+						try {
+							checkpointer.confirmCheckpoint(checkpointID, checkpointTimestamp);
+						}
+						catch (Throwable t) {
+							logger.error("Error while confirming checkpoint for " + taskName, t);
+						}
+					}
+				};
+				executeAsyncCallRunnable(runnable, "Checkpoint Confirmation");
+			}
+			else {
+				LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - "
+						+ taskNameWithSubtask);
+			}
+		}
+		else {
+			LOG.debug("Ignoring checkpoint commit notification for non-running task.");
+		}
+	}
+	
+	private void executeAsyncCallRunnable(Runnable runnable, String callName) {
+		Thread thread = new Thread(runnable, callName);
+		thread.setDaemon(true);
+		thread.start();
+	}
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableObject.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableObject.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableObject.java
new file mode 100644
index 0000000..af6fa16
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializableObject.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.util;
+
+/**
+ * A simple object that only implements {@link java.io.Serializable}, so it can be used
+ * in serializable classes.
+ */
+public class SerializableObject implements java.io.Serializable {
+	
+	private static final long serialVersionUID = -7322636177391854669L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 75463fd..4745fb6 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -33,11 +33,11 @@ import org.apache.flink.runtime.client._
 import org.apache.flink.runtime.executiongraph.{ExecutionJobVertex, ExecutionGraph}
 import org.apache.flink.runtime.jobmanager.web.WebInfoServer
 import org.apache.flink.runtime.messages.ArchiveMessages.ArchiveExecutionGraph
-import org.apache.flink.runtime.messages.CheckpointingMessages.{StateBarrierAck, BarrierAck}
 import org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
 import org.apache.flink.runtime.messages.Messages.{Disconnect, Acknowledge}
 import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
 import org.apache.flink.runtime.messages.accumulators._
+import org.apache.flink.runtime.messages.checkpoint.{AcknowledgeCheckpoint, AbstractCheckpointMessage}
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
@@ -47,7 +47,7 @@ import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.instance.InstanceManager
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
+import org.apache.flink.runtime.jobgraph.{JobVertexID, JobGraph, JobStatus}
 import org.apache.flink.runtime.jobmanager.accumulators.AccumulatorManager
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkScheduler}
 import org.apache.flink.runtime.messages.JobManagerMessages._
@@ -89,19 +89,19 @@ import scala.collection.JavaConverters._
  * - [[JobStatusChanged]] indicates that the status of job (RUNNING, CANCELING, FINISHED, etc.) has
  * changed. This message is sent by the ExecutionGraph.
  */
-class JobManager(val flinkConfiguration: Configuration,
-                 val instanceManager: InstanceManager,
-                 val scheduler: FlinkScheduler,
-                 val libraryCacheManager: BlobLibraryCacheManager,
-                 val archive: ActorRef,
-                 val accumulatorManager: AccumulatorManager,
-                 val defaultExecutionRetries: Int,
-                 val delayBetweenRetries: Long,
-                 val timeout: FiniteDuration)
+class JobManager(protected val flinkConfiguration: Configuration,
+                 protected val instanceManager: InstanceManager,
+                 protected val scheduler: FlinkScheduler,
+                 protected val libraryCacheManager: BlobLibraryCacheManager,
+                 protected val archive: ActorRef,
+                 protected val accumulatorManager: AccumulatorManager,
+                 protected val defaultExecutionRetries: Int,
+                 protected val delayBetweenRetries: Long,
+                 protected val timeout: FiniteDuration)
   extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
   /** List of current jobs running jobs */
-  val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
+  protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]()
 
 
   /**
@@ -278,6 +278,9 @@ class JobManager(val flinkConfiguration: Configuration,
 
       sender ! NextInputSplit(serializedInputSplit)
 
+    case checkpointMessage : AbstractCheckpointMessage =>
+      handleCheckpointMessage(checkpointMessage)
+      
     case JobStatusChanged(jobID, newJobStatus, timeStamp, error) =>
       currentJobs.get(jobID) match {
         case Some((executionGraph, jobInfo)) => executionGraph.getJobName
@@ -323,19 +326,6 @@ class JobManager(val flinkConfiguration: Configuration,
           removeJob(jobID)
       }
 
-    case msg: BarrierAck =>
-      currentJobs.get(msg.jobID) match {
-        case Some(jobExecution) =>
-          jobExecution._1.getStateCheckpointerActor forward  msg
-        case None =>
-      }
-    case msg: StateBarrierAck =>
-      currentJobs.get(msg.jobID) match {
-        case Some(jobExecution) =>
-          jobExecution._1.getStateCheckpointerActor forward  msg
-        case None =>
-      }
-
     case ScheduleOrUpdateConsumers(jobId, partitionId) =>
       currentJobs.get(jobId) match {
         case Some((executionGraph, _)) =>
@@ -486,10 +476,7 @@ class JobManager(val flinkConfiguration: Configuration,
         executionGraph.setDelayBeforeRetrying(delayBetweenRetries)
         executionGraph.setScheduleMode(jobGraph.getScheduleMode)
         executionGraph.setQueuedSchedulingAllowed(jobGraph.getAllowQueuedScheduling)
-
-        executionGraph.setCheckpointingEnabled(jobGraph.isCheckpointingEnabled)
-        executionGraph.setCheckpointingInterval(jobGraph.getCheckpointingInterval)
-
+        
         // initialize the vertices that have a master initialization hook
         // file output formats create directories here, input formats create splits
         if (log.isDebugEnabled) {
@@ -531,8 +518,33 @@ class JobManager(val flinkConfiguration: Configuration,
           log.debug(s"Successfully created execution graph from job graph ${jobId} (${jobName}).")
         }
 
-        // give an actorContext
-        executionGraph.setParentContext(context)
+        // configure the state checkpointing
+        val snapshotSettings = jobGraph.getSnapshotSettings
+        if (snapshotSettings != null) {
+
+          val idToVertex: JobVertexID => ExecutionJobVertex = id => {
+            val vertex = executionGraph.getJobVertex(id)
+            if (vertex == null) {
+              throw new JobSubmissionException(jobId,
+                "The snapshot checkpointing settings refer to non-existent vertex " + id)
+            }
+            vertex
+          }
+
+          val triggerVertices: java.util.List[ExecutionJobVertex] =
+            snapshotSettings.getVerticesToTrigger.asScala.map(idToVertex).asJava
+
+          val ackVertices: java.util.List[ExecutionJobVertex] =
+            snapshotSettings.getVerticesToAcknowledge.asScala.map(idToVertex).asJava
+
+          val confirmVertices: java.util.List[ExecutionJobVertex] =
+            snapshotSettings.getVerticesToConfirm.asScala.map(idToVertex).asJava
+
+          executionGraph.enableSnaphotCheckpointing(
+            snapshotSettings.getCheckpointInterval, snapshotSettings.getCheckpointTimeout,
+            triggerVertices, ackVertices, confirmVertices,
+            context.system)
+        }
 
         // get notified about job status changes
         executionGraph.registerJobStatusListener(self)
@@ -588,6 +600,40 @@ class JobManager(val flinkConfiguration: Configuration,
   }
 
   /**
+   * Dedicated handler for checkpoint messages.
+   * 
+   * @param actorMessage The checkpoint actor message.
+   */
+  private def handleCheckpointMessage(actorMessage: AbstractCheckpointMessage): Unit = {
+    actorMessage match {
+      case ackMessage: AcknowledgeCheckpoint =>
+        val jid = ackMessage.getJob()
+        currentJobs.get(jid) match {
+          case Some((graph, _)) =>
+            val coordinator = graph.getCheckpointCoordinator()
+            if (coordinator != null) {
+              try {
+                coordinator.receiveAcknowledgeMessage(ackMessage)
+              }
+              catch {
+                case t: Throwable =>
+                  log.error(s"Error in CheckpointCoordinator while processing $ackMessage", t)
+              }
+            }
+            else {
+              log.error(
+                s"Received ConfirmCheckpoint message for job $jid with no CheckpointCoordinator")
+            }
+            
+          case None => log.error(s"Received ConfirmCheckpoint for unavailable job $jid")
+        }
+
+      // unknown checkpoint message
+      case _ => unhandled(actorMessage)
+    }
+  }
+  
+  /**
    * Handle unmatched messages with an exception.
    */
   override def unhandled(message: Any): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
deleted file mode 100644
index 8bb1274..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/StreamCheckpointCoordinator.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager
-
-import java.lang.{Long => JLong}
-
-import akka.actor._
-import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
-import org.apache.flink.runtime.execution.ExecutionState
-import org.apache.flink.runtime.executiongraph.{ExecutionGraph, ExecutionVertex}
-import org.apache.flink.runtime.jobgraph.JobStatus._
-import org.apache.flink.runtime.jobgraph.JobVertexID
-import org.apache.flink.runtime.messages.CheckpointingMessages._
-import org.apache.flink.runtime.state.StateHandle
-
-import scala.collection.JavaConversions._
-import scala.collection.immutable.TreeMap
-import scala.concurrent.duration.{FiniteDuration, _}
-
-/**
- * The StreamCheckpointCoordinator is responsible for operator state management and checkpoint
- * coordination in streaming jobs. It periodically sends checkpoint barriers to the sources of a
- * running job and constantly collects acknowledgements from operators while the barriers are being 
- * disseminated throughout the execution graph. Upon time intervals it finds the last globally
- * acknowledged checkpoint barrier to be used for a consistent recovery and loads all associated 
- * state handles to the respected execution vertices.
- * 
- * The following messages describe this actor's expected behavior: 
- *
- *  - [[InitBarrierScheduler]] initiates the actor and schedules the periodic [[BarrierTimeout]] 
- *  and [[CompactAndUpdate]] messages that are used for maintaining the state checkpointing logic. 
- *
- *  - [[BarrierTimeout]] is periodically triggered upon initiation in order to start a new 
- *  checkpoint barrier. That is when the barriers are being disseminated to the source vertices.
- *
- *  - [[BarrierAck]] is being sent by each operator upon the completion of a state checkpoint. All
- *  such acknowledgements are being collected and inspected upon [[CompactAndUpdate]] handling in
- *  order to find out the last consistent checkpoint.
- *  
- *  - [[StateBarrierAck]] describes an acknowledgement such as the case of a [[BarrierAck]] that 
- *  additionally carries operatorState with it.
- *
- * - [[CompactAndUpdate]] marks the last globally consistent checkpoint barrier to be used for 
- * recovery purposes and removes all older states and acknowledgements up to that barrier.
- * Furthermore, it updates the current ExecutionGraph with the current operator state handles 
- * 
- */
-
-class StreamCheckpointCoordinator(val executionGraph: ExecutionGraph,
-                                  val vertices: Iterable[ExecutionVertex],
-                                  var acks: Map[(JobVertexID,Int),List[JLong]],
-                                  var states: Map[(JobVertexID, Integer, JLong), StateHandle],
-                                  val interval: FiniteDuration,
-                                  var curId: JLong,
-                                  var ackId: JLong)
-extends Actor with ActorLogMessages with ActorSynchronousLogging {
-
-  implicit private val executor = context.dispatcher
-
-  override def receiveWithLogMessages: Receive = {
-
-    case InitBarrierScheduler =>
-      context.system.scheduler.schedule(interval,interval,self,BarrierTimeout)
-      context.system.scheduler.schedule(2 * interval,2 * interval,self,CompactAndUpdate)
-      log.info("Started Stream State Monitor for job " +
-        s"${executionGraph.getJobID}${executionGraph.getJobName}")
-      
-    case BarrierTimeout =>
-      executionGraph.getState match {
-        case FAILED | CANCELED | FINISHED =>
-          log.info(s"Stopping monitor for terminated job ${executionGraph.getJobID}.")
-          self ! PoisonPill
-        case RUNNING =>
-          curId += 1
-          log.debug("Sending Barrier to vertices of Job " + executionGraph.getJobName)
-          vertices.filter(v => v.getJobVertex.getJobVertex.isInputVertex &&
-                  v.getExecutionState == ExecutionState.RUNNING).foreach(vertex
-          => vertex.getCurrentAssignedResource.getInstance.getTaskManager
-                    ! BarrierReq(vertex.getCurrentExecutionAttempt.getAttemptId,curId))
-        case _ =>
-          log.debug("Omitting sending barrier since graph is in " +
-            s"${executionGraph.getState} state for job ${executionGraph.getJobID}.")
-      }
-      
-    case StateBarrierAck(jobID, jobVertexID, instanceID, checkpointID, opState) =>
-      states += (jobVertexID, instanceID, checkpointID) -> opState
-      self ! BarrierAck(jobID, jobVertexID, instanceID, checkpointID)
-      
-    case BarrierAck(jobID, jobVertexID,instanceID,checkpointID) =>
-          acks.get(jobVertexID,instanceID) match {
-            case Some(acklist) =>
-              acks += (jobVertexID,instanceID) -> (checkpointID :: acklist)
-            case None =>
-          }
-          log.debug(acks.toString())
-      
-    case CompactAndUpdate =>
-      val barrierCount =
-        acks.values.foldLeft(TreeMap[JLong,Int]().withDefaultValue(0))((dict,myList)
-      => myList.foldLeft(dict)((dict2,elem) => dict2.updated(elem,dict2(elem) + 1)))
-      val keysToKeep = barrierCount.filter(_._2 == acks.size).keys
-      ackId = if(keysToKeep.nonEmpty) keysToKeep.max else ackId
-      acks.keys.foreach(x => acks = acks.updated(x,acks(x).filter(_ >= ackId)))
-      states = states.filterKeys(_._3 >= ackId)
-      log.debug("[FT-MONITOR] Last global barrier is " + ackId)
-      executionGraph.loadOperatorStates(states)
-      
-  }
-}
-
-object StreamCheckpointCoordinator {
-
-  def spawn(context: ActorContext,executionGraph: ExecutionGraph,
-            interval: FiniteDuration = 5 seconds): ActorRef = {
-
-    val vertices: Iterable[ExecutionVertex] = getExecutionVertices(executionGraph)
-    val monitor = context.system.actorOf(Props(new StreamCheckpointCoordinator(executionGraph,
-      vertices,vertices.map(x => ((x.getJobVertex.getJobVertexId,x.getParallelSubtaskIndex),
-              List.empty[JLong])).toMap, Map() ,interval,0L,-1L)))
-    monitor ! InitBarrierScheduler
-    monitor
-  }
-
-  private def getExecutionVertices(executionGraph: ExecutionGraph): Iterable[ExecutionVertex] = {
-    for((_,execJobVertex) <- executionGraph.getAllVertices;
-        execVertex: ExecutionVertex <- execJobVertex.getTaskVertices)
-    yield execVertex
-  }
-}
-
-case class BarrierTimeout()
-
-case class InitBarrierScheduler()
-
-case class CompactAndUpdate()

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/CheckpointingMessages.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/CheckpointingMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/CheckpointingMessages.scala
deleted file mode 100644
index 9f6f51a..0000000
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/CheckpointingMessages.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.messages
-
-import org.apache.flink.api.common.JobID
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.jobgraph.JobVertexID
-import org.apache.flink.runtime.state.StateHandle
-
-/**
- * Actor messages specific to checkpoints (triggering, acknowledging,
- * state transfer, ...)
- */
-object CheckpointingMessages {
-
-  /**
-   * Abstract base trait for all checkpoint messages.
-   */
-  trait CheckpointingMessage
-
-  // --------------------------------------------------------------------------
-
-  case class BarrierReq(attemptID: ExecutionAttemptID,
-                        checkpointID: Long) extends CheckpointingMessage
-
-  case class BarrierAck(jobID: JobID,
-                        jobVertexID:JobVertexID,
-                        instanceID: Int,
-                        checkpointID: Long) extends CheckpointingMessage
-
-  case class StateBarrierAck(jobID: JobID,
-                             jobVertexID: JobVertexID,
-                             instanceID: Integer,
-                             checkpointID: java.lang.Long,
-                             states: StateHandle) extends CheckpointingMessage
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 2e580cc..ed63db0 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -37,6 +37,7 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import grizzled.slf4j.Logger
 
 import org.apache.flink.configuration._
+import org.apache.flink.runtime.messages.checkpoint.{ConfirmCheckpoint, TriggerCheckpoint, AbstractCheckpointMessage}
 import org.apache.flink.runtime.{ActorSynchronousLogging, ActorLogMessages}
 import org.apache.flink.runtime.akka.AkkaUtils
 import org.apache.flink.runtime.blob.{BlobService, BlobCache}
@@ -53,7 +54,6 @@ import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.memorymanager.{MemoryManager, DefaultMemoryManager}
-import org.apache.flink.runtime.messages.CheckpointingMessages.{CheckpointingMessage, BarrierReq}
 import org.apache.flink.runtime.messages.Messages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages._
@@ -241,7 +241,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
     case message: TaskMessage => handleTaskMessage(message)
 
     // messages for coordinating checkpoints
-    case message: CheckpointingMessage => handleCheckpointingMessage(message)
+    case message: AbstractCheckpointMessage => handleCheckpointingMessage(message)
 
     // registration messages for connecting and disconnecting from / to the JobManager
     case message: RegistrationMessage => handleRegistrationMessage(message)
@@ -345,7 +345,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
         currentJobManager foreach {
           jobManager => {
             val futureResponse = (jobManager ? updateMsg)(askTimeout)
-            
+
             val executionID = taskExecutionState.getID
 
             futureResponse.mapTo[Boolean].onComplete {
@@ -399,25 +399,43 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
   /**
    * Handler for messages related to checkpoints.
    *
-   * @param message The checkpoint message.
+   * @param actorMessage The checkpoint message.
    */
-  private def handleCheckpointingMessage(message: CheckpointingMessage): Unit = {
+  private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {
 
-    message match {
+    actorMessage match {
 
-      case BarrierReq(attemptID, checkpointID) =>
-        log.debug(s"[FT-TaskManager] Barrier $checkpointID request received " +
-          s"for attempt $attemptID.")
+      case message: TriggerCheckpoint =>
+        val taskExecutionId = message.getTaskExecutionId
+        val checkpointId = message.getCheckpointId
+        val timestamp = message.getTimestamp
+        
+        log.debug(s"Received TriggerCheckpoint ${checkpointId}@${timestamp} for $taskExecutionId.")
 
-        val task = runningTasks.get(attemptID)
+        val task = runningTasks.get(taskExecutionId)
         if (task != null) {
-          task.triggerCheckpointBarrier(checkpointID)
+          task.triggerCheckpointBarrier(checkpointId, timestamp)
         } else {
-          log.debug(s"Taskmanager received a checkpoint request for unknown task $attemptID.")
+          log.debug(s"Taskmanager received a checkpoint request for unknown task $taskExecutionId.")
+        }
+
+      case message: ConfirmCheckpoint =>
+        val taskExecutionId = message.getTaskExecutionId
+        val checkpointId = message.getCheckpointId
+        val timestamp = message.getTimestamp
+
+        log.debug(s"Received ConfirmCheckpoint ${checkpointId}@${timestamp} for $taskExecutionId.")
+
+        val task = runningTasks.get(taskExecutionId)
+        if (task != null) {
+          task.confirmCheckpoint(checkpointId, timestamp)
+        } else {
+          log.debug(
+            s"Taskmanager received a checkpoint confirmation for unknown task $taskExecutionId.")
         }
 
       // unknown checkpoint message
-      case _ => unhandled(message)
+      case _ => unhandled(actorMessage)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index aee0e63..10c8074 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -197,8 +197,8 @@ public class CheckpointCoordinatorTest {
 			
 			// validate that the relevant tasks got a confirmation message
 			{
-				ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointId);
-				ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointId);
+				ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointId, timestamp);
+				ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointId, timestamp);
 				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
 				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
 			}
@@ -235,8 +235,8 @@ public class CheckpointCoordinatorTest {
 				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
 				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2));
 
-				ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointIdNew);
-				ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointIdNew);
+				ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointIdNew, timestampNew);
+				ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointIdNew, timestampNew);
 				verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1));
 				verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
 			}
@@ -341,7 +341,7 @@ public class CheckpointCoordinatorTest {
 			
 			// the first confirm message should be out
 			verify(commitVertex, times(1)).sendMessageToCurrentExecution(
-					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId1), commitAttemptID);
+					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId1, timestamp1), commitAttemptID);
 			
 			// send the last remaining ack for the second checkpoint
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
@@ -353,7 +353,7 @@ public class CheckpointCoordinatorTest {
 
 			// the second commit message should be out
 			verify(commitVertex, times(1)).sendMessageToCurrentExecution(
-					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2), commitAttemptID);
+					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID);
 			
 			// validate the committed checkpoints
 			List<SuccessfulCheckpoint> scs = coord.getSuccessfulCheckpoints();
@@ -480,7 +480,7 @@ public class CheckpointCoordinatorTest {
 
 			// the first confirm message should be out
 			verify(commitVertex, times(1)).sendMessageToCurrentExecution(
-					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2), commitAttemptID);
+					new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID);
 
 			// send the last remaining ack for the first checkpoint. This should not do anything
 			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
new file mode 100644
index 0000000..51c4890
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.LocalStateHandle;
+import org.apache.flink.runtime.state.OperatorState;
+import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.util.SerializedValue;
+
+import org.junit.Test;
+
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.*;
+
+/**
+ * Tests concerning the restoring of state from a checkpoint to the task executions.
+ */
+public class CheckpointStateRestoreTest {
+	
+	@Test
+	public void testSetState() {
+		try {
+			final SerializedValue<StateHandle<?>> serializedState = new SerializedValue<StateHandle<?>>(
+					new LocalStateHandle(Collections.<String,OperatorState<?>>emptyMap()));
+			
+			final JobID jid = new JobID();
+			final JobVertexID statefulId = new JobVertexID();
+			final JobVertexID statelessId = new JobVertexID();
+			
+			Execution statefulExec1 = mockExecution();
+			Execution statefulExec2 = mockExecution();
+			Execution statefulExec3 = mockExecution();
+			Execution statelessExec1 = mockExecution();
+			Execution statelessExec2 = mockExecution();
+			
+			ExecutionVertex stateful1 = mockExecutionVertex(statefulExec1, statefulId, 0);
+			ExecutionVertex stateful2 = mockExecutionVertex(statefulExec2, statefulId, 1);
+			ExecutionVertex stateful3 = mockExecutionVertex(statefulExec3, statefulId, 2);
+			ExecutionVertex stateless1 = mockExecutionVertex(statelessExec1, statelessId, 0);
+			ExecutionVertex stateless2 = mockExecutionVertex(statelessExec2, statelessId, 1);
+
+			ExecutionJobVertex stateful = mockExecutionJobVertex(statefulId,
+					new ExecutionVertex[] { stateful1, stateful2, stateful3 });
+			ExecutionJobVertex stateless = mockExecutionJobVertex(statelessId,
+					new ExecutionVertex[] { stateless1, stateless2 });
+			
+			Map<JobVertexID, ExecutionJobVertex> map = new HashMap<JobVertexID, ExecutionJobVertex>();
+			map.put(statefulId, stateful);
+			map.put(statelessId, stateless);
+			
+			
+			CheckpointCoordinator coord = new CheckpointCoordinator(jid, 1, 200000L, 
+					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
+					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
+					new ExecutionVertex[0]);
+			
+			// create ourselves a checkpoint with state
+			final long timestamp = 34623786L;
+			coord.triggerCheckpoint(timestamp);
+			
+			PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
+			final long checkpointId = pending.getCheckpointId();
+			
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, serializedState));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
+			
+			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+			
+			// let the coordinator inject the state
+			coord.restoreLatestCheckpointedState(map, true, false);
+			
+			// verify that each stateful vertex got the state
+			verify(statefulExec1, times(1)).setInitialState(serializedState);
+			verify(statefulExec2, times(1)).setInitialState(serializedState);
+			verify(statefulExec3, times(1)).setInitialState(serializedState);
+			verify(statelessExec1, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any());
+			verify(statelessExec2, times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@Test
+	public void testStateOnlyPartiallyAvailable() {
+		try {
+			final SerializedValue<StateHandle<?>> serializedState = new SerializedValue<StateHandle<?>>(
+					new LocalStateHandle(Collections.<String,OperatorState<?>>emptyMap()));
+
+			final JobID jid = new JobID();
+			final JobVertexID statefulId = new JobVertexID();
+			final JobVertexID statelessId = new JobVertexID();
+
+			Execution statefulExec1 = mockExecution();
+			Execution statefulExec2 = mockExecution();
+			Execution statefulExec3 = mockExecution();
+			Execution statelessExec1 = mockExecution();
+			Execution statelessExec2 = mockExecution();
+
+			ExecutionVertex stateful1 = mockExecutionVertex(statefulExec1, statefulId, 0);
+			ExecutionVertex stateful2 = mockExecutionVertex(statefulExec2, statefulId, 1);
+			ExecutionVertex stateful3 = mockExecutionVertex(statefulExec3, statefulId, 2);
+			ExecutionVertex stateless1 = mockExecutionVertex(statelessExec1, statelessId, 0);
+			ExecutionVertex stateless2 = mockExecutionVertex(statelessExec2, statelessId, 1);
+
+			ExecutionJobVertex stateful = mockExecutionJobVertex(statefulId,
+					new ExecutionVertex[] { stateful1, stateful2, stateful3 });
+			ExecutionJobVertex stateless = mockExecutionJobVertex(statelessId,
+					new ExecutionVertex[] { stateless1, stateless2 });
+
+			Map<JobVertexID, ExecutionJobVertex> map = new HashMap<JobVertexID, ExecutionJobVertex>();
+			map.put(statefulId, stateful);
+			map.put(statelessId, stateless);
+
+
+			CheckpointCoordinator coord = new CheckpointCoordinator(jid, 1, 200000L,
+					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
+					new ExecutionVertex[] { stateful1, stateful2, stateful3, stateless1, stateless2 },
+					new ExecutionVertex[0]);
+
+			// create ourselves a checkpoint with state
+			final long timestamp = 34623786L;
+			coord.triggerCheckpoint(timestamp);
+
+			PendingCheckpoint pending = coord.getPendingCheckpoints().values().iterator().next();
+			final long checkpointId = pending.getCheckpointId();
+
+			// the difference to the test "testSetState" is that one stateful subtask does not report state
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, serializedState));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, serializedState));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
+			coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
+
+			assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
+			assertEquals(0, coord.getNumberOfPendingCheckpoints());
+
+			// let the coordinator inject the state
+			try {
+				coord.restoreLatestCheckpointedState(map, true, true);
+				fail("this should fail with an exception");
+			}
+			catch (IllegalStateException e) {
+				// swish!
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@Test
+	public void testNoCheckpointAvailable() {
+		try {
+			CheckpointCoordinator coord = new CheckpointCoordinator(new JobID(), 1, 200000L,
+					new ExecutionVertex[] { mock(ExecutionVertex.class) },
+					new ExecutionVertex[] { mock(ExecutionVertex.class) },
+					new ExecutionVertex[0]);
+
+			try {
+				coord.restoreLatestCheckpointedState(new HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
+				fail("this should throw an exception");
+			}
+			catch (IllegalStateException e) {
+				// expected
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	
+	private Execution mockExecution() {
+		Execution mock = mock(Execution.class);
+		when(mock.getAttemptId()).thenReturn(new ExecutionAttemptID());
+		when(mock.getState()).thenReturn(ExecutionState.CREATED);
+		return mock;
+	}
+	
+	private ExecutionVertex mockExecutionVertex(Execution execution, JobVertexID vertexId, int subtask) {
+		ExecutionVertex mock = mock(ExecutionVertex.class);
+		when(mock.getJobvertexId()).thenReturn(vertexId);
+		when(mock.getParallelSubtaskIndex()).thenReturn(subtask);
+		when(mock.getCurrentExecutionAttempt()).thenReturn(execution);
+		return mock;
+	}
+	
+	private ExecutionJobVertex mockExecutionJobVertex(JobVertexID id, ExecutionVertex[] vertices) {
+		ExecutionJobVertex vertex = mock(ExecutionJobVertex.class);
+		when(vertex.getParallelism()).thenReturn(vertices.length);
+		when(vertex.getJobVertexId()).thenReturn(id);
+		when(vertex.getTaskVertices()).thenReturn(vertices);
+		return vertex;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
new file mode 100644
index 0000000..3d9a155
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CoordinatorShutdownTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import akka.actor.ActorRef;
+import akka.pattern.Patterns;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
+import org.apache.flink.runtime.jobmanager.Tasks;
+
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.junit.Test;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+public class CoordinatorShutdownTest {
+	
+	@Test
+	public void testCoordinatorShutsDownOnFailure() {
+		LocalFlinkMiniCluster cluster = null;
+		try {
+			Configuration noTaskManagerConfig = new Configuration();
+			noTaskManagerConfig.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 0);
+			cluster = new LocalFlinkMiniCluster(noTaskManagerConfig, true);
+			
+			// build a test graph with snapshotting enabled
+			AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
+			vertex.setInvokableClass(Tasks.NoOpInvokable.class);
+			List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
+			
+			JobGraph testGraph = new JobGraph("test job", vertex);
+			testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
+			
+			ActorRef jobManager = cluster.getJobManager();
+
+			FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS);
+			JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(testGraph, false);
+			
+			// submit is successful, but then the job dies because no TaskManager / slot is available
+			Future<Object> submitFuture = Patterns.ask(jobManager, submitMessage, timeout.toMillis());
+			Await.result(submitFuture, timeout);
+
+			// get the execution graph and make sure the coordinator is properly shut down
+			Future<Object> jobRequestFuture = Patterns.ask(jobManager,
+					new JobManagerMessages.RequestJob(testGraph.getJobID()), timeout.toMillis());
+			
+			ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
+			
+			assertNotNull(graph);
+			graph.waitUntilFinished();
+			
+			CheckpointCoordinator coord = graph.getCheckpointCoordinator();
+			assertTrue(coord == null || coord.isShutdown());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (cluster != null) {
+				cluster.shutdown();
+				cluster.awaitTermination();
+			}
+		}
+	}
+
+	@Test
+	public void testCoordinatorShutsDownOnSuccess() {
+		LocalFlinkMiniCluster cluster = null;
+		try {
+			cluster = new LocalFlinkMiniCluster(new Configuration(), true);
+			
+			// build a test graph with snapshotting enabled
+			AbstractJobVertex vertex = new AbstractJobVertex("Test Vertex");
+			vertex.setInvokableClass(Tasks.NoOpInvokable.class);
+			List<JobVertexID> vertexIdList = Collections.singletonList(vertex.getID());
+
+			JobGraph testGraph = new JobGraph("test job", vertex);
+			testGraph.setSnapshotSettings(new JobSnapshottingSettings(vertexIdList, vertexIdList, vertexIdList, 5000));
+			
+			ActorRef jobManager = cluster.getJobManager();
+
+			FiniteDuration timeout = new FiniteDuration(60, TimeUnit.SECONDS);
+			JobManagerMessages.SubmitJob submitMessage = new JobManagerMessages.SubmitJob(testGraph, false);
+
+			// submit is successful and the job runs to completion on the local cluster
+			Future<Object> submitFuture = Patterns.ask(jobManager, submitMessage, timeout.toMillis());
+			Await.result(submitFuture, timeout);
+
+			// get the execution graph and make sure the coordinator is properly shut down
+			Future<Object> jobRequestFuture = Patterns.ask(jobManager,
+					new JobManagerMessages.RequestJob(testGraph.getJobID()), timeout.toMillis());
+
+			ExecutionGraph graph = ((JobManagerMessages.JobFound) Await.result(jobRequestFuture, timeout)).executionGraph();
+
+			assertNotNull(graph);
+			graph.waitUntilFinished();
+
+			CheckpointCoordinator coord = graph.getCheckpointCoordinator();
+			assertTrue(coord == null || coord.isShutdown());
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+		finally {
+			if (cluster != null) {
+				cluster.shutdown();
+				cluster.awaitTermination();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index fe1a598..9a9f486 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -22,32 +22,28 @@ import static org.junit.Assert.*;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.messages.checkpoint.AbortCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
-import org.apache.flink.runtime.state.OperatorState;
 import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.util.SerializedValue;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Map;
 
 public class CheckpointMessagesTest {
 	
 	@Test
 	public void testTriggerAndConfirmCheckpoint() {
 		try {
-			ConfirmCheckpoint cc = new ConfirmCheckpoint(new JobID(), new ExecutionAttemptID(), 45287698767345L);
+			ConfirmCheckpoint cc = new ConfirmCheckpoint(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L);
 			testSerializabilityEqualsHashCode(cc);
 			
 			TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L);
 			testSerializabilityEqualsHashCode(tc);
-
-			AbortCheckpoint ac = new AbortCheckpoint(new JobID(), new ExecutionAttemptID(), 1365762983745L);
-			testSerializabilityEqualsHashCode(ac);
+			
 		}
 		catch (Exception e) {
 			e.printStackTrace();
@@ -62,7 +58,8 @@ public class CheckpointMessagesTest {
 											new JobID(), new ExecutionAttemptID(), 569345L);
 
 			AcknowledgeCheckpoint withState = new AcknowledgeCheckpoint(
-											new JobID(), new ExecutionAttemptID(), 87658976143L, new MyHandle());
+											new JobID(), new ExecutionAttemptID(), 87658976143L, 
+											new SerializedValue<StateHandle<?>>(new MyHandle()));
 			
 			testSerializabilityEqualsHashCode(noState);
 			testSerializabilityEqualsHashCode(withState);
@@ -81,12 +78,12 @@ public class CheckpointMessagesTest {
 		assertNotNull(copy.toString());
 	}
 	
-	private static class MyHandle implements StateHandle {
+	private static class MyHandle implements StateHandle<Serializable> {
 
 		private static final long serialVersionUID = 8128146204128728332L;
 
 		@Override
-		public Map<String, OperatorState<?>> getState(ClassLoader userClassloader) {
+		public Serializable getState() {
 			return null;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 735f67e..0f62b27 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.operators.testutils;
 
-import akka.actor.ActorRef;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -41,6 +40,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
 import org.mockito.invocation.InvocationOnMock;
@@ -264,7 +264,12 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
-	public ActorRef getJobManager() {
-		return ActorRef.noSender();
+	public void acknowledgeCheckpoint(long checkpointId) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
+		throw new UnsupportedOperationException();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 1016fa0..e8acccc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -30,8 +30,10 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.streaming.api.graph.StreamGraph.StreamLoop;
@@ -77,19 +79,10 @@ public class StreamingJobGraphGenerator {
 	public JobGraph createJobGraph(String jobName) {
 		jobGraph = new JobGraph(jobName);
 
-		// Turn lazy scheduling off
+		// make sure that all vertices start immediately
 		jobGraph.setScheduleMode(ScheduleMode.ALL);
-		jobGraph.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
-		jobGraph.setCheckpointingInterval(streamGraph.getCheckpointingInterval());
-
-		if (jobGraph.isCheckpointingEnabled()) {
-			int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
-			if (executionRetries != -1) {
-				jobGraph.setNumberOfExecutionRetries(executionRetries);
-			} else {
-				jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);
-			}
-		}
+		
+		
 		init();
 
 		setChaining();
@@ -97,6 +90,8 @@ public class StreamingJobGraphGenerator {
 		setPhysicalEdges();
 
 		setSlotSharing();
+		
+		configureCheckpointing();
 
 		return jobGraph;
 	}
@@ -356,4 +351,55 @@ public class StreamingJobGraphGenerator {
 			ccg.addVertex(tail);
 		}
 	}
+	
+	private void configureCheckpointing() {
+
+		if (streamGraph.isCheckpointingEnabled()) {
+			long interval = streamGraph.getCheckpointingInterval();
+			if (interval < 1) {
+				throw new IllegalArgumentException("The checkpoint interval must be positive");
+			}
+
+			// gather source and sink IDs
+			HashSet<JobVertexID> sourceIds = new HashSet<JobVertexID>();
+			HashSet<JobVertexID> sinkIds = new HashSet<JobVertexID>();
+			for (AbstractJobVertex vertex : jobVertices.values()) {
+				if (vertex.isInputVertex()) {
+					sourceIds.add(vertex.getID());
+				}
+				if (vertex.isOutputVertex()) {
+					sinkIds.add(vertex.getID());
+				}
+			}
+
+			HashSet<JobVertexID> sourceorSink = new HashSet<JobVertexID>();
+			sourceorSink.addAll(sourceIds);
+			sourceorSink.addAll(sinkIds);
+			
+			// collect the vertices that receive "trigger checkpoint" messages.
+			// currently, these are all the sources
+			List<JobVertexID> triggerVertices = new ArrayList<JobVertexID>(sourceIds);
+
+			// collect the vertices that need to acknowledge the checkpoint
+			// currently, these are the sources and sinks
+			// the sources acknowledge their state backup, the sinks the arrival of the barriers
+			List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(sourceorSink);
+
+			// collect the vertices that receive "commit checkpoint" messages
+			// currently, these are only the sources
+			List<JobVertexID> commitVertices = new ArrayList<JobVertexID>(sourceIds);
+
+			JobSnapshottingSettings settings = new JobSnapshottingSettings(
+					triggerVertices, ackVertices, commitVertices, interval);
+			
+			jobGraph.setSnapshotSettings(settings);
+
+			int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
+			if (executionRetries != -1) {
+				jobGraph.setNumberOfExecutionRetries(executionRetries);
+			} else {
+				jobGraph.setNumberOfExecutionRetries(Integer.MAX_VALUE);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 82486e8..b4a11aa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -25,11 +25,11 @@ import org.apache.flink.runtime.event.task.TaskEvent;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.BarrierTransceiver;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
 import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
-import org.apache.flink.runtime.messages.CheckpointingMessages;
 import org.apache.flink.runtime.state.LocalStateHandle;
 import org.apache.flink.runtime.state.OperatorState;
-import org.apache.flink.runtime.state.StateHandle;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.ChainableStreamOperator;
@@ -43,10 +43,10 @@ import org.apache.flink.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import akka.actor.ActorRef;
 
 public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTaskContext<OUT>,
-		BarrierTransceiver, OperatorStateCarrier {
+		OperatorStateCarrier<LocalStateHandle>, CheckpointedOperator, CheckpointCommittingOperator,
+		BarrierTransceiver {
 
 	private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
 
@@ -110,18 +110,12 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
 	 */
 	@Override
 	public void confirmBarrier(long barrierID) throws IOException {
-
 		if (configuration.getStateMonitoring() && !states.isEmpty()) {
-			getEnvironment().getJobManager().tell(
-					new CheckpointingMessages.StateBarrierAck(getEnvironment().getJobID(), getEnvironment()
-							.getJobVertexId(), context.getIndexOfThisSubtask(), barrierID,
-							new LocalStateHandle(states)), ActorRef.noSender());
-		} else {
-			getEnvironment().getJobManager().tell(
-					new CheckpointingMessages.BarrierAck(getEnvironment().getJobID(), getEnvironment().getJobVertexId(),
-							context.getIndexOfThisSubtask(), barrierID), ActorRef.noSender());
+			getEnvironment().acknowledgeCheckpoint(barrierID, new LocalStateHandle(states));
+		}
+		else {
+			getEnvironment().acknowledgeCheckpoint(barrierID);
 		}
-
 	}
 
 	public void setInputsOutputs() {
@@ -304,17 +298,30 @@ public class StreamTask<IN, OUT> extends AbstractInvokable implements StreamTask
 
 	@Override
 	public String toString() {
-		return configuration.getOperatorName() + " (" + context.getIndexOfThisSubtask() + ")";
+		return getEnvironment().getTaskNameWithSubtasks();
 	}
 
 	/**
 	 * Re-injects the user states into the map
 	 */
 	@Override
-	public void injectState(StateHandle stateHandle) {
-		this.states.putAll(stateHandle.getState(userClassLoader));
+	public void setInitialState(LocalStateHandle stateHandle) {
+		this.states.putAll(stateHandle.getState());
 	}
 
+	@Override
+	public void triggerCheckpoint(long checkpointId, long timestamp) {
+		broadcastBarrierFromSource(checkpointId);
+	}
+	
+	@Override
+	public void confirmCheckpoint(long checkpointId, long timestamp) {
+		// we do nothing here so far. this should call commit on the source function, for example
+	}
+
+	
+
+
 	private class SuperstepEventListener implements EventListener<TaskEvent> {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
index c4bd095..fd70bfb 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java
@@ -51,7 +51,7 @@ public class StreamCheckpointingITCase {
 	private static final int NUM_TASK_SLOTS = 3;
 	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
 
-	private static final long NUM_STRINGS = 4000000;
+	private static final long NUM_STRINGS = 10000000L;
 
 	private static ForkableFlinkMiniCluster cluster;
 
@@ -99,7 +99,7 @@ public class StreamCheckpointingITCase {
 			StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
 																	"localhost", cluster.getJobManagerRPCPort());
 			env.setParallelism(PARALLELISM);
-			env.enableCheckpointing(200);
+			env.enableCheckpointing(1000);
 			env.getConfig().disableSysoutLogging();
 
 			DataStream<String> stream = env.addSource(new RichParallelSourceFunction<String>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9b7f8aa1/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
index 361621b..877893f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/TaskManagerFailureRecoveryITCase.java
@@ -72,7 +72,7 @@ public class TaskManagerFailureRecoveryITCase {
 			config.setInteger(ConfigConstants.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 2);
 			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
 			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 16);
-
+			
 			config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "500 ms");
 			config.setString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "20 s");
 			config.setInteger(ConfigConstants.AKKA_WATCH_THRESHOLD, 20);


Mime
View raw message