flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [2/3] flink git commit: [FLINK-4685] [checkpoints] Gather sync/async duration and alignment information for task checkpoints
Date Tue, 27 Sep 2016 15:18:24 GMT
[FLINK-4685] [checkpoints] Gather sync/async duration and alignment information for task checkpoints

This adds to each 'acknowledge checkpoint' message
  - number of bytes buffered during alignment
  - duration of alignment phase
  - duration of synchronous part of the operator checkpoint
  - duration of asynchronous part of the operator checkpoint


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1642e32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1642e32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1642e32

Branch: refs/heads/master
Commit: b1642e32c2f69c60c2b212260c3479feb66a9165
Parents: 6ea9284
Author: Stephan Ewen <sewen@apache.org>
Authored: Mon Sep 26 14:10:21 2016 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Sep 27 14:58:41 2016 +0200

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         | 19 ++--
 .../flink/runtime/execution/Environment.java    | 43 +++++++--
 .../runtime/jobgraph/tasks/StatefulTask.java    | 27 ++++--
 .../checkpoint/AcknowledgeCheckpoint.java       | 93 +++++++++++++++++---
 .../ActorGatewayCheckpointResponder.java        | 23 ++---
 .../taskmanager/CheckpointResponder.java        | 29 ++++--
 .../runtime/taskmanager/RuntimeEnvironment.java | 27 ++++--
 .../jobmanager/JobManagerHARecoveryTest.java    | 11 ++-
 .../operators/testutils/DummyEnvironment.java   | 14 +--
 .../operators/testutils/MockEnvironment.java    | 15 ++--
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  6 ++
 .../streaming/runtime/io/BarrierBuffer.java     | 83 +++++++++++------
 .../streaming/runtime/io/BarrierTracker.java    | 44 +++++----
 .../streaming/runtime/io/BufferSpiller.java     | 36 +++++---
 .../runtime/io/CheckpointBarrierHandler.java    | 30 +++++--
 .../runtime/io/StreamInputProcessor.java        | 24 +++--
 .../runtime/io/StreamTwoInputProcessor.java     | 16 ++--
 .../runtime/tasks/OneInputStreamTask.java       |  2 +-
 .../streaming/runtime/tasks/StreamTask.java     | 87 ++++++++++++------
 .../runtime/tasks/TwoInputStreamTask.java       |  2 +-
 .../streaming/runtime/io/BarrierBufferTest.java | 88 ++++++++++++++----
 .../runtime/io/BarrierTrackerTest.java          | 40 +++++++--
 .../runtime/tasks/OneInputStreamTaskTest.java   | 21 +++--
 .../runtime/tasks/StreamMockEnvironment.java    | 14 +--
 24 files changed, 576 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index d5b9b46..c0c9ca1 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -136,15 +136,16 @@ public class RocksDBAsyncSnapshotTest {
 				testHarness.bufferSize) {
 
 			@Override
-			public void acknowledgeCheckpoint(long checkpointId) {
-				super.acknowledgeCheckpoint(checkpointId);
-			}
-
-			@Override
-			public void acknowledgeCheckpoint(long checkpointId,
-			                                  ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-			                                  List<KeyGroupsStateHandle> keyGroupStateHandles) {
-				super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles);
+			public void acknowledgeCheckpoint(
+					long checkpointId,
+					ChainedStateHandle<StreamStateHandle> chainedStateHandle, 
+					List<KeyGroupsStateHandle> keyGroupStateHandles,
+					long synchronousDurationMillis, long asynchronousDurationMillis,
+					long bytesBufferedInAlignment, long alignmentDurationNanos) {
+
+				super.acknowledgeCheckpoint(checkpointId, chainedStateHandle, keyGroupStateHandles,
+						synchronousDurationMillis, asynchronousDurationMillis,
+						bytesBufferedInAlignment, alignmentDurationNanos);
 
 				// block on the latch, to verify that triggerCheckpoint returns below,
 				// even though the async checkpoint would not finish

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 1eee9d4..273c0d9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -164,23 +164,52 @@ public interface Environment {
 	 * to for the checkpoint with the give checkpoint-ID. This method does not include
 	 * any state in the checkpoint.
 	 * 
-	 * @param checkpointId The ID of the checkpoint.
+	 * @param checkpointId
+	 *             The ID of the checkpoint.
+	 * @param synchronousDurationMillis
+	 *             The duration (in milliseconds) of the synchronous part of the operator checkpoint
+	 * @param asynchronousDurationMillis
+	 *             The duration (in milliseconds) of the asynchronous part of the operator checkpoint 
+	 * @param bytesBufferedInAlignment
+	 *             The number of bytes that were buffered during the checkpoint alignment phase
+	 * @param alignmentDurationNanos
+	 *             The duration (in nanoseconds) that the stream alignment for the checkpoint took   
 	 */
-	void acknowledgeCheckpoint(long checkpointId);
+	void acknowledgeCheckpoint(
+			long checkpointId,
+			long synchronousDurationMillis,
+			long asynchronousDurationMillis,
+			long bytesBufferedInAlignment,
+			long alignmentDurationNanos);
 
 	/**
 	 * Confirms that the invokable has successfully completed all required steps for
 	 * the checkpoint with the give checkpoint-ID. This method does include
 	 * the given state in the checkpoint.
 	 *
-	 * @param checkpointId The ID of the checkpoint.
-	 * @param chainedStateHandle Handle for the chained operator state
-	 * @param keyGroupStateHandles  Handles for key group state
+	 * @param checkpointId
+	 *             The ID of the checkpoint.
+	 * @param chainedStateHandle
+	 *             Handle for the chained operator state
+	 * @param keyGroupStateHandles
+	 *             Handles for key group state
+	 * @param synchronousDurationMillis
+	 *             The duration (in milliseconds) of the synchronous part of the operator checkpoint
+	 * @param asynchronousDurationMillis
+	 *             The duration (in milliseconds) of the asynchronous part of the operator checkpoint 
+	 * @param bytesBufferedInAlignment
+	 *             The number of bytes that were buffered during the checkpoint alignment phase
+	 * @param alignmentDurationNanos
+	 *             The duration (in nanoseconds) that the stream alignment for the checkpoint took   
 	 */
 	void acknowledgeCheckpoint(
 			long checkpointId,
 			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-			List<KeyGroupsStateHandle> keyGroupStateHandles);
+			List<KeyGroupsStateHandle> keyGroupStateHandles,
+			long synchronousDurationMillis,
+			long asynchronousDurationMillis,
+			long bytesBufferedInAlignment,
+			long alignmentDurationNanos);
 
 	/**
 	 * Marks task execution failed for an external reason (a reason other than the task code itself
@@ -189,7 +218,7 @@ public interface Environment {
 	 * Otherwise it sets the state to FAILED, and, if the invokable code is running,
 	 * starts an asynchronous thread that aborts that code.
 	 *
-	 * <p>This method never blocks.</p>
+	 * <p>This method never blocks.
 	 */
 	void failExternally(Throwable cause);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index cab7ed6..9ddfdf7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -22,7 +22,6 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
-
 import java.util.List;
 
 /**
@@ -41,18 +40,34 @@ public interface StatefulTask {
 	void setInitialState(ChainedStateHandle<StreamStateHandle> chainedState, List<KeyGroupsStateHandle> keyGroupsState) throws Exception;
 
 	/**
-	 * This method is either called directly and asynchronously by the checkpoint
-	 * coordinator (in the case of functions that are directly notified - usually
-	 * the data sources), or called synchronously when all incoming channels have
-	 * reported a checkpoint barrier.
+	 * This method is called to trigger a checkpoint, asynchronously by the checkpoint
+	 * coordinator.
+	 * 
+	 * <p>This method is called for tasks that start the checkpoints by injecting the initial barriers,
+	 * i.e., the source tasks. In contrast, checkpoints on downstream operators, which are the result of
+	 * receiving checkpoint barriers, invoke the {@link #triggerCheckpointOnBarrier(long, long, long, long)}
+	 * method.
 	 *
-	 * @param checkpointId The ID of the checkpoint, incrementing.
+	 * @param checkpointId The ID of the checkpoint, strictly incrementing.
 	 * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
 	 *
 	 * @return {@code false} if the checkpoint can not be carried out, {@code true} otherwise
 	 */
 	boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception;
 
+	/**
+	 * This method is called when a checkpoint is triggered as a result of receiving checkpoint
+	 * barriers on all input streams.
+	 * 
+	 * @param checkpointId The ID of the checkpoint, strictly incrementing.
+	 * @param timestamp The timestamp when the checkpoint was triggered at the JobManager.
+	 * @param bytesAligned The number of bytes that were buffered during the alignment of the streams.
+	 * @param alignmentTimeNanos The time that the stream alignment took, in nanoseconds.   
+	 * 
+	 * @throws Exception Exceptions thrown as the result of triggering a checkpoint are forwarded.
+	 */
+	void triggerCheckpointOnBarrier(
+			long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception;
 
 	/**
 	 * Invoked when a checkpoint has been completed, i.e., when the checkpoint coordinator has received

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index 0c56603..72396eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -26,6 +26,8 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 
 import java.util.List;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
+
 /**
  * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the
  * {@link org.apache.flink.runtime.jobmanager.JobManager} to signal that the checkpoint of an
@@ -37,27 +39,74 @@ import java.util.List;
 public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable {
 
 	private static final long serialVersionUID = -7606214777192401493L;
-	
+
 	private final ChainedStateHandle<StreamStateHandle> stateHandle;
 
 	private final List<KeyGroupsStateHandle> keyGroupsStateHandle;
 
-	public AcknowledgeCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId) {
+	/** The duration (in milliseconds) that the synchronous part of the checkpoint took */
+	private final long synchronousDurationMillis;
+
+	/** The duration (in milliseconds) that the asynchronous part of the checkpoint took */
+	private final long asynchronousDurationMillis;
+
+	/** The number of bytes that were buffered during the checkpoint alignment phase */
+	private final long bytesBufferedInAlignment;
+
+	/** The duration (in nanoseconds) that the alignment phase of the task's checkpoint took */
+	private final long alignmentDurationNanos;
+
+	// ------------------------------------------------------------------------
+
+	public AcknowledgeCheckpoint(
+			JobID job,
+			ExecutionAttemptID taskExecutionId,
+			long checkpointId) {
 		this(job, taskExecutionId, checkpointId, null, null);
 	}
 
 	public AcknowledgeCheckpoint(
-		JobID job,
-		ExecutionAttemptID taskExecutionId,
-		long checkpointId,
-		ChainedStateHandle<StreamStateHandle> state,
-		List<KeyGroupsStateHandle> keyGroupStateAndSizes) {
+			JobID job,
+			ExecutionAttemptID taskExecutionId,
+			long checkpointId,
+			ChainedStateHandle<StreamStateHandle> state,
+			List<KeyGroupsStateHandle> keyGroupStateAndSizes) {
+		this(job, taskExecutionId, checkpointId, state, keyGroupStateAndSizes, -1L, -1L, -1L, -1L);
+	}
+
+	public AcknowledgeCheckpoint(
+			JobID job,
+			ExecutionAttemptID taskExecutionId,
+			long checkpointId,
+			ChainedStateHandle<StreamStateHandle> state,
+			List<KeyGroupsStateHandle> keyGroupStateAndSizes,
+			long synchronousDurationMillis,
+			long asynchronousDurationMillis,
+			long bytesBufferedInAlignment,
+			long alignmentDurationNanos) {
 
 		super(job, taskExecutionId, checkpointId);
+
+		// these may be null in cases where the operator has no state
 		this.stateHandle = state;
 		this.keyGroupsStateHandle = keyGroupStateAndSizes;
+
+		// these may be "-1", in case the values are unknown or not set
+		checkArgument(synchronousDurationMillis >= -1);
+		checkArgument(asynchronousDurationMillis >= -1);
+		checkArgument(bytesBufferedInAlignment >= -1);
+		checkArgument(alignmentDurationNanos >= -1);
+
+		this.synchronousDurationMillis = synchronousDurationMillis;
+		this.asynchronousDurationMillis = asynchronousDurationMillis;
+		this.bytesBufferedInAlignment = bytesBufferedInAlignment;
+		this.alignmentDurationNanos = alignmentDurationNanos;
 	}
 
+	// ------------------------------------------------------------------------
+	//  properties
+	// ------------------------------------------------------------------------
+
 	public ChainedStateHandle<StreamStateHandle> getStateHandle() {
 		return stateHandle;
 	}
@@ -66,8 +115,29 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 		return keyGroupsStateHandle;
 	}
 
+	public long getSynchronousDurationMillis() {
+		return synchronousDurationMillis;
+	}
+
+	public long getAsynchronousDurationMillis() {
+		return asynchronousDurationMillis;
+	}
+
+	public long getBytesBufferedInAlignment() {
+		return bytesBufferedInAlignment;
+	}
+
+	public long getAlignmentDurationNanos() {
+		return alignmentDurationNanos;
+	}
+
 	// --------------------------------------------------------------------------------------------
-	
+
+	@Override
+	public int hashCode() {
+		return super.hashCode();
+	}
+
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
@@ -76,9 +146,10 @@ public class AcknowledgeCheckpoint extends AbstractCheckpointMessage implements
 		else if (o instanceof AcknowledgeCheckpoint) {
 			AcknowledgeCheckpoint that = (AcknowledgeCheckpoint) o;
 			return super.equals(o) &&
-					(this.stateHandle == null ? that.stateHandle == null : (that.stateHandle != null && this.stateHandle.equals(that.stateHandle))) &&
-					(this.keyGroupsStateHandle == null ? that.keyGroupsStateHandle == null : (that.keyGroupsStateHandle != null && this.keyGroupsStateHandle.equals(that.keyGroupsStateHandle)));
-
+					(this.stateHandle == null ? that.stateHandle == null : 
+							(that.stateHandle != null && this.stateHandle.equals(that.stateHandle))) &&
+					(this.keyGroupsStateHandle == null ? that.keyGroupsStateHandle == null : 
+							(that.keyGroupsStateHandle != null && this.keyGroupsStateHandle.equals(that.keyGroupsStateHandle)));
 		}
 		else {
 			return false;

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
index 56e5922..c317bed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
@@ -43,18 +43,21 @@ public class ActorGatewayCheckpointResponder implements CheckpointResponder {
 
 	@Override
 	public void acknowledgeCheckpoint(
-		JobID jobID,
-		ExecutionAttemptID executionAttemptID,
-		long checkpointID,
-		ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-		List<KeyGroupsStateHandle> keyGroupStateHandles) {
+			JobID jobID,
+			ExecutionAttemptID executionAttemptID,
+			long checkpointID,
+			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
+			List<KeyGroupsStateHandle> keyGroupStateHandles,
+			long synchronousDurationMillis,
+			long asynchronousDurationMillis,
+			long bytesBufferedInAlignment,
+			long alignmentDurationNanos) {
 
 		AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
-			jobID,
-			executionAttemptID,
-			checkpointID,
-			chainedStateHandle,
-			keyGroupStateHandles);
+				jobID, executionAttemptID, checkpointID,
+				chainedStateHandle, keyGroupStateHandles,
+				synchronousDurationMillis, asynchronousDurationMillis,
+				bytesBufferedInAlignment, alignmentDurationNanos);
 
 		actorGateway.tell(message);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
index 9d5c4e1..b3f9827 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
@@ -34,18 +34,35 @@ public interface CheckpointResponder {
 	/**
 	 * Acknowledges the given checkpoint.
 	 *
-	 * @param jobID Job ID of the running job
-	 * @param executionAttemptID Execution attempt ID of the running task
-	 * @param checkpointID Checkpoint ID of the checkpoint
-	 * @param chainedStateHandle Chained state handle
-	 * @param keyGroupStateHandles State handles for key groups
+	 * @param jobID
+	 *             Job ID of the running job
+	 * @param executionAttemptID
+	 *             Execution attempt ID of the running task
+	 * @param checkpointID
+	 *             Checkpoint ID of the checkpoint
+	 * @param chainedStateHandle
+	 *             Chained state handle
+	 * @param keyGroupStateHandles
+	 *             State handles for key groups
+	 * @param synchronousDurationMillis
+	 *             The duration (in milliseconds) of the synchronous part of the operator checkpoint
+	 * @param asynchronousDurationMillis
+	 *             The duration (in milliseconds) of the asynchronous part of the operator checkpoint 
+	 * @param bytesBufferedInAlignment
+	 *             The number of bytes that were buffered during the checkpoint alignment phase
+	 * @param alignmentDurationNanos
+	 *             The duration (in nanoseconds) that the stream alignment for the checkpoint took
 	 */
 	void acknowledgeCheckpoint(
 		JobID jobID,
 		ExecutionAttemptID executionAttemptID,
 		long checkpointID,
 		ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-		List<KeyGroupsStateHandle> keyGroupStateHandles);
+		List<KeyGroupsStateHandle> keyGroupStateHandles,
+		long synchronousDurationMillis,
+		long asynchronousDurationMillis,
+		long bytesBufferedInAlignment,
+		long alignmentDurationNanos);
 
 	/**
 	 * Declines the given checkpoint.

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 3e4ba4d..23b6f82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -239,23 +239,34 @@ public class RuntimeEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId) {
-		acknowledgeCheckpoint(checkpointId, null, null);
+	public void acknowledgeCheckpoint(
+			long checkpointId,
+			long synchronousDurationMillis,
+			long asynchronousDurationMillis,
+			long bytesBufferedInAlignment,
+			long alignmentDurationNanos) {
+
+		acknowledgeCheckpoint(checkpointId, null, null,
+				synchronousDurationMillis, asynchronousDurationMillis,
+				bytesBufferedInAlignment, alignmentDurationNanos);
 	}
 
 	@Override
 	public void acknowledgeCheckpoint(
 			long checkpointId,
 			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-			List<KeyGroupsStateHandle> keyGroupStateHandles) {
+			List<KeyGroupsStateHandle> keyGroupStateHandles,
+			long synchronousDurationMillis,
+			long asynchronousDurationMillis,
+			long bytesBufferedInAlignment,
+			long alignmentDurationNanos) {
 
 
 		checkpointResponder.acknowledgeCheckpoint(
-			jobId,
-			executionId,
-			checkpointId,
-			chainedStateHandle,
-			keyGroupStateHandles);
+				jobId, executionId, checkpointId,
+				chainedStateHandle, keyGroupStateHandles,
+				synchronousDurationMillis, asynchronousDurationMillis,
+				bytesBufferedInAlignment, alignmentDurationNanos);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 6b80c3d..ef8e3bd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -453,12 +453,15 @@ public class JobManagerHARecoveryTest {
 			try {
 				ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(
 						InstantiationUtil.serializeObject(checkpointId));
+
 				RetrievableStreamStateHandle<Long> state = new RetrievableStreamStateHandle<Long>(byteStreamStateHandle);
 				ChainedStateHandle<StreamStateHandle> chainedStateHandle = new ChainedStateHandle<StreamStateHandle>(Collections.singletonList(state));
+
 				getEnvironment().acknowledgeCheckpoint(
 						checkpointId,
 						chainedStateHandle,
-						Collections.<KeyGroupsStateHandle>emptyList());
+						Collections.<KeyGroupsStateHandle>emptyList(),
+						0L, 0L, 0L, 0L);
 				return true;
 			} catch (Exception ex) {
 				throw new RuntimeException(ex);
@@ -466,6 +469,12 @@ public class JobManagerHARecoveryTest {
 		}
 
 		@Override
+		public void triggerCheckpointOnBarrier(
+				long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception {
+			throw new UnsupportedOperationException("should not be called!");
+		}
+
+		@Override
 		public void notifyCheckpointComplete(long checkpointId) {
 			if (completedCheckpoints++ > NUM_CHECKPOINTS_TO_COMPLETE) {
 				completedCheckpointsLatch.countDown();

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 4654507..a857d1b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -153,14 +153,18 @@ public class DummyEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId) {
-
+	public void acknowledgeCheckpoint(
+			long checkpointId,
+			long synchronousDurationMillis, long asynchronousDurationMillis,
+			long bytesBufferedInAlignment, long alignmentDurationNanos) {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId,
-			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-			List<KeyGroupsStateHandle> keyGroupStateHandles) {
+	public void acknowledgeCheckpoint(
+			long checkpointId,
+			ChainedStateHandle<StreamStateHandle> chainedStateHandle, List<KeyGroupsStateHandle> keyGroupStateHandles,
+			long synchronousDurationMillis, long asynchronousDurationMillis,
+			long bytesBufferedInAlignment, long alignmentDurationNanos) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index e7bf6e1..75e88eb 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -314,15 +314,18 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId) {
-		throw new UnsupportedOperationException();
+	public void acknowledgeCheckpoint(
+			long checkpointId,
+			long synchronousDurationMillis, long asynchronousDurationMillis,
+			long bytesBufferedInAlignment, long alignmentDurationNanos) {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId,
-			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-			List<KeyGroupsStateHandle> keyGroupStateHandles) {
-		throw new UnsupportedOperationException();
+	public void acknowledgeCheckpoint(
+			long checkpointId,
+			ChainedStateHandle<StreamStateHandle> chainedStateHandle, List<KeyGroupsStateHandle> keyGroupStateHandles,
+			long synchronousDurationMillis, long asynchronousDurationMillis,
+			long bytesBufferedInAlignment, long alignmentDurationNanos) {
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index a5f4019..ed30fd7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -228,6 +228,12 @@ public class TaskAsyncCallTest {
 		}
 
 		@Override
+		public void triggerCheckpointOnBarrier(
+				long checkpointId, long timestamp, long bytesAligned, long alignmentTimeNanos) throws Exception {
+			throw new UnsupportedOperationException("Should not be called");
+		}
+
+		@Override
 		public void notifyCheckpointComplete(long checkpointId) {
 			if (checkpointId != lastCheckpointId && this.error == null) {
 				this.error = new Exception("calls out of order");

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index dcd76c6..d60c999 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -25,7 +25,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import org.slf4j.Logger;
@@ -37,22 +37,22 @@ import org.slf4j.LoggerFactory;
  * 
  * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the
  * BarrierBuffer continues receiving buffers from the blocked channels and stores them internally until 
- * the blocks are released.</p>
+ * the blocks are released.
  */
 @Internal
 public class BarrierBuffer implements CheckpointBarrierHandler {
 
 	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
-	
+
 	/** The gate that the buffer draws its input from */
 	private final InputGate inputGate;
 
 	/** Flags that indicate whether a channel is currently blocked/buffered */
 	private final boolean[] blockedChannels;
-	
+
 	/** The total number of channels that this buffer handles data from */
 	private final int totalNumberOfInputChannels;
-	
+
 	/** To utility to write blocked data to a file channel */
 	private final BufferSpiller bufferSpiller;
 
@@ -65,17 +65,23 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	private BufferSpiller.SpilledBufferOrEventSequence currentBuffered;
 
 	/** Handler that receives the checkpoint notifications */
-	private EventListener<CheckpointBarrier> checkpointHandler;
+	private StatefulTask toNotifyOnCheckpoint;
 
 	/** The ID of the checkpoint for which we expect barriers */
 	private long currentCheckpointId = -1L;
 
 	/** The number of received barriers (= number of blocked/buffered channels) */
 	private int numBarriersReceived;
-	
+
 	/** The number of already closed channels */
 	private int numClosedChannels;
-	
+
+	/** The timestamp as in {@link System#nanoTime()} at which the last alignment started */
+	private long startOfAlignmentTimestamp;
+
+	/** The time (in nanoseconds) that the latest alignment took */
+	private long latestAlignmentDurationNanos;
+
 	/** Flag to indicate whether we have drawn all available input */
 	private boolean endOfStream;
 
@@ -100,7 +106,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	// ------------------------------------------------------------------------
 
 	@Override
-	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+	public BufferOrEvent getNextNonBlocked() throws Exception {
 		while (true) {
 			// process buffered BufferOrEvents before grabbing new ones
 			BufferOrEvent next;
@@ -114,7 +120,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 					return getNextNonBlocked();
 				}
 			}
-			
+
 			if (next != null) {
 				if (isBlocked(next.getChannelIndex())) {
 					// if the channel is blocked we, we just store the BufferOrEvent
@@ -139,12 +145,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				}
 			}
 			else if (!endOfStream) {
-				// end of stream. we feed the data that is still buffered
+				// end of input stream. stream continues with the buffered data
 				endOfStream = true;
 				releaseBlocks();
 				return getNextNonBlocked();
 			}
 			else {
+				// final end of both input and buffered data
 				return null;
 			}
 		}
@@ -158,7 +165,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 		}
 	}
 	
-	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws IOException {
+	private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
 		final long barrierId = receivedBarrier.getId();
 
 		if (numBarriersReceived > 0) {
@@ -175,6 +182,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				releaseBlocks();
 				currentCheckpointId = barrierId;
 				onBarrier(channelIndex);
+
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Starting stream alignment for checkpoint {}", barrierId);
+				}
+				startOfAlignmentTimestamp = System.nanoTime();
 			}
 			else {
 				// ignore trailing barrier from aborted checkpoint
@@ -186,6 +198,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 			// first barrier of a new checkpoint
 			currentCheckpointId = barrierId;
 			onBarrier(channelIndex);
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Starting stream alignment for checkpoint {}", barrierId);
+			}
+			startOfAlignmentTimestamp = System.nanoTime();
 		}
 		else {
 			// trailing barrier from previous (skipped) checkpoint
@@ -199,21 +216,23 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 						receivedBarrier.getId(), receivedBarrier.getTimestamp());
 			}
 
-			if (checkpointHandler != null) {
-				checkpointHandler.onEvent(receivedBarrier);
-			}
-			
 			releaseBlocks();
+
+			if (toNotifyOnCheckpoint != null) {
+				toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+						receivedBarrier.getId(), receivedBarrier.getTimestamp(),
+						bufferSpiller.getBytesWritten(), latestAlignmentDurationNanos);
+			}
 		}
 	}
 	
 	@Override
-	public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
-		if (this.checkpointHandler == null) {
-			this.checkpointHandler = checkpointHandler;
+	public void registerCheckpointEventHandler(StatefulTask toNotifyOnCheckpoint) {
+		if (this.toNotifyOnCheckpoint == null) {
+			this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
 		}
 		else {
-			throw new IllegalStateException("BarrierBuffer already has a registered checkpoint handler");
+			throw new IllegalStateException("BarrierBuffer already has a registered checkpoint notifyee");
 		}
 	}
 	
@@ -267,9 +286,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	 * is the next to be consumed.
 	 */
 	private void releaseBlocks() throws IOException {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Releasing blocks");
-		}
+		LOG.debug("End of stream alignment, feeding buffered data back");
 
 		for (int i = 0; i < blockedChannels.length; i++) {
 			blockedChannels[i] = false;
@@ -295,10 +312,16 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 				currentBuffered = bufferedNow;
 			}
 		}
+
+		final long now = System.nanoTime();
+		if (startOfAlignmentTimestamp > 0) {
+			latestAlignmentDurationNanos = now - startOfAlignmentTimestamp;
+			startOfAlignmentTimestamp = 0;
+		}
 	}
 
 	// ------------------------------------------------------------------------
-	// For Testing
+	//  Properties
 	// ------------------------------------------------------------------------
 
 	/**
@@ -309,7 +332,17 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 	public long getCurrentCheckpointId() {
 		return this.currentCheckpointId;
 	}
-	
+
+	@Override
+	public long getAlignmentDurationNanos() {
+		long start = this.startOfAlignmentTimestamp;
+		if (start <= 0) {
+			return latestAlignmentDurationNanos;
+		} else {
+			return System.nanoTime() - start;
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	// Utilities 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
index 9c9ec4f..1db5845 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
@@ -21,10 +21,9 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
-import java.io.IOException;
 import java.util.ArrayDeque;
 
 /**
@@ -57,11 +56,12 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
 	
 	/** The listener to be notified on complete checkpoints */
-	private EventListener<CheckpointBarrier> checkpointHandler;
+	private StatefulTask toNotifyOnCheckpoint;
 	
 	/** The highest checkpoint ID encountered so far */
 	private long latestPendingCheckpointID = -1;
-	
+
+	// ------------------------------------------------------------------------
 	
 	public BarrierTracker(InputGate inputGate) {
 		this.inputGate = inputGate;
@@ -70,7 +70,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	}
 
 	@Override
-	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
+	public BufferOrEvent getNextNonBlocked() throws Exception {
 		while (true) {
 			BufferOrEvent next = inputGate.getNextBufferOrEvent();
 			if (next == null) {
@@ -86,12 +86,12 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 	}
 
 	@Override
-	public void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler) {
-		if (this.checkpointHandler == null) {
-			this.checkpointHandler = checkpointHandler;
+	public void registerCheckpointEventHandler(StatefulTask toNotifyOnCheckpoint) {
+		if (this.toNotifyOnCheckpoint == null) {
+			this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
 		}
 		else {
-			throw new IllegalStateException("BarrierTracker already has a registered checkpoint handler");
+			throw new IllegalStateException("BarrierTracker already has a registered checkpoint notifyee");
 		}
 	}
 
@@ -105,22 +105,29 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		return pendingCheckpoints.isEmpty();
 	}
 
-	private void processBarrier(CheckpointBarrier receivedBarrier) {
+	@Override
+	public long getAlignmentDurationNanos() {
+		// this one does not do alignment at all
+		return 0L;
+	}
+
+	private void processBarrier(CheckpointBarrier receivedBarrier) throws Exception {
 		// fast path for single channel trackers
 		if (totalNumberOfInputChannels == 1) {
-			if (checkpointHandler != null) {
-				checkpointHandler.onEvent(receivedBarrier);
+			if (toNotifyOnCheckpoint != null) {
+				toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+						receivedBarrier.getId(), receivedBarrier.getTimestamp(), 0L, 0L);
 			}
 			return;
 		}
-		
+
 		// general path for multiple input channels
 		final long barrierId = receivedBarrier.getId();
 
 		// find the checkpoint barrier in the queue of bending barriers
 		CheckpointBarrierCount cbc = null;
 		int pos = 0;
-		
+
 		for (CheckpointBarrierCount next : pendingCheckpoints) {
 			if (next.checkpointId == barrierId) {
 				cbc = next;
@@ -128,7 +135,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 			}
 			pos++;
 		}
-		
+
 		if (cbc != null) {
 			// add one to the count to that barrier and check for completion
 			int numBarriersNew = cbc.incrementBarrierCount();
@@ -141,8 +148,9 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 				}
 				
 				// notify the listener
-				if (checkpointHandler != null) {
-					checkpointHandler.onEvent(receivedBarrier);
+				if (toNotifyOnCheckpoint != null) {
+					toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+							receivedBarrier.getId(), receivedBarrier.getTimestamp(), 0L, 0L);
 				}
 			}
 		}
@@ -182,7 +190,7 @@ public class BarrierTracker implements CheckpointBarrierHandler {
 		public int incrementBarrierCount() {
 			return ++barrierCount;
 		}
-		
+
 		@Override
 		public int hashCode() {
 			return (int) ((checkpointId >>> 32) ^ checkpointId) + 17 * barrierCount; 

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 1b38a56..5a8a4cd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -85,8 +85,8 @@ public class BufferSpiller {
 	/** A counter, to created numbered spill files */
 	private int fileCounter;
 	
-	/** A flag to check whether the spiller has written since the last roll over */
-	private boolean hasWritten;
+	/** The number of bytes written since the last roll over */
+	private long bytesWritten;
 	
 	/**
 	 * Creates a new buffer spiller, spilling to one of the I/O manager's temp directories.
@@ -124,7 +124,6 @@ public class BufferSpiller {
 	 * @throws IOException Thrown, if the buffer of event could not be spilled.
 	 */
 	public void add(BufferOrEvent boe) throws IOException {
-		hasWritten = true;
 		try {
 			ByteBuffer contents;
 			if (boe.isBuffer()) {
@@ -133,6 +132,7 @@ public class BufferSpiller {
 			}
 			else {
 				contents = EventSerializer.toSerializedEvent(boe.getEvent());
+				
 			}
 			
 			headBuffer.clear();
@@ -140,7 +140,9 @@ public class BufferSpiller {
 			headBuffer.putInt(contents.remaining());
 			headBuffer.put((byte) (boe.isBuffer() ? 0 : 1));
 			headBuffer.flip();
-			
+
+			bytesWritten += (headBuffer.remaining() + contents.remaining());
+
 			sources[1] = contents;
 			currentChannel.write(sources);
 		}
@@ -186,10 +188,10 @@ public class BufferSpiller {
 	}
 	
 	private SpilledBufferOrEventSequence rollOverInternal(boolean newBuffer) throws IOException {
-		if (!hasWritten) {
+		if (bytesWritten == 0) {
 			return null;
 		}
-		
+
 		ByteBuffer buf;
 		if (newBuffer) {
 			buf = ByteBuffer.allocateDirect(READ_BUFFER_SIZE);
@@ -197,16 +199,16 @@ public class BufferSpiller {
 		} else {
 			buf = readBuffer;
 		}
-		
+
 		// create a reader for the spilled data
 		currentChannel.position(0L);
 		SpilledBufferOrEventSequence seq = 
 				new SpilledBufferOrEventSequence(currentSpillFile, currentChannel, buf, pageSize);
-		
+
 		// create ourselves a new spill file
 		createSpillingChannel();
-		
-		hasWritten = false;
+
+		bytesWritten = 0L;
 		return seq;
 	}
 
@@ -225,6 +227,14 @@ public class BufferSpiller {
 		}
 	}
 
+	/**
+	 * Gets the number of bytes written in the current spill file.
+	 * @return the number of bytes written in the current spill file
+	 */
+	public long getBytesWritten() {
+		return bytesWritten;
+	}
+
 	// ------------------------------------------------------------------------
 	//  For testing
 	// ------------------------------------------------------------------------
@@ -255,16 +265,16 @@ public class BufferSpiller {
 	 * method {@link #getNext()}.
 	 */
 	public static class SpilledBufferOrEventSequence {
-		
+
 		/** Header is "channel index" (4 bytes) + length (4 bytes) + buffer/event (1 byte) */
 		private static final int HEADER_LENGTH = 9;
 
 		/** The file containing the data */
 		private final File file;
-		
+
 		/** The file channel to draw the data from */
 		private final FileChannel fileChannel;
-		
+
 		/** The byte buffer for bulk reading */
 		private final ByteBuffer buffer;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 5aa2030..56859fb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -20,8 +20,7 @@ package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
 import java.io.IOException;
 
@@ -39,18 +38,22 @@ public interface CheckpointBarrierHandler {
 	 * has been determined to be finished.
 	 * 
 	 * @return The next BufferOrEvent, or {@code null}, if the stream is finished.
-	 * @throws java.io.IOException Thrown if the network or local disk I/O fails.
-	 * @throws java.lang.InterruptedException Thrown if the thread is interrupted while blocking during
-	 *                                        waiting for the next BufferOrEvent to become available.
+	 * 
+	 * @throws IOException Thrown if the network or local disk I/O fails.
+	 * 
+	 * @throws InterruptedException Thrown if the thread is interrupted while blocking during
+	 *                              waiting for the next BufferOrEvent to become available.
+	 * @throws Exception Thrown in case that a checkpoint fails that is started as the result of receiving 
+	 *                   the last checkpoint barrier 
 	 */
-	BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException;
+	BufferOrEvent getNextNonBlocked() throws Exception;
 
 	/**
-	 * Registers the given event handler to be notified on successful checkpoints.
+	 * Registers the task be notified once all checkpoint barriers have been received for a checkpoint.
 	 * 
-	 * @param checkpointHandler The handler to register.
+	 * @param task The task to notify
 	 */
-	void registerCheckpointEventHandler(EventListener<CheckpointBarrier> checkpointHandler);
+	void registerCheckpointEventHandler(StatefulTask task);
 
 	/**
 	 * Cleans up all internally held resources.
@@ -64,4 +67,13 @@ public interface CheckpointBarrierHandler {
 	 * @return {@code True}, if no data is buffered internally, {@code false} otherwise.
 	 */
 	boolean isEmpty();
+
+	/**
+	 * Gets the time that the latest alignment took, in nanoseconds.
+	 * If there is currently an alignment in progress, it will return the time spent in the
+	 * current alignment so far.
+	 * 
+	 * @return The duration in nanoseconds
+	 */
+	long getAlignmentDurationNanos();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index d11990e..85e9297 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -24,6 +24,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -37,7 +38,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -45,7 +45,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 /**
  * Input reader for {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
@@ -85,10 +84,10 @@ public class StreamInputProcessor<IN> {
 
 	@SuppressWarnings("unchecked")
 	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
-								EventListener<CheckpointBarrier> checkpointListener,
-								CheckpointingMode checkpointMode,
-								IOManager ioManager,
-								boolean enableWatermarkMultiplexing) throws IOException {
+			StatefulTask checkpointedTask,
+			CheckpointingMode checkpointMode,
+			IOManager ioManager,
+			boolean enableWatermarkMultiplexing) throws IOException {
 
 		InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
@@ -102,8 +101,8 @@ public class StreamInputProcessor<IN> {
 			throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode);
 		}
 		
-		if (checkpointListener != null) {
-			this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
+		if (checkpointedTask != null) {
+			this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
 		}
 		
 		if (enableWatermarkMultiplexing) {
@@ -215,7 +214,7 @@ public class StreamInputProcessor<IN> {
 	 * Sets the metric group for this StreamInputProcessor.
 	 * 
 	 * @param metrics metric group
-     */
+	 */
 	public void setMetricGroup(IOMetricGroup metrics) {
 		metrics.gauge("currentLowWatermark", new Gauge<Long>() {
 			@Override
@@ -223,6 +222,13 @@ public class StreamInputProcessor<IN> {
 				return lastEmittedWatermark;
 			}
 		});
+
+		metrics.gauge("checkpointAlignmentTime", new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return barrierHandler.getAlignmentDurationNanos();
+			}
+		});
 	}
 	
 	public void cleanup() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index ce764b7..70ce783 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
@@ -34,7 +35,6 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -42,7 +42,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -95,7 +94,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 			Collection<InputGate> inputGates2,
 			TypeSerializer<IN1> inputSerializer1,
 			TypeSerializer<IN2> inputSerializer2,
-			EventListener<CheckpointBarrier> checkpointListener,
+			StatefulTask checkpointedTask,
 			CheckpointingMode checkpointMode,
 			IOManager ioManager,
 			boolean enableWatermarkMultiplexing) throws IOException {
@@ -112,8 +111,8 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 			throw new IllegalArgumentException("Unrecognized CheckpointingMode: " + checkpointMode);
 		}
 		
-		if (checkpointListener != null) {
-			this.barrierHandler.registerCheckpointEventHandler(checkpointListener);
+		if (checkpointedTask != null) {
+			this.barrierHandler.registerCheckpointEventHandler(checkpointedTask);
 		}
 		
 		if (enableWatermarkMultiplexing) {
@@ -294,6 +293,13 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 				return Math.min(lastEmittedWatermark1, lastEmittedWatermark2);
 			}
 		});
+
+		metrics.gauge("checkpointAlignmentTime", new Gauge<Long>() {
+			@Override
+			public Long getValue() {
+				return barrierHandler.getAlignmentDurationNanos();
+			}
+		});
 	}
 	
 	public void cleanup() throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 938d8c1..d6d2fb5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -43,7 +43,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 		if (numberOfInputs > 0) {
 			InputGate[] inputGates = getEnvironment().getAllInputGates();
 			inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer,
-					getCheckpointBarrierListener(), 
+					this, 
 					configuration.getCheckpointMode(),
 					getEnvironment().getIOManager(),
 					isSerializingTimestamps());

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index ff074b7..7976f01 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -26,7 +26,6 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -41,7 +40,6 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
-import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
@@ -589,7 +587,7 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	@Override
 	public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
 		try {
-			return performCheckpoint(checkpointId, timestamp);
+			return performCheckpoint(checkpointId, timestamp, 0L, 0L);
 		}
 		catch (Exception e) {
 			// propagate exceptions only if the task is still in "running" state
@@ -601,11 +599,31 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		}
 	}
 
-	private boolean performCheckpoint(final long checkpointId, final long timestamp) throws Exception {
+	@Override
+	public void triggerCheckpointOnBarrier(
+			long checkpointId, long timestamp, long bytesAligned, long alignmentDurationNanos) throws Exception {
+
+		try {
+			performCheckpoint(checkpointId, timestamp, bytesAligned, alignmentDurationNanos);
+		}
+		catch (CancelTaskException e) {
+			throw e;
+		}
+		catch (Exception e) {
+			throw new Exception("Error while performing a checkpoint", e);
+		}
+	}
+
+	private boolean performCheckpoint(
+			long checkpointId, long timestamp, long bytesBufferedAlignment, long alignmentDurationNanos) throws Exception {
+
 		LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName());
+
 		synchronized (lock) {
 			if (isRunning) {
 
+				final long startOfSyncPart = System.nanoTime();
+
 				// Since both state checkpointing and downstream barrier emission occurs in this
 				// lock scope, they are an atomic operation regardless of the order in which they occur.
 				// Given this, we immediately emit the checkpoint barriers, so the downstream operators
@@ -654,13 +672,20 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 				LOG.debug("Finished synchronous checkpoints for checkpoint {} on task {}", checkpointId, getName());
 
+				final long endOfSyncPart = System.nanoTime();
+				final long syncDurationMillis = (endOfSyncPart - startOfSyncPart) / 1_000_000;
+
 				AsyncCheckpointRunnable asyncCheckpointRunnable = new AsyncCheckpointRunnable(
 						"checkpoint-" + checkpointId + "-" + timestamp,
 						this,
 						cancelables,
 						chainedStateHandles,
 						keyGroupsStateHandleFuture,
-						checkpointId);
+						checkpointId,
+						bytesBufferedAlignment,
+						alignmentDurationNanos,
+						syncDurationMillis,
+						endOfSyncPart);
 
 				synchronized (cancelables) {
 					cancelables.add(asyncCheckpointRunnable);
@@ -851,29 +876,12 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 	// ------------------------------------------------------------------------
 	//  Utilities
 	// ------------------------------------------------------------------------
-	
+
 	@Override
 	public String toString() {
 		return getName();
 	}
 
-	final EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
-		return new EventListener<CheckpointBarrier>() {
-			@Override
-			public void onEvent(CheckpointBarrier barrier) {
-				try {
-					performCheckpoint(barrier.getId(), barrier.getTimestamp());
-				}
-				catch (CancelTaskException e) {
-					throw e;
-				}
-				catch (Exception e) {
-					throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
-				}
-			}
-		};
-	}
-
 	// ------------------------------------------------------------------------
 	
 	private static class AsyncCheckpointRunnable implements Runnable, Closeable {
@@ -890,13 +898,25 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 
 		private final String name;
 
+		private final long bytesBufferedInAlignment;
+
+		private final long alignmentDurationNanos;
+
+		private final long syncDurationMillies;
+
+		private final long asyncStartNanos;
+
 		AsyncCheckpointRunnable(
 				String name,
 				StreamTask<?, ?> owner,
 				Set<Closeable> cancelables,
 				ChainedStateHandle<StreamStateHandle> chainedStateHandles,
 				RunnableFuture<KeyGroupsStateHandle> keyGroupsStateHandleFuture,
-				long checkpointId) {
+				long checkpointId,
+				long bytesBufferedInAlignment,
+				long alignmentDurationNanos,
+				long syncDurationMillies,
+				long asyncStartNanos) {
 
 			this.name = name;
 			this.owner = owner;
@@ -904,6 +924,10 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 			this.chainedStateHandles = chainedStateHandles;
 			this.keyGroupsStateHandleFuture = keyGroupsStateHandleFuture;
 			this.checkpointId = checkpointId;
+			this.bytesBufferedInAlignment = bytesBufferedInAlignment;
+			this.alignmentDurationNanos = alignmentDurationNanos;
+			this.syncDurationMillies = syncDurationMillies;
+			this.asyncStartNanos = asyncStartNanos;
 		}
 
 		@Override
@@ -925,19 +949,26 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 					}
 				}
 
+				final long asyncEndNanos = System.nanoTime();
+				final long asyncDurationMillis = (asyncEndNanos - asyncStartNanos) / 1_000_000;
+
 				if (chainedStateHandles.isEmpty() && keyedStates.isEmpty()) {
-					owner.getEnvironment().acknowledgeCheckpoint(checkpointId);
+					owner.getEnvironment().acknowledgeCheckpoint(checkpointId,
+							syncDurationMillies, asyncDurationMillis,
+							bytesBufferedInAlignment, alignmentDurationNanos);
 				} else  {
-					owner.getEnvironment().acknowledgeCheckpoint(checkpointId, chainedStateHandles, keyedStates);
+					owner.getEnvironment().acknowledgeCheckpoint(checkpointId,
+							chainedStateHandles, keyedStates,
+							syncDurationMillies, asyncDurationMillis,
+							bytesBufferedInAlignment, alignmentDurationNanos);
 				}
 
-				if(LOG.isDebugEnabled()) {
+				if (LOG.isDebugEnabled()) {
 					LOG.debug("Finished asynchronous checkpoints for checkpoint {} on task {}. Returning handles on " +
 							"keyed states {}.", checkpointId, name, keyedStates);
 				}
 			}
 			catch (Exception e) {
-
 				// registers the exception and tries to fail the whole task
 				AsynchronousException asyncException = new AsynchronousException(e);
 				owner.registerAsyncException(asyncException);

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index c3305eb..9252063 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -68,7 +68,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 	
 		this.inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2,
 				inputDeserializer1, inputDeserializer2,
-				getCheckpointBarrierListener(),
+				this,
 				configuration.getCheckpointMode(),
 				getEnvironment().getIOManager(),
 				isSerializingTimestamps());

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index d4fdc59..b549ef8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.HeapMemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -26,7 +25,10 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import org.junit.AfterClass;
@@ -35,6 +37,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.util.Arrays;
+import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -86,7 +89,9 @@ public class BarrierBufferTest {
 			for (BufferOrEvent boe : sequence) {
 				assertEquals(boe, buffer.getNextNonBlocked());
 			}
-			
+
+			assertEquals(0L, buffer.getAlignmentDurationNanos());
+
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
 			
@@ -120,6 +125,8 @@ public class BarrierBufferTest {
 				assertEquals(boe, buffer.getNextNonBlocked());
 			}
 
+			assertEquals(0L, buffer.getAlignmentDurationNanos());
+
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
 
@@ -222,13 +229,15 @@ public class BarrierBufferTest {
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
 			buffer.registerCheckpointEventHandler(handler);
 			handler.setNextExpectedCheckpointId(1L);
-			
+
 			// pre checkpoint 1
 			check(sequence[0], buffer.getNextNonBlocked());
 			check(sequence[1], buffer.getNextNonBlocked());
 			check(sequence[2], buffer.getNextNonBlocked());
 			assertEquals(1L, handler.getNextExpectedCheckpointId());
 
+			long startTs = System.nanoTime();
+
 			// blocking while aligning for checkpoint 1
 			check(sequence[7], buffer.getNextNonBlocked());
 			assertEquals(1L, handler.getNextExpectedCheckpointId());
@@ -236,6 +245,8 @@ public class BarrierBufferTest {
 			// checkpoint 1 done, returning buffered data
 			check(sequence[5], buffer.getNextNonBlocked());
 			assertEquals(2L, handler.getNextExpectedCheckpointId());
+			validateAlignmentTime(startTs, buffer);
+
 			check(sequence[6], buffer.getNextNonBlocked());
 
 			// pre checkpoint 2
@@ -245,10 +256,13 @@ public class BarrierBufferTest {
 			check(sequence[12], buffer.getNextNonBlocked());
 			check(sequence[13], buffer.getNextNonBlocked());
 			assertEquals(2L, handler.getNextExpectedCheckpointId());
-			
+
 			// checkpoint 2 barriers come together
+			startTs = System.nanoTime();
 			check(sequence[17], buffer.getNextNonBlocked());
 			assertEquals(3L, handler.getNextExpectedCheckpointId());
+			validateAlignmentTime(startTs, buffer);
+
 			check(sequence[18], buffer.getNextNonBlocked());
 
 			// checkpoint 3 starts, data buffered
@@ -257,7 +271,7 @@ public class BarrierBufferTest {
 			check(sequence[21], buffer.getNextNonBlocked());
 
 			// checkpoint 4 happens without extra data
-			
+
 			// pre checkpoint 5
 			check(sequence[27], buffer.getNextNonBlocked());
 			assertEquals(5L, handler.getNextExpectedCheckpointId());
@@ -301,7 +315,7 @@ public class BarrierBufferTest {
 			BufferOrEvent[] sequence = {
 					createBuffer(0), createBuffer(1), createBuffer(2),
 					createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0),
-					
+
 					createBuffer(2), createBuffer(1), createBuffer(0),
 					createBarrier(2, 1),
 					createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2),
@@ -327,12 +341,14 @@ public class BarrierBufferTest {
 			assertEquals(2L, handler.getNextExpectedCheckpointId());
 			check(sequence[7], buffer.getNextNonBlocked());
 			check(sequence[8], buffer.getNextNonBlocked());
-			
+
 			// checkpoint 2 alignment
+			long startTs = System.nanoTime();
 			check(sequence[13], buffer.getNextNonBlocked());
 			check(sequence[14], buffer.getNextNonBlocked());
 			check(sequence[18], buffer.getNextNonBlocked());
 			check(sequence[19], buffer.getNextNonBlocked());
+			validateAlignmentTime(startTs, buffer);
 
 			// end of stream: remaining buffered contents
 			check(sequence[10], buffer.getNextNonBlocked());
@@ -343,7 +359,7 @@ public class BarrierBufferTest {
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
-			
+
 			buffer.cleanup();
 
 			checkNoTempFilesRemain();
@@ -389,7 +405,7 @@ public class BarrierBufferTest {
 					createBarrier(3, 2),
 					createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
 					createBarrier(6, 1),
-					
+
 					// complete checkpoint 4, checkpoint 5 remains not fully triggered
 					createBarrier(4, 2),
 					createBuffer(2),
@@ -419,10 +435,13 @@ public class BarrierBufferTest {
 
 			// alignment of checkpoint 2 - buffering also some barriers for
 			// checkpoints 3 and 4
+			long startTs = System.nanoTime();
 			check(sequence[13], buffer.getNextNonBlocked());
 			check(sequence[20], buffer.getNextNonBlocked());
 			check(sequence[23], buffer.getNextNonBlocked());
-			
+
+			validateAlignmentTime(startTs, buffer);
+
 			// checkpoint 2 completed
 			check(sequence[12], buffer.getNextNonBlocked());
 			check(sequence[25], buffer.getNextNonBlocked());
@@ -613,17 +632,21 @@ public class BarrierBufferTest {
 			check(sequence[19], buffer.getNextNonBlocked());
 			check(sequence[21], buffer.getNextNonBlocked());
 
+			long startTs = System.nanoTime();
+
 			// checkpoint 2 aborted, checkpoint 4 started. replay buffered
 			check(sequence[12], buffer.getNextNonBlocked());
 			assertEquals(4L, buffer.getCurrentCheckpointId());
 			check(sequence[16], buffer.getNextNonBlocked());
 			check(sequence[18], buffer.getNextNonBlocked());
 			check(sequence[22], buffer.getNextNonBlocked());
-			
+
 			// align checkpoint 4 remainder
 			check(sequence[25], buffer.getNextNonBlocked());
 			check(sequence[26], buffer.getNextNonBlocked());
-			
+
+			validateAlignmentTime(startTs, buffer);
+
 			// checkpoint 4 aborted (due to end of partition)
 			check(sequence[24], buffer.getNextNonBlocked());
 			check(sequence[27], buffer.getNextNonBlocked());
@@ -926,12 +949,17 @@ public class BarrierBufferTest {
 			}
 		}
 	}
-	
+
+	private static void validateAlignmentTime(long startTimestamp, BarrierBuffer buffer) {
+		final long elapsed = System.nanoTime() - startTimestamp;
+		assertTrue("wrong alignment time", buffer.getAlignmentDurationNanos() <= elapsed);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Testing Mocks
 	// ------------------------------------------------------------------------
 
-	private static class ValidatingCheckpointHandler implements EventListener<CheckpointBarrier> {
+	private static class ValidatingCheckpointHandler implements StatefulTask {
 		
 		private long nextExpectedCheckpointId = -1L;
 
@@ -944,11 +972,33 @@ public class BarrierBufferTest {
 		}
 
 		@Override
-		public void onEvent(CheckpointBarrier barrier) {
-			assertNotNull(barrier);
-			assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == barrier.getId());
-			assertTrue(barrier.getTimestamp() > 0);
+		public void setInitialState(
+				ChainedStateHandle<StreamStateHandle> chainedState,
+				List<KeyGroupsStateHandle> keyGroupsState) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
+
+		@Override
+		public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
+
+		@Override
+		public void triggerCheckpointOnBarrier(
+				long checkpointId, long timestamp,
+				long bytesAligned, long alignmentTimeNanos) throws Exception {
+
+			assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointId);
+			assertTrue(timestamp > 0);
+			assertTrue(bytesAligned >= 0);
+			assertTrue(alignmentTimeNanos >= 0);
+
 			nextExpectedCheckpointId++;
 		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index b9b6e5f..314dcc4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -22,12 +22,16 @@ import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.List;
 
 import static org.junit.Assert.*;
 
@@ -346,22 +350,42 @@ public class BarrierTrackerTest {
 	//  Testing Mocks
 	// ------------------------------------------------------------------------
 	
-	private static class CheckpointSequenceValidator implements EventListener<CheckpointBarrier> {
+	private static class CheckpointSequenceValidator implements StatefulTask {
 
 		private final long[] checkpointIDs;
-		
+
 		private int i = 0;
 
 		private CheckpointSequenceValidator(long... checkpointIDs) {
 			this.checkpointIDs = checkpointIDs;
 		}
-		
+
 		@Override
-		public void onEvent(CheckpointBarrier barrier) {
+		public void setInitialState(
+				ChainedStateHandle<StreamStateHandle> chainedState,
+				List<KeyGroupsStateHandle> keyGroupsState) throws Exception {
+
+			throw new UnsupportedOperationException("should never be called");
+		}
+
+		@Override
+		public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
+
+		@Override
+		public void triggerCheckpointOnBarrier(
+				long checkpointId, long timestamp,
+				long bytesAligned, long alignmentTimeNanos) throws Exception {
+
 			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
-			assertNotNull(barrier);
-			assertEquals("wrong checkpoint id", checkpointIDs[i++], barrier.getId());
-			assertTrue(barrier.getTimestamp() > 0);
+			assertEquals("wrong checkpoint id", checkpointIDs[i++], checkpointId);
+			assertTrue(timestamp > 0);
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 7ef0080..88fb383 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -478,27 +478,32 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		List<KeyGroupsStateHandle> getKeyGroupStates() {
 			List<KeyGroupsStateHandle> result = new ArrayList<>();
-			for (int i = 0; i < keyGroupStates.size(); i++) {
-				if (keyGroupStates.get(i) != null) {
-					result.add(keyGroupStates.get(i));
+			for (KeyGroupsStateHandle keyGroupState : keyGroupStates) {
+				if (keyGroupState != null) {
+					result.add(keyGroupState);
 				}
 			}
 			return result;
 		}
 
-		AcknowledgeStreamMockEnvironment(Configuration jobConfig, Configuration taskConfig,
-		                                 ExecutionConfig executionConfig, long memorySize,
-		                                 MockInputSplitProvider inputSplitProvider, int bufferSize) {
+		AcknowledgeStreamMockEnvironment(
+				Configuration jobConfig, Configuration taskConfig,
+				ExecutionConfig executionConfig, long memorySize,
+				MockInputSplitProvider inputSplitProvider, int bufferSize) {
 			super(jobConfig, taskConfig, executionConfig, memorySize, inputSplitProvider, bufferSize);
 		}
 
 
 		@Override
-		public void acknowledgeCheckpoint(long checkpointId, ChainedStateHandle<StreamStateHandle> state,
-		                                  List<KeyGroupsStateHandle> keyGroupStates) {
+		public void acknowledgeCheckpoint(
+				long checkpointId,
+				ChainedStateHandle<StreamStateHandle> state, List<KeyGroupsStateHandle> keyGroupStates,
+				long syncDuration, long asymcDuration, long alignmentByte, long alignmentDuration) {
+
 			this.checkpointId = checkpointId;
 			this.state = state;
 			this.keyGroupStates = keyGroupStates;
+
 			checkpointLatch.trigger();
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 0901b32..2036f69 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -308,14 +308,18 @@ public class StreamMockEnvironment implements Environment {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId) {
+	public void acknowledgeCheckpoint(
+			long checkpointId,
+			long synchronousDurationMillis, long asynchronousDurationMillis,
+			long bytesBufferedInAlignment, long alignmentDurationNanos) {
 	}
 
 	@Override
-	public void acknowledgeCheckpoint(long checkpointId,
-			ChainedStateHandle<StreamStateHandle> chainedStateHandle,
-			List<KeyGroupsStateHandle> keyGroupStateHandles) {
-
+	public void acknowledgeCheckpoint(
+			long checkpointId,
+			ChainedStateHandle<StreamStateHandle> chainedStateHandle, List<KeyGroupsStateHandle> keyGroupStateHandles,
+			long synchronousDurationMillis, long asynchronousDurationMillis,
+			long bytesBufferedInAlignment, long alignmentDurationNanos) {
 	}
 
 	@Override


Mime
View raw message