flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Kanstantsin Kamkou <kkam...@gmail.com>
Subject Re: Maintaining watermarks per key, instead of per operator instance
Date Thu, 02 Jun 2016 17:18:15 GMT
Hi Aljoscha! Is it possible somehow to use the RichXFunction in CEP?
The task is pretty similar, but I have to ignore once the next
triggered event for the same key.


On Wed, Jun 1, 2016 at 2:54 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> Hi,
> yeah, in that case per-key watermarks would be useful for you. I won't be
> possible to add such a feature, though, due to the (possibly) dynamic nature
> of the key space and how watermark tracking works.
>
> You should be able to implement it with relatively low overhead using a
> RichFlatMapFunction and keyed state. This is the relevant section of the
> doc:
> https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/state.html#using-the-keyvalue-state-interface.
>
> We are also in the process of improving our windowing system, especially
> when it comes to late data, cleanup and trigger semantics. You can have a
> look here if you're interested:
> https://docs.google.com/document/d/1Xp-YBf87vLTduYSivgqWVEMjYUmkA-hyb4muX3KRl08/edit?usp=sharing.
>
> Best,
> Aljoscha
>
> On Tue, 31 May 2016 at 14:36 <leon_mclare@tutanota.com> wrote:
>>
>> Hi Aljoscha,
>>
>> thanks for the speedy reply.
>>
>> I am processing measurements delivered by smart meters. I use windows to
>> gather measurements and calculate values such as average consumption. The
>> key is simply the meter ID.
>>
>> The challenge is that meters may undergo network partitioning, under which
>> they fall back to local buffering. The data is then transmitted once
>> connectivity has been re-established. I am using event time to obtain
>> accurate calculations.
>>
>> If a specific meter goes offline, and the watermark progresses to the next
>> window for an operator instance, then all late data will be discarded once
>> that meter is online again, until it has caught up to the event time. This
>> is because I am using a custom EventTimeTrigger implementation that discards
>> late elements. The reason for that is because Flink would otherwise
>> immediately evaluate the window upon receiving a late element, which is a
>> problem since my calculations (e.g. the average consumption) depend on
>> multiple elements. I cannot calculate averages with that single late
>> element.
>>
>> Each individual meter guarantees in-order transmission of measurements. If
>> watermarks progressed per key, then i would never have late elements because
>> of that guarantee. I would be able to accurately calculate averages, with
>> the trade-off that my results would arrive sporadically from the same
>> operator instance.
>>
>> I suppose I could bypass the use of windows by implementing a stateful map
>> function that mimics windows to a certain degree. I implemented something
>> similar in Storm, but the amount of application logic required is
>> substantial.
>>
>> I completely understand why Flink evaluates a window on a late element,
>> since there is no other way to know when to evaluate the window as event
>> time has already progressed.
>>
>> Perhaps there is a way to gather/redirect late elements?
>>
>> Regards
>> Leon
>>
>> 31. May 2016 13:37 by aljoscha@apache.org:
>>
>>
>> Hi,
>> I'm afraid this is impossible with the current design of Flink. Might I
>> ask what you want to achieve with this? Maybe we can come up with a
>> solution.
>>
>> -Aljoscha
>>
>> On Tue, 31 May 2016 at 13:24 <leon_mclare@tutanota.com> wrote:
>>>
>>> My use case primarily concerns applying transformations per key, with the
>>> keys remaining fixed throughout the topology. I am using event time for my
>>> windows.
>>>
>>> The problem i am currently facing is that watermarks in windows propagate
>>> per operator instance, meaning the operator event time increases for all
>>> keys that the operator is in charge of. I wish for watermarks to progress
>>> per key, not per operator instance.
>>>
>>> Is this easily possible? I was unable to find an appropriate solution
>>> based on existing code recipes.
>>>
>>> Greetings
>>> Leon

Mime
View raw message