flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
Date Wed, 28 Sep 2016 13:35:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15529632#comment-15529632 ]

ASF GitHub Bot commented on FLINK-4329:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2546#discussion_r80916240
  
    --- Diff: flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileMonitoringTest.java
---
    @@ -106,6 +107,117 @@ public static void destroyHDFS() {
     	//						TESTS
     
     	@Test
    +	public void testFileReadingOperatorWithIngestionTime() throws Exception {
    +		Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
    +		Map<Integer, String> expectedFileContents = new HashMap<>();
    +		for(int i = 0; i < NO_OF_FILES; i++) {
    +			Tuple2<org.apache.hadoop.fs.Path, String> file = fillWithData(hdfsURI, "file", i, "This is test line.");
    +			filesCreated.add(file.f0);
    +			expectedFileContents.put(i, file.f1);
    +		}
    +
    +		TextInputFormat format = new TextInputFormat(new Path(hdfsURI));
    +		TypeInformation<String> typeInfo = TypeExtractor.getInputFormatTypes(format);
    +
    +		ContinuousFileReaderOperator<String, ?> reader = new ContinuousFileReaderOperator<>(format);
    +
    +		ExecutionConfig executionConfig = new ExecutionConfig();
    +		executionConfig.setAutoWatermarkInterval(100);
    +
    +		TestTimeServiceProvider timeServiceProvider = new TestTimeServiceProvider();
    +		OneInputStreamOperatorTestHarness<FileInputSplit, String> tester =
    +			new OneInputStreamOperatorTestHarness<>(reader, executionConfig,
    +				timeServiceProvider, TimeCharacteristic.IngestionTime);
    +
    +		reader.setOutputType(typeInfo, executionConfig);
    +		tester.open();
    +
    +		// test that watermarks are correctly emitted
    +
    +		timeServiceProvider.setCurrentTime(201);
    +		timeServiceProvider.setCurrentTime(301);
    +		timeServiceProvider.setCurrentTime(401);
    +		timeServiceProvider.setCurrentTime(501);
    +
    +		int i = 0;
    +		for(Object line: tester.getOutput()) {
    +			if (!(line instanceof Watermark)) {
    +				Assert.fail("Only watermarks are expected here ");
    +			}
    +			Watermark w = (Watermark) line;
    +			Assert.assertEquals(w.getTimestamp(), 200 + (i * 100));
    +			i++;
    +		}
    +
    +		// clear the output to get the elements only and the final watermark
    +		tester.getOutput().clear();
    +		Assert.assertEquals(tester.getOutput().size(), 0);
    +
    +		// create the necessary splits for the test
    +		FileInputSplit[] splits = format.createInputSplits(
    +			reader.getRuntimeContext().getNumberOfParallelSubtasks());
    +
    +		// and feed them to the operator
    +		for(FileInputSplit split: splits) {
    +			tester.processElement(new StreamRecord<>(split));
    +		}
    +
    +		// then close the reader gracefully so that
    +		// we wait until all input is read
    +		synchronized (tester.getCheckpointLock()) {
    +			tester.close();
    +		}
    +
    +		for(org.apache.hadoop.fs.Path file: filesCreated) {
    +			hdfs.delete(file, false);
    +		}
    +
    +		// the lines received must be the elements in the files +1 for the Long.MAX_VALUE watermark
    +		Assert.assertEquals(tester.getOutput().size(), NO_OF_FILES * LINES_PER_FILE + 1);
    +
    +		// put the elements read in a map by file they belong to
    +		Map<Integer, List<String>> actualFileContents = new HashMap<>();
    +		for(Object line: tester.getOutput()) {
    +			if (line instanceof StreamRecord) {
    +				StreamRecord<String> element = (StreamRecord<String>) line;
    +				Assert.assertEquals(element.getTimestamp(), 501);
    +
    +				int fileIdx = Character.getNumericValue(element.getValue().charAt(0));
    +				List<String> content = actualFileContents.get(fileIdx);
    +				if (content == null) {
    +					content = new ArrayList<>();
    +					actualFileContents.put(fileIdx, content);
    +				}
    +				content.add(element.getValue() + "\n");
    +			} else if (line instanceof Watermark) {
    +				Assert.assertEquals(((Watermark) line).getTimestamp(), Long.MAX_VALUE);
    --- End diff --
    
    Does the test assume that all watermarks emitted by the reader are Long.MAX_VALUE? I am confused here: isn't that exactly what should NOT happen? Otherwise, all emitted elements would be late?
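For reference, the expected values asserted in the test above (setting the time to 201, 301, 401, 501 and expecting watermarks 200, 300, 400, 500) are consistent with flooring the current time to the 100 ms auto-watermark interval. A minimal sketch of that flooring rule; the class and helper name below are my own illustration, not a Flink API:

```java
public class IngestionWatermarkSketch {

    // Hypothetical flooring rule inferred from the test's expectations:
    // with a 100 ms auto-watermark interval, times 201, 301, 401, 501
    // would yield watermarks 200, 300, 400, 500.
    static long watermarkFor(long currentTime, long intervalMillis) {
        return currentTime - (currentTime % intervalMillis);
    }

    public static void main(String[] args) {
        long[] times = {201, 301, 401, 501};
        for (long t : times) {
            // Each time is floored to the start of its interval.
            System.out.println(t + " -> " + watermarkFor(t, 100));
        }
    }
}
```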


> Fix Streaming File Source Timestamps/Watermarks Handling
> --------------------------------------------------------
>
>                 Key: FLINK-4329
>                 URL: https://issues.apache.org/jira/browse/FLINK-4329
>             Project: Flink
>          Issue Type: Bug
>          Components: Streaming Connectors
>    Affects Versions: 1.1.0
>            Reporter: Aljoscha Krettek
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0, 1.1.3
>
>
> The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, i.e. they are just passed through. This means that when the {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} watermark, that watermark can "overtake" the records that are still to be emitted by the {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" setting in the window operator, this can lead to elements being dropped as late.
> Also, the {{ContinuousFileReaderOperator}} does not correctly assign ingestion timestamps, since it is not technically a source but looks like one to the user.
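The "overtaking" problem described above, and the direction of the fix (do not forward the final watermark while records are still buffered), can be sketched as a toy model. The class, method, and record encoding below are my own stand-ins for illustration, not Flink's actual classes:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class WatermarkOvertakeSketch {

    // Toy model of a reader that still has records queued when the final
    // Long.MAX_VALUE watermark arrives (the situation in FLINK-4329).
    // Draining the queue BEFORE forwarding the watermark ensures no record
    // appears after it downstream, so nothing is dropped as late.
    static List<String> emitWithDeferredWatermark(Queue<String> pendingRecords,
                                                  boolean sawMaxWatermark) {
        List<String> output = new ArrayList<>();
        // First drain all buffered records ...
        while (!pendingRecords.isEmpty()) {
            output.add("record:" + pendingRecords.poll());
        }
        // ... and only then forward the final watermark.
        if (sawMaxWatermark) {
            output.add("watermark:" + Long.MAX_VALUE);
        }
        return output;
    }

    public static void main(String[] args) {
        Queue<String> pending = new ArrayDeque<>();
        pending.add("line1");
        pending.add("line2");
        // The watermark comes out last, after every buffered record.
        System.out.println(emitWithDeferredWatermark(pending, true));
    }
}
```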



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
