flink-issues mailing list archives

From "Aljoscha Krettek (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6116) Watermarks don't work when unioning with same DataStream
Date Thu, 11 May 2017 15:01:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16006582#comment-16006582 ]

Aljoscha Krettek commented on FLINK-6116:
-----------------------------------------

It seems that the downstream operator has two input channels. Elements from both of the unioned
inputs are always received on one channel, while watermarks are received only on the other channel.
If you add some print statements to {{StreamInputProcessor}}, you get this:
{code}
NEW CHANNEL: 1
IN WM: Watermark @ 1494514601000 channel 1
IN WM: Watermark @ 1494514601000 channel 1
NEW CHANNEL: 0
GOT ELEMENT: Record @ 1494514601175 : hello!
GOT ELEMENT: Record @ 1494514601175 : hello!
NEW CHANNEL: 1
NEW CHANNEL: 0
GOT ELEMENT: Record @ 1494514601978 : hello!
GOT ELEMENT: Record @ 1494514601978 : hello!
NEW CHANNEL: 1
IN WM: Watermark @ 1494514602000 channel 1
IN WM: Watermark @ 1494514602000 channel 1
{code}
Note that it never prints {{GOT WATERMARK}}: we never receive watermarks on the other channel
(channel 0 in this log) and thus never forward a watermark to the operator.
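
To illustrate why a single silent channel is enough to block everything, here is a minimal sketch of the usual rule that an operator's watermark is the minimum of the latest watermarks seen on each of its input channels. This is not the actual {{StreamInputProcessor}} code; the class {{MinWatermarkTracker}} and its method {{onWatermark}} are hypothetical names made up for this illustration.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical illustration only, not Flink's StreamInputProcessor.
// Tracks the latest watermark per input channel and forwards the minimum
// across all channels whenever that minimum advances.
final class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long lastForwarded = Long.MIN_VALUE;

    MinWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        // A channel that has not reported yet holds the combined watermark back.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark for one channel. Returns the watermark to forward
    // downstream, or an empty result if the combined watermark did not advance.
    OptionalLong onWatermark(int channel, long timestamp) {
        if (timestamp > channelWatermarks[channel]) {
            channelWatermarks[channel] = timestamp;
        }
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        if (min > lastForwarded) {
            lastForwarded = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```

Fed the watermarks from the log above (all of them on channel 1), such a tracker never forwards anything, because channel 0 stays at its initial value. It only forwards a watermark once channel 0 reports one as well.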

> Watermarks don't work when unioning with same DataStream
> --------------------------------------------------------
>
>                 Key: FLINK-6116
>                 URL: https://issues.apache.org/jira/browse/FLINK-6116
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.2.0, 1.3.0
>            Reporter: Aljoscha Krettek
>            Priority: Blocker
>             Fix For: 1.3.0
>
>
> In this example job we don't get any watermarks in the {{WatermarkObserver}}:
> {code}
> public class WatermarkTest {
> 	public static void main(String[] args) throws Exception {
> 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> 		env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> 		env.getConfig().setAutoWatermarkInterval(1000);
> 		env.setParallelism(1);
> 		DataStreamSource<String> input = env.addSource(new SourceFunction<String>() {
> 			@Override
> 			public void run(SourceContext<String> ctx) throws Exception {
> 				while (true) {
> 					ctx.collect("hello!");
> 					Thread.sleep(800);
> 				}
> 			}
> 			@Override
> 			public void cancel() {
> 			}
> 		});
> 		input.union(input)
> 				.flatMap(new IdentityFlatMap())
> 				.transform("WatermarkOp", BasicTypeInfo.STRING_TYPE_INFO, new WatermarkObserver());
> 		env.execute();
> 	}
> 	public static class WatermarkObserver
> 			extends AbstractStreamOperator<String>
> 			implements OneInputStreamOperator<String, String> {
> 		@Override
> 		public void processElement(StreamRecord<String> element) throws Exception {
> 			System.out.println("GOT ELEMENT: " + element);
> 		}
> 		@Override
> 		public void processWatermark(Watermark mark) throws Exception {
> 			super.processWatermark(mark);
> 			System.out.println("GOT WATERMARK: " + mark);
> 		}
> 	}
> 	private static class IdentityFlatMap
> 			extends RichFlatMapFunction<String, String> {
> 		@Override
> 		public void flatMap(String value, Collector<String> out) throws Exception {
> 			out.collect(value);
> 		}
> 	}
> }
> {code}
> When the {{union}} is commented out, it works.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
