flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
Date Wed, 05 Oct 2016 10:58:21 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548382#comment-15548382 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2593#discussion_r81944268
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java ---
    @@ -106,6 +107,155 @@ public static void destroyHDFS() {
     	//						TESTS
     
     	@Test
    +	public void testFileReadingOperatorWithIngestionTime() throws Exception {
    +		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
    +		Map<Integer, String> expectedFileContents = new HashMap<>();
    +
    +		for(int i = 0; i < NO_OF_FILES; i++) {
    +			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file",
i, "This is test line.");
    +			filesCreated.add(file.f0);
    +			expectedFileContents.put(i, file.f1);
    +		}
    +
    +		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
    +		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
    +
    +		final long watermarkInterval = 10;
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		executionConfig.setAutoWatermarkInterval(watermarkInterval);
    +
    +		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
    +		reader.setOutputType(typeInfo, executionConfig);
    +
    +		final TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
    +		final OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
    +			new OneInputStreamOperatorTestHarness<>(reader, executionConfig, timeServiceProvider);
    +		tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +		tester.open();
    +
    +		Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic());
    +
    +		// test that watermarks are correctly emitted
    +
    +		timeServiceProvider.setCurrentTime(201);
    +		timeServiceProvider.setCurrentTime(301);
    +		timeServiceProvider.setCurrentTime(401);
    +		timeServiceProvider.setCurrentTime(501);
    +
    +		int i = 0;
    +		for(Object line: tester.getOutput()) {
    +			if (!(line instanceof Watermark)) {
    +				Assert.fail("Only watermarks are expected here ");
    +			}
    +			Watermark w = (Watermark) line;
    +			Assert.assertEquals(200 + (i * 100), w.getTimestamp());
    +			i++;
    +		}
    +
    +		// clear the output to get the elements only and the final watermark
    +		tester.getOutput().clear();
    +		Assert.assertEquals(0, tester.getOutput().size());
    +
    +		// create the necessary splits for the test
    +		FileInputSplit[] splits = format.createInputSplits(
    +			reader.getRuntimeContext().getNumberOfParallelSubtasks());
    +
    +		// and feed them to the operator
    +		Map<Integer, List<String>> actualFileContents = new HashMap<>();
    +
    +		long lastSeenWatermark = Long.MIN_VALUE;
    +		int lineCounter = 0;	// counter for the lines read from the splits
    +		int watermarkCounter = 0;
    +
    +		for(FileInputSplit split: splits) {
    +
    +			// set the next "current processing time".
    +			long nextTimestamp = timeServiceProvider.getCurrentProcessingTime() + watermarkInterval;
    +			timeServiceProvider.setCurrentTime(nextTimestamp);
    +
    +			// send the next split to be read and wait until it is fully read.
    +			tester.processElement(new StreamRecord<>(split));
    +			synchronized (tester.getCheckpointLock()) {
    +				while (tester.getOutput().isEmpty() || tester.getOutput().size() != (LINES_PER_FILE
+ 1)) {
    +					tester.getCheckpointLock().wait(10);
    +				}
    +			}
    +
    +			// verify that the results are the expected
    +			for(Object line: tester.getOutput()) {
    +				if (line instanceof StreamRecord) {
    +					StreamRecord<String> element = (StreamRecord<String>) line;
    +					lineCounter++;
    +
    +					Assert.assertEquals(nextTimestamp, element.getTimestamp());
    +
    +					int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
    +					List<String> content = actualFileContents.get(fileIdx);
    +					if (content == null) {
    +						content = new ArrayList<>();
    +						actualFileContents.put(fileIdx, content);
    +					}
    +					content.add(element.getValue() + "\n");
    +				} else if (line instanceof Watermark) {
    +					long watermark = ((Watermark) line).getTimestamp();
    +
    +					Assert.assertEquals(nextTimestamp - (nextTimestamp % watermarkInterval), watermark);
    +					Assert.assertTrue(watermark > lastSeenWatermark);
    +					watermarkCounter++;
    +
    +					lastSeenWatermark = watermark;
    +				} else {
    +					Assert.fail("Unknown element in the list.");
    +				}
    +			}
    +
    +			// clean the output to be ready for the next split
    +			tester.getOutput().clear();
    +		}
    +
    +		// now we are processing one split after the other,
    +		// so all the elements must be here by now.
    +		Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter);
    +
    +		// because we expect one watermark per split.
    +		Assert.assertEquals(NO_OF_FILES, watermarkCounter);
    --- End diff --
    
    This should be `Assert.assertEquals(splits.length, watermarkCounter);`.


> Fix Streaming File Source Timestamps/Watermarks Handling
> --------------------------------------------------------
>
>                 Key: FLINK-4329
>                 URL: https://issues.apache.org/jira/browse/FLINK-4329
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Aljoscha Krettek
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0, 1.1.3
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, i.e. they
> are just passed through. This means that when the {{ContinuousFileMonitoringFunction}} closes
> and emits a {{Long.MAX_VALUE}} watermark, that watermark can "overtake" the records that are
> to be emitted in the {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness"
> setting in the window operator, this can lead to elements being dropped as late.
> Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion timestamps
> since it is not technically a source but looks like one to the user.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message