flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5096) Make the RollingSink rescalable.
Date Mon, 21 Nov 2016 15:32:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15683867#comment-15683867 ]

ASF GitHub Bot commented on FLINK-5096:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2845#discussion_r88915712
  
    --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/RollingSinkITCase.java ---
    @@ -638,6 +644,231 @@ public void flatMap(Tuple2<Integer, String> value,
     		Assert.assertEquals(8, numFiles);
     	}
     
    +	private static final String PART_PREFIX = "part";
    +	private static final String PENDING_SUFFIX = ".pending";
    +	private static final String IN_PROGRESS_SUFFIX = ".in-progress";
    +	private static final String VALID_LENGTH_SUFFIX = ".valid";
    +
    +	@Test
    +	public void testBucketStateTransitions() throws Exception {
    +		final File outDir = tempFolder.newFolder();
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness = createRescalingTestSink(outDir, 1, 0);
    +		testHarness.setup();
    +		testHarness.open();
    +
    +		testHarness.setProcessingTime(0L);
    +
    +		testHarness.processElement(new StreamRecord<>("test1", 1L)); // we have a bucket size of 5 bytes
    +		testHarness.processElement(new StreamRecord<>("test2", 1L));
    +		checkFs(outDir, 1, 1 ,0, 0);
    +
    +		testHarness.processElement(new StreamRecord<>("test3", 1L));
    +		checkFs(outDir, 1, 2, 0, 0);
    +
    +		testHarness.snapshot(0, 0);
    +		checkFs(outDir, 1, 2, 0, 0);
    +
    +		testHarness.notifyOfCompletedCheckpoint(0);
    +		checkFs(outDir, 1, 0, 2, 0);
    +
    +		OperatorStateHandles snapshot = testHarness.snapshot(1, 0);
    +
    +		testHarness.close();
    +		checkFs(outDir, 0, 1, 2, 0);
    +
    +		testHarness = createRescalingTestSink(outDir, 1, 0);
    +		testHarness.setup();
    +		testHarness.initializeState(snapshot);
    +		testHarness.open();
    +		checkFs(outDir, 0, 0, 3, 1);
    +
    +		snapshot = testHarness.snapshot(2, 0);
    +
    +		testHarness.processElement(new StreamRecord<>("test4", 10));
    +		checkFs(outDir, 1, 0, 3, 1);
    +
    +		testHarness = createRescalingTestSink(outDir, 1, 0);
    +		testHarness.setup();
    +		testHarness.initializeState(snapshot);
    +		testHarness.open();
    +
    +		// the in-progress file remains as we do not clean up now
    +		checkFs(outDir, 1, 0, 3, 1);
    +
    +		testHarness.close();
    +
    +		// at close it is not moved to final because it is not part
    +		// of the current task's state, it was just a not cleaned up leftover.
    +		checkFs(outDir, 1, 0, 3, 1);
    +	}
    +
    +	@Test
    +	public void testScalingDown() throws Exception {
    +		final File outDir = tempFolder.newFolder();
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness1 = createRescalingTestSink(outDir, 3, 0);
    +		testHarness1.setup();
    +		testHarness1.open();
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness2 = createRescalingTestSink(outDir, 3, 1);
    +		testHarness2.setup();
    +		testHarness2.open();
    +
    +		OneInputStreamOperatorTestHarness<String, Object> testHarness3 = createRescalingTestSink(outDir, 3, 2);
    +		testHarness3.setup();
    +		testHarness3.open();
    +
    +		testHarness1.processElement(new StreamRecord<>("test1", 0L));
    +		checkFs(outDir, 1, 0, 0, 0);
    +
    +		testHarness2.processElement(new StreamRecord<>("test2", 0L));
    +		checkFs(outDir, 2, 0, 0, 0);
    +
    +		testHarness3.processElement(new StreamRecord<>("test3", 0L));
    +		testHarness3.processElement(new StreamRecord<>("test4", 0L));
    +		checkFs(outDir, 3, 1, 0, 0);
    +
    +		// intentionally we snapshot them in the reverse order so that the states are shuffled
    +		OperatorStateHandles mergedSnapshot = AbstractStreamOperatorTestHarness.repackageState(
    +			testHarness3.snapshot(0, 0),
    +			testHarness1.snapshot(0, 0),
    +			testHarness2.snapshot(0, 0)
    +		);
    +
    +		testHarness1 = createRescalingTestSink(outDir, 2, 0);
    +		testHarness1.setup();
    +		testHarness1.initializeState(mergedSnapshot);
    +		testHarness1.open();
    +
    +		// because we do not have a pending for part-2-0
    --- End diff --
    
    This comment needs some context, as in `X happened "because we do not have a pending for part-2-0"`.
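
The `checkFs` helper called throughout the test is not shown in this diff hunk. Judging only from the calls and the comments around them, its four integer arguments appear to be the expected numbers of in-progress, pending, completed, and valid-length files, in that order. A minimal sketch of such a counter follows, assuming that ordering. The class name `BucketFileCounts` and the method `scan` are hypothetical and are not part of the pull request; only the four suffix constants are taken from the diff.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper for illustration only; not the checkFs from the pull request.
class BucketFileCounts {
    // Values copied from the constants added in the diff.
    static final String PART_PREFIX = "part";
    static final String PENDING_SUFFIX = ".pending";
    static final String IN_PROGRESS_SUFFIX = ".in-progress";
    static final String VALID_LENGTH_SUFFIX = ".valid";

    final int inProgress;
    final int pending;
    final int completed;
    final int validLength;

    BucketFileCounts(int inProgress, int pending, int completed, int validLength) {
        this.inProgress = inProgress;
        this.pending = pending;
        this.completed = completed;
        this.validLength = validLength;
    }

    // Walks dir recursively and classifies every regular file by its name.
    static BucketFileCounts scan(Path dir) throws IOException {
        List<Path> files;
        try (Stream<Path> paths = Files.walk(dir)) {
            files = paths.filter(Files::isRegularFile).collect(Collectors.toList());
        }
        int inProgress = 0;
        int pending = 0;
        int completed = 0;
        int validLength = 0;
        for (Path file : files) {
            String name = file.getFileName().toString();
            if (name.endsWith(IN_PROGRESS_SUFFIX)) {
                inProgress++;
            } else if (name.endsWith(PENDING_SUFFIX)) {
                pending++;
            } else if (name.endsWith(VALID_LENGTH_SUFFIX)) {
                validLength++;
            } else if (name.contains(PART_PREFIX)) {
                // Assumption: a part file without any state suffix is a completed file.
                completed++;
            }
            // Files that match none of the rules are ignored.
        }
        return new BucketFileCounts(inProgress, pending, completed, validLength);
    }

    @Override
    public String toString() {
        return "inProgress=" + inProgress + " pending=" + pending
            + " completed=" + completed + " validLength=" + validLength;
    }
}
```

With a helper like this, an assertion such as `checkFs(outDir, 1, 0, 3, 1)` would read as "one in-progress file, no pending files, three completed files, one valid-length file", which is the kind of context the review comment asks the code comment to spell out.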


> Make the RollingSink rescalable.
> --------------------------------
>
>                 Key: FLINK-5096
>                 URL: https://issues.apache.org/jira/browse/FLINK-5096
>             Project: Flink
>          Issue Type: Improvement
>          Components: filesystem-connector
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0
>
>
> Integrate the RollingSink with the new state abstractions so that its parallelism can change after restoring from a savepoint.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
