flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Till Rohrmann <trohrm...@apache.org>
Subject Re: Testing with ProcessingTime and FromElementsFunction with TestEnvironment
Date Thu, 09 May 2019 07:02:14 GMT
Hi Steve,

afaik there is no such thing in Flink. I agree that Flink's testing
utilities should be improved. If you implement such a source, then you
might be able to contribute it back to the community. That would be super


On Wed, May 8, 2019 at 6:40 PM Steven Nelson <snelson@sourceallies.com>

> That’s what I figured was happening :( Your explanation is a lot better
> than what I gave to my team, so that will help a lot, thank you!
> Is there a testing source already created that does this sort of thing?
> The Flink-testing library seems a bit sparse.
> -Steve
> Sent from my iPhone
> On May 8, 2019, at 9:33 AM, Till Rohrmann <trohrmann@apache.org> wrote:
> Hi Steve,
> I think the reason for the different behaviour is due to the way event
> time and processing time are implemented.
> When you are using event time, watermarks need to travel through the
> topology denoting the current event time. When you source terminates, the
> system will send a watermark with Long.MAX_VALUE through the topology. This
> will effectively trigger the completion of all pending event time
> operations.
> In the case of processing time, Flink does not do this. Instead it simply
> relies on the processing time clocks on each machine. Hence, there is no
> way for Flink to tell the different machines that their respective
> processing time clocks should proceed to a certain time in case of a
> shutdown. Instead you should make sure that you don't terminate the job
> before a certain time (processing time) has passed. You could do this by
> adding a sleep to your source function after you've output all records and
> just before leaving the source loop.
> Cheers,
> Till
> On Tue, May 7, 2019 at 11:49 PM Steven Nelson <snelson@sourceallies.com>
> wrote:
>> Hello!
>> I am trying to write a test that runs in the TestEnviroment. I create a
>> process that uses ProcessingTime, has a source constructed from a
>> FromElementsFunction and runs data through a Keyed Stream into
>> a ProcessingTimeSessionWindows.withGap().
>> The problem is that it appears that the env.execute method returns
>> immediately after the session closes, not allowing the events to be
>> released from the window before shutdown occurs. This used to work when I
>> used EventTime.
>> Thoughts?
>> -Steve

View raw message