flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From KristoffSC <krzysiek.chmielew...@gmail.com>
Subject Session Window with Custom Trigger
Date Tue, 23 Jun 2020 13:52:43 GMT
Hi all,
I'm using Flink 1.9.2 and I would like to ask about my use case and approach
I've took to meet it. 

The use case: 
I have a keyed stream, where I have to buffer messages with logic:
1. Buffering should start only when message arrives.
2. The max buffer time should not be longer than 3 seconds
3. Each new message should NOT prolong the buffer time.
4. If particular business condition will be meet, buffering should stop and
all messages should be let through further processing.

The business logic in point 4 is taking under the consideration data from
previously buffered messages in this time buffer session.

My setup for this is
1.keyedStream with ProcessingTimeSessionWindow (I dont need EventTime for
this).
2. Custom Trigger

The custom trigger:
1. keeps some data in its state under AggregatingStateDescriptor allowing me
to override "merge" method from Trigger class.

2. In onElement method, for the first call I execute
ctx.registerEventTimeTimer(window.maxTimestamp());
Additionally in this method I added the busioenss logic which returns
TriggerResult.FIRE or TriggerResult.CONTINUE

3. The onProcessingTime methods returns TriggerResult.FIRE

3. all other methods are returning TriggerResult.CONTINUE



As a result, I can observe that my window is fired two times. One from
onElement method where the busienss condition is meet and second time from
onProcessingTime method.

What is the best way to prevent this?

Regards,
Krzysztof



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Mime
View raw message