flink-user mailing list archives

From David Anderson <da...@ververica.com>
Subject Re: Watermark won't advance in ProcessFunction
Date Mon, 28 Oct 2019 10:30:31 GMT
The reason why the watermark is not advancing is that
assignAscendingTimestamps is a periodic watermark generator. This
style of watermark generator is called at regular intervals to create
watermarks -- by default, this is done every 200 msec. With only a
tiny bit of data to process, the job doesn't run long enough for the
watermark generator to ever be called.
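
One way to see the watermark move in such a short job is to swap the periodic generator for a punctuated one, which is consulted after every element instead of on a timer. The following is only a sketch under assumptions: `PerElementWatermarks` and `PunctuatedWatermarkSketch` are made-up names, the parallelism is hard-coded to 1, and the "timestamp minus 1" watermark is chosen to mimic what the ascending-timestamp assigner does. It targets the Flink 1.9 Scala API used in this thread.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.{AssignerWithPunctuatedWatermarks, ProcessFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.util.Collector

// Hypothetical helper (not part of Flink): proposes a watermark after every element.
class PerElementWatermarks extends AssignerWithPunctuatedWatermarks[Long] {
  // The element value itself is used as the event timestamp.
  override def extractTimestamp(element: Long, previousElementTimestamp: Long): Long = element

  // Called once per element; the watermark trails the timestamp by 1 ms.
  override def checkAndGetNextWatermark(lastElement: Long, extractedTimestamp: Long): Watermark =
    new Watermark(extractedTimestamp - 1)
}

object PunctuatedWatermarkSketch {
  def main(args: Array[String]): Unit = {
    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    sEnv.setParallelism(1) // assumption: single task, to keep the output easy to read

    sEnv.fromCollection(Seq[Long](1, 2, 3))
      .assignTimestampsAndWatermarks(new PerElementWatermarks)
      .process(new ProcessFunction[Long, Long] {
        override def processElement(
            i: Long,
            context: ProcessFunction[Long, Long]#Context,
            collector: Collector[Long]): Unit = {
          // Emit the watermark the operator has seen so far.
          collector.collect(context.timerService().currentWatermark())
        }
      })
      .print()
    sEnv.execute()
  }
}
```

Keep in mind that a watermark produced for an element travels behind that element, so processElement still sees the watermark from before the current record. For a real job, shortening the periodic interval with sEnv.getConfig.setAutoWatermarkInterval may be the simpler knob.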

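As for making the KeyedProcessFunction itself work: Dian's point in the quoted reply below (register an event-time timer and define onTimer) could look roughly like this. It is a minimal sketch, not a drop-in fix: `TimerSketch` is a made-up name, the `_ % 2` key is borrowed from the timeWindow example in the thread, and the timer is simply set to each element's own timestamp. Timers are only available on keyed streams, hence the keyBy.

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object TimerSketch {
  def main(args: Array[String]): Unit = {
    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    sEnv.setParallelism(1) // assumption: single task, to keep the output easy to read

    sEnv.fromCollection(Seq[Long](1, 2, 3))
      .assignAscendingTimestamps(identity[Long])
      .keyBy(_ % 2) // timers require a keyed stream
      .process(new KeyedProcessFunction[Long, Long, Long] {
        override def processElement(
            i: Long,
            ctx: KeyedProcessFunction[Long, Long, Long]#Context,
            out: Collector[Long]): Unit = {
          // Register an event-time timer at the element's own timestamp.
          ctx.timerService().registerEventTimeTimer(ctx.timestamp().longValue())
        }

        override def onTimer(
            timestamp: Long,
            ctx: KeyedProcessFunction[Long, Long, Long]#OnTimerContext,
            out: Collector[Long]): Unit = {
          // Runs once the watermark passes the timer's timestamp.
          out.collect(timestamp)
        }
      })
      .print()
    sEnv.execute()
  }
}
```

With a source this small the timers will most likely fire only when the input ends and the final Long.MaxValue watermark arrives, which is the same mechanism that makes the timeWindow example produce output.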

On Mon, Oct 28, 2019 at 9:17 AM Dian Fu <dian0511.fu@gmail.com> wrote:
>
> When a program with a finite input finishes, it emits Long.MaxValue as a final watermark,
> and that watermark triggers all the windows. This is why your `timeWindow` program works.
> In the first program, however, you have not registered an event-time timer (through
> context.timerService.registerEventTimeTimer), and there is no onTimer logic defined to handle it.
>
> On Oct 28, 2019, at 4:01 PM, 杨力 <bill.lee.y@gmail.com> wrote:
>
> It seems to be the case. But when I use timeWindow or CEP with fromCollection, it works
> well. For example,
>
> ```
> sEnv.fromCollection(Seq[Long](1, 1002, 2002, 3002)).assignAscendingTimestamps(identity[Long])
>     .keyBy(_ % 2).timeWindow(Time.seconds(1)).sum(0).print()
> ```
>
> prints
>
> ```
> 1
> 1002
> 2002
> 3002
> ```
>
> How can I implement my KeyedProcessFunction so that it works as expected?
>
> On Mon, Oct 28, 2019 at 2:04 PM, Dian Fu <dian0511.fu@gmail.com> wrote:
>>
>> Hi,
>>
>> By default, the underlying implementation of `assignAscendingTimestamps` generates
>> watermarks periodically. In your test program no watermark has been generated yet,
>> and I think that is why it is still Long.MinValue.
>>
>> Regards,
>> Dian
>>
>> On Oct 28, 2019, at 11:59 AM, 杨力 <bill.lee.y@gmail.com> wrote:
>>
>> I'm going to sort elements in a PriorityQueue and set up timers at (currentWatermark + 1),
>> following the instructions in https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/process_function.html#timer-coalescing.
>>
>> However, it seems that context.timerService().currentWatermark() always returns Long.MinValue
>> and my onTimer is never called. Here's a minimal program to reproduce the problem. Am I
>> missing something?
>>
>> ```
>> val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
>> sEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> sEnv.setParallelism(argOps.parallelism())
>> sEnv.fromCollection(Seq[Long](1, 2, 3)).assignAscendingTimestamps(identity[Long])
>>     .process(new ProcessFunction[Long, Long] {
>>       override def processElement(i: Long, context: ProcessFunction[Long, Long]#Context, collector: Collector[Long]): Unit = {
>>         collector.collect(context.timerService().currentWatermark())
>>       }
>>     }).print()
>> sEnv.execute()
>> ```
>>
>> ```
>> -9223372036854775808
>> -9223372036854775808
>> -9223372036854775808
>> ```
>>
>>
>
