flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [1/4] flink git commit: [FLINK-4984] [checkpointing] Add Cancellation Barriers as a way to signal aborted checkpoints
Date Tue, 08 Nov 2016 20:25:07 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.1 4dd3efea4 -> 0962cb6f4


http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
index d4fdc59..cf1f98e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
@@ -18,16 +18,18 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.state.StateHandle;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -35,6 +37,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.util.Arrays;
+import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -42,15 +45,23 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
 /**
  * Tests for the behavior of the {@link BarrierBuffer}.
  */
 public class BarrierBufferTest {
 
+	private static final Random RND = new Random();
+
 	private static final int PAGE_SIZE = 512;
-	
+
 	private static int SIZE_COUNTER = 0;
-	
+
 	private static IOManager IO_MANAGER;
 
 	@BeforeClass
@@ -86,7 +97,9 @@ public class BarrierBufferTest {
 			for (BufferOrEvent boe : sequence) {
 				assertEquals(boe, buffer.getNextNonBlocked());
 			}
-			
+
+			assertEquals(0L, buffer.getAlignmentDurationNanos());
+
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
 			
@@ -120,6 +133,8 @@ public class BarrierBufferTest {
 				assertEquals(boe, buffer.getNextNonBlocked());
 			}
 
+			assertEquals(0L, buffer.getAlignmentDurationNanos());
+
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
 
@@ -222,13 +237,15 @@ public class BarrierBufferTest {
 			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
 			buffer.registerCheckpointEventHandler(handler);
 			handler.setNextExpectedCheckpointId(1L);
-			
+
 			// pre checkpoint 1
 			check(sequence[0], buffer.getNextNonBlocked());
 			check(sequence[1], buffer.getNextNonBlocked());
 			check(sequence[2], buffer.getNextNonBlocked());
 			assertEquals(1L, handler.getNextExpectedCheckpointId());
 
+			long startTs = System.nanoTime();
+
 			// blocking while aligning for checkpoint 1
 			check(sequence[7], buffer.getNextNonBlocked());
 			assertEquals(1L, handler.getNextExpectedCheckpointId());
@@ -236,6 +253,8 @@ public class BarrierBufferTest {
 			// checkpoint 1 done, returning buffered data
 			check(sequence[5], buffer.getNextNonBlocked());
 			assertEquals(2L, handler.getNextExpectedCheckpointId());
+			validateAlignmentTime(startTs, buffer);
+
 			check(sequence[6], buffer.getNextNonBlocked());
 
 			// pre checkpoint 2
@@ -245,10 +264,13 @@ public class BarrierBufferTest {
 			check(sequence[12], buffer.getNextNonBlocked());
 			check(sequence[13], buffer.getNextNonBlocked());
 			assertEquals(2L, handler.getNextExpectedCheckpointId());
-			
+
 			// checkpoint 2 barriers come together
+			startTs = System.nanoTime();
 			check(sequence[17], buffer.getNextNonBlocked());
 			assertEquals(3L, handler.getNextExpectedCheckpointId());
+			validateAlignmentTime(startTs, buffer);
+
 			check(sequence[18], buffer.getNextNonBlocked());
 
 			// checkpoint 3 starts, data buffered
@@ -257,7 +279,7 @@ public class BarrierBufferTest {
 			check(sequence[21], buffer.getNextNonBlocked());
 
 			// checkpoint 4 happens without extra data
-			
+
 			// pre checkpoint 5
 			check(sequence[27], buffer.getNextNonBlocked());
 			assertEquals(5L, handler.getNextExpectedCheckpointId());
@@ -301,7 +323,7 @@ public class BarrierBufferTest {
 			BufferOrEvent[] sequence = {
 					createBuffer(0), createBuffer(1), createBuffer(2),
 					createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0),
-					
+
 					createBuffer(2), createBuffer(1), createBuffer(0),
 					createBarrier(2, 1),
 					createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2),
@@ -327,12 +349,14 @@ public class BarrierBufferTest {
 			assertEquals(2L, handler.getNextExpectedCheckpointId());
 			check(sequence[7], buffer.getNextNonBlocked());
 			check(sequence[8], buffer.getNextNonBlocked());
-			
+
 			// checkpoint 2 alignment
+			long startTs = System.nanoTime();
 			check(sequence[13], buffer.getNextNonBlocked());
 			check(sequence[14], buffer.getNextNonBlocked());
 			check(sequence[18], buffer.getNextNonBlocked());
 			check(sequence[19], buffer.getNextNonBlocked());
+			validateAlignmentTime(startTs, buffer);
 
 			// end of stream: remaining buffered contents
 			check(sequence[10], buffer.getNextNonBlocked());
@@ -343,7 +367,7 @@ public class BarrierBufferTest {
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
-			
+
 			buffer.cleanup();
 
 			checkNoTempFilesRemain();
@@ -389,7 +413,7 @@ public class BarrierBufferTest {
 					createBarrier(3, 2),
 					createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0),
 					createBarrier(6, 1),
-					
+
 					// complete checkpoint 4, checkpoint 5 remains not fully triggered
 					createBarrier(4, 2),
 					createBuffer(2),
@@ -419,12 +443,14 @@ public class BarrierBufferTest {
 
 			// alignment of checkpoint 2 - buffering also some barriers for
 			// checkpoints 3 and 4
+			long startTs = System.nanoTime();
 			check(sequence[13], buffer.getNextNonBlocked());
 			check(sequence[20], buffer.getNextNonBlocked());
 			check(sequence[23], buffer.getNextNonBlocked());
-			
+
 			// checkpoint 2 completed
 			check(sequence[12], buffer.getNextNonBlocked());
+			validateAlignmentTime(startTs, buffer);
 			check(sequence[25], buffer.getNextNonBlocked());
 			check(sequence[27], buffer.getNextNonBlocked());
 			check(sequence[30], buffer.getNextNonBlocked());
@@ -507,36 +533,53 @@ public class BarrierBufferTest {
 			MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
 			BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
 
-			ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler();
-			buffer.registerCheckpointEventHandler(handler);
-			handler.setNextExpectedCheckpointId(1L);
+			StatefulTask<?> toNotify = mock(StatefulTask.class);
+			buffer.registerCheckpointEventHandler(toNotify);
 
-			// checkpoint 1
+			long startTs;
+
+			// initial data
 			check(sequence[0], buffer.getNextNonBlocked());
 			check(sequence[1], buffer.getNextNonBlocked());
 			check(sequence[2], buffer.getNextNonBlocked());
+
+			// align checkpoint 1
+			startTs = System.nanoTime();
 			check(sequence[7], buffer.getNextNonBlocked());
 			assertEquals(1L, buffer.getCurrentCheckpointId());
-			
+
+			// checkpoint done - replay buffered
 			check(sequence[5], buffer.getNextNonBlocked());
+			validateAlignmentTime(startTs, buffer);
+			verify(toNotify).triggerCheckpointOnBarrier(eq(1L), anyLong());
 			check(sequence[6], buffer.getNextNonBlocked());
+
 			check(sequence[9], buffer.getNextNonBlocked());
 			check(sequence[10], buffer.getNextNonBlocked());
 
 			// alignment of checkpoint 2
+			startTs = System.nanoTime();
 			check(sequence[13], buffer.getNextNonBlocked());
-			assertEquals(2L, buffer.getCurrentCheckpointId());
 			check(sequence[15], buffer.getNextNonBlocked());
 
 			// checkpoint 2 aborted, checkpoint 3 started
 			check(sequence[12], buffer.getNextNonBlocked());
 			assertEquals(3L, buffer.getCurrentCheckpointId());
+			validateAlignmentTime(startTs, buffer);
+			verify(toNotify).abortCheckpointOnBarrier(2L);
 			check(sequence[16], buffer.getNextNonBlocked());
+
+			// checkpoint 3 alignment in progress
 			check(sequence[19], buffer.getNextNonBlocked());
-			check(sequence[20], buffer.getNextNonBlocked());
-			
+
 			// checkpoint 3 aborted (end of partition)
+			check(sequence[20], buffer.getNextNonBlocked());
+			verify(toNotify).abortCheckpointOnBarrier(3L);
+
+			// replay buffered data from checkpoint 3
 			check(sequence[18], buffer.getNextNonBlocked());
+
+			// all the remaining messages
 			check(sequence[21], buffer.getNextNonBlocked());
 			check(sequence[22], buffer.getNextNonBlocked());
 			check(sequence[23], buffer.getNextNonBlocked());
@@ -613,17 +656,21 @@ public class BarrierBufferTest {
 			check(sequence[19], buffer.getNextNonBlocked());
 			check(sequence[21], buffer.getNextNonBlocked());
 
+			long startTs = System.nanoTime();
+
 			// checkpoint 2 aborted, checkpoint 4 started. replay buffered
 			check(sequence[12], buffer.getNextNonBlocked());
 			assertEquals(4L, buffer.getCurrentCheckpointId());
 			check(sequence[16], buffer.getNextNonBlocked());
 			check(sequence[18], buffer.getNextNonBlocked());
 			check(sequence[22], buffer.getNextNonBlocked());
-			
+
 			// align checkpoint 4 remainder
 			check(sequence[25], buffer.getNextNonBlocked());
 			check(sequence[26], buffer.getNextNonBlocked());
-			
+
+			validateAlignmentTime(startTs, buffer);
+
 			// checkpoint 4 aborted (due to end of partition)
 			check(sequence[24], buffer.getNextNonBlocked());
 			check(sequence[27], buffer.getNextNonBlocked());
@@ -862,9 +909,9 @@ public class BarrierBufferTest {
 
 			assertNull(buffer.getNextNonBlocked());
 			assertNull(buffer.getNextNonBlocked());
-			
+
 			buffer.cleanup();
-			
+
 			checkNoTempFilesRemain();
 		}
 		catch (Exception e) {
@@ -874,26 +921,480 @@ public class BarrierBufferTest {
 	}
 
+	/**
+	 * Tests that an end-of-partition event on a channel that has not yet delivered the barrier
+	 * of the pending checkpoint ends the alignment: the checkpoint cannot complete, and the data
+	 * held back during the alignment is replayed afterwards in its original order.
+	 */
 	@Test
-	public void testEndOfStreamWhileCheckpoint() {
+	public void testEndOfStreamWhileCheckpoint() throws Exception {
+		BufferOrEvent[] sequence = {
+				// one checkpoint
+				createBarrier(1, 0), createBarrier(1, 1), createBarrier(1, 2),
+
+				// some buffers
+				createBuffer(0), createBuffer(0), createBuffer(2),
+
+				// start the checkpoint that will be incomplete
+				createBarrier(2, 2), createBarrier(2, 0),
+				createBuffer(0), createBuffer(2), createBuffer(1),
+
+				// close channel 2 after its barrier and channel 1 before its barrier
+				createEndOfPartition(2), createEndOfPartition(1),
+				createBuffer(0),
+
+				// final end of stream
+				createEndOfPartition(0)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+		// data after first checkpoint
+		check(sequence[3], buffer.getNextNonBlocked());
+		check(sequence[4], buffer.getNextNonBlocked());
+		check(sequence[5], buffer.getNextNonBlocked());
+		assertEquals(1L, buffer.getCurrentCheckpointId());
+
+		// alignment of second checkpoint: channels 0 and 2 are held back, channel 1 still delivers
+		check(sequence[10], buffer.getNextNonBlocked());
+		assertEquals(2L, buffer.getCurrentCheckpointId());
+
+		// end-of-partition on channel 1 (which never sent barrier 2): the checkpoint cannot
+		// complete, so the data held back on channels 0 and 2 is replayed
+		check(sequence[12], buffer.getNextNonBlocked());
+		check(sequence[8], buffer.getNextNonBlocked());
+		check(sequence[9], buffer.getNextNonBlocked());
+		check(sequence[11], buffer.getNextNonBlocked());
+		check(sequence[13], buffer.getNextNonBlocked());
+		check(sequence[14], buffer.getNextNonBlocked());
+
+		// all done
+		assertNull(buffer.getNextNonBlocked());
+		assertNull(buffer.getNextNonBlocked());
+
+		buffer.cleanup();
+
+		checkNoTempFilesRemain();
+	}
+
+	/**
+	 * Tests cancellation barriers on a single input channel: regular barriers trigger
+	 * checkpoints, cancellation barriers abort them, and the reported alignment duration
+	 * stays zero throughout.
+	 */
+	@Test
+	public void testSingleChannelAbortCheckpoint() throws Exception {
+		BufferOrEvent[] sequence = {
+				createBuffer(0),
+				createBarrier(1, 0),
+				createBuffer(0),
+				createBarrier(2, 0),
+				createCancellationBarrier(4, 0),
+				createBarrier(5, 0),
+				createBuffer(0),
+				createCancellationBarrier(6, 0),
+				createBuffer(0)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
+		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+		StatefulTask<?> toNotify = mock(StatefulTask.class);
+		buffer.registerCheckpointEventHandler(toNotify);
+
+		// barrier 1 sits between the first two buffers and triggers checkpoint 1
+		check(sequence[0], buffer.getNextNonBlocked());
+		check(sequence[2], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(1L), anyLong());
+		assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+		// barrier 2, cancellation 4 and barrier 5 are all consumed before the next buffer
+		check(sequence[6], buffer.getNextNonBlocked());
+		assertEquals(5L, buffer.getCurrentCheckpointId());
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(2L), anyLong());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong());
+		assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+		// cancellation barrier 6 aborts checkpoint 6
+		check(sequence[8], buffer.getNextNonBlocked());
+		assertEquals(6L, buffer.getCurrentCheckpointId());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+		assertEquals(0L, buffer.getAlignmentDurationNanos());
 		
+		buffer.cleanup();
+		checkNoTempFilesRemain();
+	}
+
+	/**
+	 * Tests cancellation barriers on multiple input channels. A cancellation barrier aborts the
+	 * pending checkpoint in three situations: when it arrives as the last barrier of an
+	 * alignment, when it arrives as the first barrier, and when several cancellation barriers
+	 * are followed by a regular barrier. Successful checkpoints in between must still be
+	 * triggered normally.
+	 */
+	@Test
+	public void testMultiChannelAbortCheckpoint() throws Exception {
+		BufferOrEvent[] sequence = {
+				// some buffers and a successful checkpoint
+				/* 0 */ createBuffer(0), createBuffer(2), createBuffer(0),
+				/* 3 */ createBarrier(1, 1), createBarrier(1, 2),
+				/* 5 */ createBuffer(2), createBuffer(1),
+				/* 7 */ createBarrier(1, 0),
+				/* 8 */ createBuffer(0), createBuffer(2),
+
+				// aborted on last barrier
+				/* 10 */ createBarrier(2, 0), createBarrier(2, 2),
+				/* 12 */ createBuffer(0), createBuffer(2),
+				/* 14 */ createCancellationBarrier(2, 1),
+
+				// successful checkpoint
+				/* 15 */ createBuffer(2), createBuffer(1),
+				/* 17 */ createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0),
+
+				// abort on first barrier
+				/* 20 */ createBuffer(0), createBuffer(1),
+				/* 22 */ createCancellationBarrier(4, 1), createBarrier(4, 2),
+				/* 24 */ createBuffer(0),
+				/* 25 */ createBarrier(4, 0),
+
+				// another successful checkpoint
+				/* 26 */ createBuffer(0), createBuffer(1), createBuffer(2),
+				/* 29 */ createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0),
+				/* 32 */ createBuffer(0), createBuffer(1),
+
+				// aborted by multiple cancellation barriers, followed by a regular barrier
+				/* 34 */ createCancellationBarrier(6, 1), createCancellationBarrier(6, 2),
+				/* 36 */ createBarrier(6, 0),
+
+				/* 37 */ createBuffer(0)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+		StatefulTask<?> toNotify = mock(StatefulTask.class);
+		buffer.registerCheckpointEventHandler(toNotify);
+
+		long startTs;
+
+		// successful first checkpoint, with some aligned buffers
+		check(sequence[0], buffer.getNextNonBlocked());
+		check(sequence[1], buffer.getNextNonBlocked());
+		check(sequence[2], buffer.getNextNonBlocked());
+		startTs = System.nanoTime();
+		check(sequence[5], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(1L), anyLong());
+		validateAlignmentTime(startTs, buffer);
+
+		check(sequence[6], buffer.getNextNonBlocked());
+		check(sequence[8], buffer.getNextNonBlocked());
+		check(sequence[9], buffer.getNextNonBlocked());
+
+		// canceled checkpoint on last barrier: the cancellation barrier on channel 1 aborts
+		// checkpoint 2, after which the data held back on channels 0 and 2 is returned
+		startTs = System.nanoTime();
+		check(sequence[12], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+		validateAlignmentTime(startTs, buffer);
+		check(sequence[13], buffer.getNextNonBlocked());
+
+		// one more successful checkpoint
+		check(sequence[15], buffer.getNextNonBlocked());
+		check(sequence[16], buffer.getNextNonBlocked());
+		startTs = System.nanoTime();
+		check(sequence[20], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(3L), anyLong());
+		validateAlignmentTime(startTs, buffer);
+		check(sequence[21], buffer.getNextNonBlocked());
+
+		// this checkpoint gets immediately canceled (no alignment time recorded)
+		check(sequence[24], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
+		assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+		// some buffers
+		check(sequence[26], buffer.getNextNonBlocked());
+		check(sequence[27], buffer.getNextNonBlocked());
+		check(sequence[28], buffer.getNextNonBlocked());
+
+		// a simple successful checkpoint
+		startTs = System.nanoTime();
+		check(sequence[32], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong());
+		validateAlignmentTime(startTs, buffer);
+		check(sequence[33], buffer.getNextNonBlocked());
+
+		// the cancellation barriers abort checkpoint 6 (no alignment time recorded)
+		check(sequence[37], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
+		assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+		// all done
+		assertNull(buffer.getNextNonBlocked());
+		assertNull(buffer.getNextNonBlocked());
+
+		buffer.cleanup();
+		checkNoTempFilesRemain();
+	}
+
+	/**
+	 * Tests that a cancellation barrier that was queued while aligning an earlier checkpoint
+	 * aborts its checkpoint when it is replayed. The remaining barriers of the aborted
+	 * checkpoint must then cause neither further alignment nor further notifications.
+	 */
+	@Test
+	public void testAbortViaQueuedBarriers() throws Exception {
+		BufferOrEvent[] sequence = {
+				// starting a checkpoint
+				/* 0 */ createBuffer(1),
+				/* 1 */ createBarrier(1, 1), createBarrier(1, 2),
+				/* 3 */ createBuffer(2), createBuffer(0), createBuffer(1),
+
+				// queued barrier and cancellation barrier
+				/* 6 */ createCancellationBarrier(2, 2),
+				/* 7 */ createBarrier(2, 1),
+
+				// some intermediate buffers (some queued)
+				/* 8 */ createBuffer(0), createBuffer(1), createBuffer(2),
+
+				// complete initial checkpoint
+				/* 11 */ createBarrier(1, 0),
+
+				// some buffers (none queued, since checkpoint is aborted)
+				/* 12 */ createBuffer(2), createBuffer(1), createBuffer(0),
+
+				// final barrier of aborted checkpoint
+				/* 15 */ createBarrier(2, 0),
+
+				// some more buffers
+				/* 16 */ createBuffer(0), createBuffer(1), createBuffer(2)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+		StatefulTask<?> toNotify = mock(StatefulTask.class);
+		buffer.registerCheckpointEventHandler(toNotify);
+
+		long startTs;
+
+		check(sequence[0], buffer.getNextNonBlocked());
+
+		// starting first checkpoint
+		startTs = System.nanoTime();
+		check(sequence[4], buffer.getNextNonBlocked());
+		check(sequence[8], buffer.getNextNonBlocked());
+
+		// finished first checkpoint
+		check(sequence[3], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(1L), anyLong());
+		validateAlignmentTime(startTs, buffer);
+
+		check(sequence[5], buffer.getNextNonBlocked());
+
+		// replaying the queued cancellation barrier aborts checkpoint 2
+		check(sequence[9], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
+		assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+		check(sequence[10], buffer.getNextNonBlocked());
+		check(sequence[12], buffer.getNextNonBlocked());
+		check(sequence[13], buffer.getNextNonBlocked());
+		check(sequence[14], buffer.getNextNonBlocked());
+
+		check(sequence[16], buffer.getNextNonBlocked());
+		check(sequence[17], buffer.getNextNonBlocked());
+		check(sequence[18], buffer.getNextNonBlocked());
+
+		// no further alignment should have happened
+		assertEquals(0L, buffer.getAlignmentDurationNanos());
+
+		// no further checkpoint (abort) notifications
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+
+		// all done
+		assertNull(buffer.getNextNonBlocked());
+		assertNull(buffer.getNextNonBlocked());
+
+		buffer.cleanup();
+		checkNoTempFilesRemain();
+	}
+
+	/**
+	 * This tests the case where a replay of queued checkpoint barriers meets
+	 * a canceled checkpoint.
+	 *
+	 * <p>The replayed newer checkpoint barrier must not try to cancel the
+	 * already canceled checkpoint.
+	 */
+	@Test
+	public void testAbortWhileHavingQueuedBarriers() throws Exception {
+		BufferOrEvent[] sequence = {
+				// starting a checkpoint
+				/*  0 */ createBuffer(1),
+				/*  1 */ createBarrier(1, 1),
+				/*  2 */ createBuffer(2), createBuffer(0), createBuffer(1),
+
+				// queued barrier for the next checkpoint
+				/*  5 */ createBarrier(2, 1),
+
+				// some queued buffers
+				/*  6 */ createBuffer(2), createBuffer(1),
+
+				// cancel the initial checkpoint
+				/*  8 */ createCancellationBarrier(1, 0),
+
+				// some more buffers
+				/*  9 */ createBuffer(2), createBuffer(1), createBuffer(0),
+
+				// ignored barrier - already canceled and moved to next checkpoint
+				/* 12 */ createBarrier(1, 2),
+
+				// some more buffers
+				/* 13 */ createBuffer(0), createBuffer(1), createBuffer(2),
+
+				// complete next checkpoint regularly
+				/* 16 */ createBarrier(2, 0), createBarrier(2, 2),
+
+				// some more buffers
+				/* 18 */ createBuffer(0), createBuffer(1), createBuffer(2)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+		StatefulTask<?> toNotify = mock(StatefulTask.class);
+		buffer.registerCheckpointEventHandler(toNotify);
+
+		long startTs;
+
+		check(sequence[0], buffer.getNextNonBlocked());
+
+		// starting first checkpoint
+		startTs = System.nanoTime();
+		check(sequence[2], buffer.getNextNonBlocked());
+		check(sequence[3], buffer.getNextNonBlocked());
+		check(sequence[6], buffer.getNextNonBlocked());
+
+		// checkpoint 1 canceled by the cancellation barrier on channel 0; queued data is replayed
+		check(sequence[4], buffer.getNextNonBlocked());
+		validateAlignmentTime(startTs, buffer);
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(1L);
+
+		// the replayed barrier of checkpoint 2 starts the next checkpoint alignment now
+		startTs = System.nanoTime();
+		check(sequence[9], buffer.getNextNonBlocked());
+		check(sequence[11], buffer.getNextNonBlocked());
+		check(sequence[13], buffer.getNextNonBlocked());
+		check(sequence[15], buffer.getNextNonBlocked());
+
+		// checkpoint done
+		check(sequence[7], buffer.getNextNonBlocked());
+		validateAlignmentTime(startTs, buffer);
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(2L), anyLong());
+
+		// queued data
+		check(sequence[10], buffer.getNextNonBlocked());
+		check(sequence[14], buffer.getNextNonBlocked());
+
+		// trailing data
+		check(sequence[18], buffer.getNextNonBlocked());
+		check(sequence[19], buffer.getNextNonBlocked());
+		check(sequence[20], buffer.getNextNonBlocked());
+
+		// all done
+		assertNull(buffer.getNextNonBlocked());
+		assertNull(buffer.getNextNonBlocked());
+
+		buffer.cleanup();
+		checkNoTempFilesRemain();
+
+		// check overall notifications
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
+	}
+
+	/**
+	 * This tests the case where a cancellation barrier is received for a checkpoint already
+	 * canceled due to receiving a newer checkpoint barrier.
+	 */
+	@Test
+	public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws Exception {
+		BufferOrEvent[] sequence = {
+				// starting a checkpoint
+				/*  0 */ createBuffer(2),
+				/*  1 */ createBarrier(3, 1), createBarrier(3, 0),
+				/*  3 */ createBuffer(0), createBuffer(1), createBuffer(2),
+
+				// newer checkpoint barrier cancels/subsumes pending checkpoint
+				/*  6 */ createBarrier(5, 2),
+
+				// some queued buffers
+				/*  7 */ createBuffer(2), createBuffer(1), createBuffer(0),
+
+				// cancellation barrier for the initial checkpoint (it is already canceled)
+				/* 10 */ createCancellationBarrier(3, 2),
+
+				// some more buffers
+				/* 11 */ createBuffer(2), createBuffer(0), createBuffer(1),
+
+				// complete next checkpoint regularly
+				/* 14 */ createBarrier(5, 0), createBarrier(5, 1),
+
+				// some more buffers
+				/* 16 */ createBuffer(0), createBuffer(1), createBuffer(2)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
+
+		StatefulTask<?> toNotify = mock(StatefulTask.class);
+		buffer.registerCheckpointEventHandler(toNotify);
+
+		long startTs;
+
+		// validate the sequence
+
+		check(sequence[0], buffer.getNextNonBlocked());
+
+		// alignment of checkpoint 3 has begun: only data from channel 2 passes
+		check(sequence[5], buffer.getNextNonBlocked());
+
+		// future barrier aborts checkpoint
+		startTs = System.nanoTime();
+		check(sequence[3], buffer.getNextNonBlocked());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(3L);
+		check(sequence[4], buffer.getNextNonBlocked());
+
+		// alignment of next checkpoint
+		check(sequence[8], buffer.getNextNonBlocked());
+		check(sequence[9], buffer.getNextNonBlocked());
+		check(sequence[12], buffer.getNextNonBlocked());
+		check(sequence[13], buffer.getNextNonBlocked());
+
+		// checkpoint finished
+		check(sequence[7], buffer.getNextNonBlocked());
+		validateAlignmentTime(startTs, buffer);
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(eq(5L), anyLong());
+		check(sequence[11], buffer.getNextNonBlocked());
+
+		// remaining data
+		check(sequence[16], buffer.getNextNonBlocked());
+		check(sequence[17], buffer.getNextNonBlocked());
+		check(sequence[18], buffer.getNextNonBlocked());
+
+		// all done
+		assertNull(buffer.getNextNonBlocked());
+		assertNull(buffer.getNextNonBlocked());
+
+		buffer.cleanup();
+		checkNoTempFilesRemain();
+
+		// check overall notifications
+		verify(toNotify, times(1)).triggerCheckpointOnBarrier(anyLong(), anyLong());
+		verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
 	}
 
 	// ------------------------------------------------------------------------
 	//  Utils
 	// ------------------------------------------------------------------------
 
-	private static BufferOrEvent createBarrier(long id, int channel) {
-		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
+	/**
+	 * Creates a checkpoint barrier for the given checkpoint ID on the given channel,
+	 * timestamped with the current wall-clock time ({@link System#currentTimeMillis()}).
+	 */
+	private static BufferOrEvent createBarrier(long checkpointId, int channel) {
+		return new BufferOrEvent(new CheckpointBarrier(checkpointId, System.currentTimeMillis()), channel);
+	}
+
+	/**
+	 * Creates a cancellation barrier ({@link CancelCheckpointMarker}) for the given
+	 * checkpoint ID on the given channel.
+	 */
+	private static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) {
+		return new BufferOrEvent(new CancelCheckpointMarker(checkpointId), channel);
 	}
 
 	private static BufferOrEvent createBuffer(int channel) {
-		// since we have no access to the contents, we need to use the size as an
-		// identifier to validate correctness here
-		Buffer buf = new Buffer(
-				MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE),
-				FreeingBufferRecycler.INSTANCE);
-		
-		buf.setSize(SIZE_COUNTER++);
+		final int size = SIZE_COUNTER++;
+		byte[] bytes = new byte[size];
+		RND.nextBytes(bytes);
+
+		MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE);
+		memory.put(0, bytes);
+
+		Buffer buf = new Buffer(memory, FreeingBufferRecycler.INSTANCE);
+		buf.setSize(size);
+
+		// retain an additional time so it does not get disposed after being read by the input
gate
+		buf.retain();
+
 		return new BufferOrEvent(buf, channel);
 	}
 
@@ -907,15 +1408,16 @@ public class BarrierBufferTest {
 		assertEquals(expected.isBuffer(), present.isBuffer());
 		
 		if (expected.isBuffer()) {
-			// since we have no access to the contents, we need to use the size as an
-			// identifier to validate correctness here
 			assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize());
+			MemorySegment expectedMem = expected.getBuffer().getMemorySegment();
+			MemorySegment presentMem = present.getBuffer().getMemorySegment();
+			assertTrue("memory contents differs", expectedMem.compare(presentMem, 0, 0, PAGE_SIZE)
== 0);
 		}
 		else {
 			assertEquals(expected.getEvent(), present.getEvent());
 		}
 	}
-	
+
 	private static void checkNoTempFilesRemain() {
 		// validate that all temp files have been removed
 		for (File dir : IO_MANAGER.getSpillingDirectories()) {
@@ -926,12 +1428,17 @@ public class BarrierBufferTest {
 			}
 		}
 	}
-	
+
+	/**
+	 * Asserts that the alignment duration reported by the buffer is not larger than the time
+	 * elapsed since {@code startTimestamp}, which is expected to be a {@link System#nanoTime()}
+	 * value taken before the alignment started.
+	 */
+	private static void validateAlignmentTime(long startTimestamp, BarrierBuffer buffer) {
+		final long elapsed = System.nanoTime() - startTimestamp;
+		assertTrue("wrong alignment time", buffer.getAlignmentDurationNanos() <= elapsed);
+	}
+
 	// ------------------------------------------------------------------------
 	//  Testing Mocks
 	// ------------------------------------------------------------------------
 
-	private static class ValidatingCheckpointHandler implements EventListener<CheckpointBarrier>
{
+	private static class ValidatingCheckpointHandler implements StatefulTask<StateHandle<Object>>
{
 		
 		private long nextExpectedCheckpointId = -1L;
 
@@ -944,11 +1451,31 @@ public class BarrierBufferTest {
 		}
 
 		@Override
-		public void onEvent(CheckpointBarrier barrier) {
-			assertNotNull(barrier);
-			assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || nextExpectedCheckpointId
== barrier.getId());
-			assertTrue(barrier.getTimestamp() > 0);
+		public void setInitialState(StateHandle<Object> stateHandle) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
+
+		@Override
+		public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
+
+		@Override
+		public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception
{
+			assertTrue("wrong checkpoint id",
+					nextExpectedCheckpointId == -1L || nextExpectedCheckpointId == checkpointId);
+
+			assertTrue(timestamp > 0);
+
 			nextExpectedCheckpointId++;
 		}
+
+		@Override
+		public void abortCheckpointOnBarrier(long checkpointId) {}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1f028de/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index b9b6e5f..903f585 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -19,25 +19,30 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.util.event.EventListener;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 
+import org.apache.flink.runtime.state.StateHandle;
 import org.junit.Test;
 
 import java.util.Arrays;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for the behavior of the barrier tracker.
  */
 public class BarrierTrackerTest {
-	
+
 	private static final int PAGE_SIZE = 512;
-	
+
 	@Test
 	public void testSingleChannelNoBarriers() {
 		try {
@@ -329,6 +334,98 @@ public class BarrierTrackerTest {
 		}
 	}
 
+	@Test
+	public void testSingleChannelAbortCheckpoint() throws Exception {
+		final BufferOrEvent[] input = {
+				createBuffer(0),
+				createBarrier(1, 0),
+				createBuffer(0),
+				createBarrier(2, 0),
+				createCancellationBarrier(4, 0),
+				createBarrier(5, 0),
+				createBuffer(0),
+				createCancellationBarrier(6, 0),
+				createBuffer(0)
+		};
+
+		final MockInputGate inputGate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(input));
+		final BarrierTracker barrierTracker = new BarrierTracker(inputGate);
+
+		// a negative id marks a call that must arrive as an abort (cancellation), not a trigger
+		final CheckpointSequenceValidator expectedCalls =
+				new CheckpointSequenceValidator(1, 2, -4, 5, -6);
+		barrierTracker.registerCheckpointEventHandler(expectedCalls);
+
+		for (BufferOrEvent next : input) {
+			if (next.isBuffer()) {
+				assertEquals(next, barrierTracker.getNextNonBlocked());
+			}
+			assertTrue(barrierTracker.isEmpty());
+		}
+
+		assertNull(barrierTracker.getNextNonBlocked());
+		assertNull(barrierTracker.getNextNonBlocked());
+	}
+
+	@Test
+	public void testMultiChannelAbortCheckpoint() throws Exception {
+		BufferOrEvent[] sequence = {
+				// some buffers and a successful checkpoint
+				createBuffer(0), createBuffer(2), createBuffer(0),
+				createBarrier(1, 1), createBarrier(1, 2),
+				createBuffer(2), createBuffer(1),
+				createBarrier(1, 0),
+
+				// aborted by a cancellation marker on the last channel to report
+				createBuffer(0), createBuffer(2),
+				createBarrier(2, 0), createBarrier(2, 2),
+				createBuffer(0), createBuffer(2),
+				createCancellationBarrier(2, 1),
+
+				// successful checkpoint
+				createBuffer(2), createBuffer(1),
+				createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0),
+
+				// aborted by a cancellation marker arriving before any barrier
+				createBuffer(0), createBuffer(1),
+				createCancellationBarrier(4, 1), createBarrier(4, 2),
+				createBuffer(0),
+				createBarrier(4, 0),
+
+				// another successful checkpoint
+				createBuffer(0), createBuffer(1), createBuffer(2),
+				createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0),
+
+				// aborted with multiple cancellation markers, followed by a late barrier
+				createBuffer(0), createBuffer(1),
+				createCancellationBarrier(6, 1), createCancellationBarrier(6, 2),
+				createBarrier(6, 0),
+
+				createBuffer(0)
+		};
+
+		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
+		BarrierTracker tracker = new BarrierTracker(gate);
+
+		// negative values mean an expected cancellation (abort) call
+		CheckpointSequenceValidator validator =
+				new CheckpointSequenceValidator(1, -2, 3, -4, 5, -6);
+		tracker.registerCheckpointEventHandler(validator);
+
+		for (BufferOrEvent boe : sequence) {
+			if (boe.isBuffer()) {
+				assertEquals(boe, tracker.getNextNonBlocked());
+			}
+		}
+
+		assertTrue(tracker.isEmpty());
+
+		assertNull(tracker.getNextNonBlocked());
+		assertNull(tracker.getNextNonBlocked());
+
+		assertTrue(tracker.isEmpty());
+	}
+	
 	// ------------------------------------------------------------------------
 	//  Utils
 	// ------------------------------------------------------------------------
@@ -337,6 +434,10 @@ public class BarrierTrackerTest {
 		return new BufferOrEvent(new CheckpointBarrier(id, System.currentTimeMillis()), channel);
 	}
 
+	private static BufferOrEvent createCancellationBarrier(final long id, final int channel) {
+		return new BufferOrEvent(new CancelCheckpointMarker(id), channel);
+	}
+
 	private static BufferOrEvent createBuffer(int channel) {
 		return new BufferOrEvent(
 				new Buffer(MemorySegmentFactory.wrap(new byte[]{1, 2}), FreeingBufferRecycler.INSTANCE),
channel);
@@ -346,22 +447,54 @@ public class BarrierTrackerTest {
 	//  Testing Mocks
 	// ------------------------------------------------------------------------
 	
-	private static class CheckpointSequenceValidator implements EventListener<CheckpointBarrier> {
+	private static class CheckpointSequenceValidator implements StatefulTask<StateHandle<Object>> {
 
 		private final long[] checkpointIDs;
-		
+
 		private int i = 0;
 
 		private CheckpointSequenceValidator(long... checkpointIDs) {
 			this.checkpointIDs = checkpointIDs;
 		}
-		
+
 		@Override
-		public void onEvent(CheckpointBarrier barrier) {
+		public void setInitialState(StateHandle<Object> state) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
+
+		@Override
+		public boolean triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
+		}
+
+		@Override
+		public void triggerCheckpointOnBarrier(long checkpointId, long timestamp) throws Exception {
 			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
-			assertNotNull(barrier);
-			assertEquals("wrong checkpoint id", checkpointIDs[i++], barrier.getId());
-			assertTrue(barrier.getTimestamp() > 0);
+
+			final long expectedId = checkpointIDs[i++];
+			if (expectedId >= 0) {
+				assertEquals("wrong checkpoint id", expectedId, checkpointId);
+				assertTrue(timestamp > 0);
+			} else {
+				fail("got 'triggerCheckpointOnBarrier()' when expecting an 'abortCheckpointOnBarrier()'");
+			}
+		}
+
+		@Override
+		public void abortCheckpointOnBarrier(long checkpointId) {
+			assertTrue("More checkpoints than expected", i < checkpointIDs.length);
+
+			final long expectedId = checkpointIDs[i++];
+			if (expectedId < 0) {
+				assertEquals("wrong checkpoint id for checkpoint abort", -expectedId, checkpointId);
+			} else {
+				fail("got 'abortCheckpointOnBarrier()' when expecting a 'triggerCheckpointOnBarrier()'");
+			}
+		}
+
+		@Override
+		public void notifyCheckpointComplete(long checkpointId) throws Exception {
+			throw new UnsupportedOperationException("should never be called");
 		}
 	}
 }


Mime
View raw message