flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4975) Add a limit for how much data may be buffered during checkpoint alignment
Date Tue, 08 Nov 2016 16:44:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15648061#comment-15648061 ]

ASF GitHub Bot commented on FLINK-4975:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2754#discussion_r87030844
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
---
    @@ -899,26 +926,480 @@ public void testStartAlignmentWithClosedChannels() {
     	}
     
     	@Test
    -	public void testEndOfStreamWhileCheckpoint() {
    +	public void testEndOfStreamWhileCheckpoint() throws Exception {
    +		BufferOrEvent[] sequence = {
    +				// one checkpoint
    +				createBarrier(1, 0), createBarrier(1, 1), createBarrier(1, 2),
    +
    +				// some buffers
    +				createBuffer(0), createBuffer(0), createBuffer(2),
    +
    +				// start the checkpoint that will be incomplete
    +				createBarrier(2, 2), createBarrier(2, 0),
    +				createBuffer(0), createBuffer(2), createBuffer(1),
    +
    +				// close one after the barrier one before the barrier
    +				createEndOfPartition(2), createEndOfPartition(1),
    +				createBuffer(0),
    +
    +				// final end of stream
    +				createEndOfPartition(0)
    +		};
    +
    +		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
    +		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +		// data after first checkpoint
    +		check(sequence[3], buffer.getNextNonBlocked());
    +		check(sequence[4], buffer.getNextNonBlocked());
    +		check(sequence[5], buffer.getNextNonBlocked());
    +		assertEquals(1L, buffer.getCurrentCheckpointId());
    +
    +		// alignment of second checkpoint
    +		check(sequence[10], buffer.getNextNonBlocked());
    +		assertEquals(2L, buffer.getCurrentCheckpointId());
    +
    +		// first end-of-partition encountered: checkpoint will not be completed
    +		check(sequence[12], buffer.getNextNonBlocked());
    +		check(sequence[8], buffer.getNextNonBlocked());
    +		check(sequence[9], buffer.getNextNonBlocked());
    +		check(sequence[11], buffer.getNextNonBlocked());
    +		check(sequence[13], buffer.getNextNonBlocked());
    +		check(sequence[14], buffer.getNextNonBlocked());
    +
    +		// all done
    +		assertNull(buffer.getNextNonBlocked());
    +		assertNull(buffer.getNextNonBlocked());
    +
    +		buffer.cleanup();
    +
    +		checkNoTempFilesRemain();
    +	}
    +
    +	@Test
    +	public void testSingleChannelAbortCheckpoint() throws Exception {
    +		BufferOrEvent[] sequence = {
    +				createBuffer(0),
    +				createBarrier(1, 0),
    +				createBuffer(0),
    +				createBarrier(2, 0),
    +				createCancellationBarrier(4, 0),
    +				createBarrier(5, 0),
    +				createBuffer(0),
    +				createCancellationBarrier(6, 0),
    +				createBuffer(0)
    +		};
    +
    +		MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence));
    +		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +		StatefulTask toNotify = mock(StatefulTask.class);
    +		buffer.registerCheckpointEventHandler(toNotify);
    +
    +		check(sequence[0], buffer.getNextNonBlocked());
    +		check(sequence[2], buffer.getNextNonBlocked());
    +		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
    +		assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +		check(sequence[6], buffer.getNextNonBlocked());
    +		assertEquals(5L, buffer.getCurrentCheckpointId());
    +		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
    +		verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
    +		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
    +		assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +		check(sequence[8], buffer.getNextNonBlocked());
    +		assertEquals(6L, buffer.getCurrentCheckpointId());
    +		verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
    +		assertEquals(0L, buffer.getAlignmentDurationNanos());
     		
    +		buffer.cleanup();
    +		checkNoTempFilesRemain();
    +	}
    +
    +	@Test
    +	public void testMultiChannelAbortCheckpoint() throws Exception {
    +		BufferOrEvent[] sequence = {
    +				// some buffers and a successful checkpoint
    +				/* 0 */ createBuffer(0), createBuffer(2), createBuffer(0),
    +				/* 3 */ createBarrier(1, 1), createBarrier(1, 2),
    +				/* 5 */ createBuffer(2), createBuffer(1),
    +				/* 7 */ createBarrier(1, 0),
    +				/* 8 */ createBuffer(0), createBuffer(2),
    +
    +				// aborted on last barrier
    +				/* 10 */ createBarrier(2, 0), createBarrier(2, 2),
    +				/* 12 */ createBuffer(0), createBuffer(2),
    +				/* 14 */ createCancellationBarrier(2, 1),
    +
    +				// successful checkpoint
    +				/* 15 */ createBuffer(2), createBuffer(1),
    +				/* 17 */ createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0),
    +
    +				// abort on first barrier
    +				/* 20 */ createBuffer(0), createBuffer(1),
    +				/* 22 */ createCancellationBarrier(4, 1), createBarrier(4, 2),
    +				/* 24 */ createBuffer(0),
    +				/* 25 */ createBarrier(4, 0),
    +
    +				// another successful checkpoint
    +				/* 26 */ createBuffer(0), createBuffer(1), createBuffer(2),
    +				/* 29 */ createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0),
    +				/* 32 */ createBuffer(0), createBuffer(1),
    +
    +				// abort multiple cancellations and a barrier after the cancellations
    +				/* 34 */ createCancellationBarrier(6, 1), createCancellationBarrier(6, 2),
    +				/* 36 */ createBarrier(6, 0),
    +
    +				/* 37 */ createBuffer(0)
    +		};
    +
    +		MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence));
    +		BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +		StatefulTask toNotify = mock(StatefulTask.class);
    +		buffer.registerCheckpointEventHandler(toNotify);
    +
    +		long startTs;
    +
    +		// successful first checkpoint, with some aligned buffers
    +		check(sequence[0], buffer.getNextNonBlocked());
    +		check(sequence[1], buffer.getNextNonBlocked());
    +		check(sequence[2], buffer.getNextNonBlocked());
    +		startTs = System.nanoTime();
    +		check(sequence[5], buffer.getNextNonBlocked());
    +		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
    +		validateAlignmentTime(startTs, buffer);
    +
    +		check(sequence[6], buffer.getNextNonBlocked());
    +		check(sequence[8], buffer.getNextNonBlocked());
    +		check(sequence[9], buffer.getNextNonBlocked());
    +
    +		// canceled checkpoint on last barrier
    +		startTs = System.nanoTime();
    +		check(sequence[12], buffer.getNextNonBlocked());
    +		verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
    +		validateAlignmentTime(startTs, buffer);
    +		check(sequence[13], buffer.getNextNonBlocked());
    +
    +		// one more successful checkpoint
    +		check(sequence[15], buffer.getNextNonBlocked());
    +		check(sequence[16], buffer.getNextNonBlocked());
    +		startTs = System.nanoTime();
    +		check(sequence[20], buffer.getNextNonBlocked());
    +		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
    +		validateAlignmentTime(startTs, buffer);
    +		check(sequence[21], buffer.getNextNonBlocked());
    +
    +		// this checkpoint gets immediately canceled
    +		check(sequence[24], buffer.getNextNonBlocked());
    +		verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
    +		assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +		// some buffers
    +		check(sequence[26], buffer.getNextNonBlocked());
    +		check(sequence[27], buffer.getNextNonBlocked());
    +		check(sequence[28], buffer.getNextNonBlocked());
    +
    +		// a simple successful checkpoint
    +		startTs = System.nanoTime();
    +		check(sequence[32], buffer.getNextNonBlocked());
    +		verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
    +		validateAlignmentTime(startTs, buffer);
    +		check(sequence[33], buffer.getNextNonBlocked());
    +
    +		check(sequence[37], buffer.getNextNonBlocked());
    +		verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
    +		assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +		// all done
    +		assertNull(buffer.getNextNonBlocked());
    +		assertNull(buffer.getNextNonBlocked());
    +
    +		buffer.cleanup();
    +		checkNoTempFilesRemain();
    +	}
    +
    +	@Test
    +	public void testAbortViaQueuedBarriers() throws Exception {
    +		BufferOrEvent[] sequence = {
    +				// starting a checkpoint
    +				/* 0 */ createBuffer(1),
    +				/* 1 */ createBarrier(1, 1), createBarrier(1, 2),
    +				/* 3 */ createBuffer(2), createBuffer(0), createBuffer(1),
    +
    +				// queued barrier and cancellation barrier
    +				/* 6 */ createCancellationBarrier(2, 2),
    +				/* 7 */ createBarrier(2, 1),
    +
    +				// some intermediate buffers (some queued)
    +				/* 8 */ createBuffer(0), createBuffer(1), createBuffer(2),
    +
    +				// complete initial checkpoint
    +				/* 11 */ createBarrier(1, 0),
    +
    +				// some buffers (none queued, since checkpoint is aborted)
    +				/* 12 */ createBuffer(2), createBuffer(1), createBuffer(0),
    +
    +				// final barrier of aborted checkpoint
    +				/* 15 */ createBarrier(1, 2),
    --- End diff --
    
    Isn't this a barrier of the initial checkpoint, which has already completed successfully? Just wondering because of the comment ("final barrier of aborted checkpoint").


> Add a limit for how much data may be buffered during checkpoint alignment
> -------------------------------------------------------------------------
>
>                 Key: FLINK-4975
>                 URL: https://issues.apache.org/jira/browse/FLINK-4975
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.1.3
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.2.0, 1.1.4
>
>
> During checkpoint alignment, data may be buffered/spilled.
> We should introduce an upper limit for the spilled data volume. After exceeding that
> limit, the checkpoint alignment should abort and the checkpoint should be canceled.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message