flink-user mailing list archives

From Stephan Epping <stephan.epp...@zweitag.de>
Subject Re: Maintaining watermarks per key, instead of per operator instance
Date Tue, 15 Nov 2016 08:09:29 GMT
Hey Aljoscha,

that sounds very promising, awesome! Though, I would still need to implement my own window
management logic (window assignment and window state purging), right? I was thinking about
reusing some of the existing components (TimeWindow and WindowAssigner), but running my own WindowOperator
(aka ProcessFunction). But I am not sure whether that can be done easily. I would love to hear your
opinion on that, and on what the tricky parts will be, for example common mistakes you experienced
while developing the windowing mechanism.
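For reference, the assignment half of such custom window management is small. A sketch in plain Python (illustrative names only, not the actual Flink classes) of what a tumbling-window assigner computes for a time window:

```python
# Sketch (plain Python, not Flink code): tumbling-window assignment as a
# WindowAssigner-like component would compute it. Names are illustrative.

def assign_tumbling_window(timestamp_ms: int, size_ms: int, offset_ms: int = 0):
    """Return (start, end) of the tumbling window containing timestamp_ms."""
    start = timestamp_ms - ((timestamp_ms - offset_ms) % size_ms)
    return (start, start + size_ms)

# Example: with 1-minute windows, an event at t=70s falls into [60s, 120s).
```

The purging half then only needs to track, per key, which window starts are still open.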

best Stephan


> On 14 Nov 2016, at 19:05, Aljoscha Krettek <aljoscha@apache.org> wrote:
> 
> Hi Stephan,
> I was going to suggest that using a flatMap and tracking the timestamp of each key yourself
is a bit like having a per-key watermark. I wanted to wait a bit before answering because
I'm currently working on a new type of function that will be released with Flink 1.2: ProcessFunction.
This is somewhat like a FlatMap but also gives access to the element timestamp, lets you query the current
processing time/event time, and lets you set (per-key) timers for processing time and event time. With
this you should be able to easily implement your per-key tracking, I hope.
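A rough sketch of the per-key timer idea described above, in plain Python rather than the actual Flink API (all names here are illustrative):

```python
import heapq

# Sketch (plain Python, not the Flink API): a minimal per-key event-time
# timer service of the kind a ProcessFunction-style handler could rely on.
class KeyedTimerService:
    def __init__(self):
        self._timers = []  # min-heap of (fire_time_ms, key)

    def register(self, key, fire_time_ms):
        """Schedule a timer for this key at the given event time."""
        heapq.heappush(self._timers, (fire_time_ms, key))

    def advance_to(self, event_time_ms, on_timer):
        """Advance the event-time clock and fire all due timers in order."""
        while self._timers and self._timers[0][0] <= event_time_ms:
            fire_time, key = heapq.heappop(self._timers)
            on_timer(key, fire_time)
```

A handler would call register() when it opens a window for a key, and do the purge-and-emit inside on_timer.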
> 
> Cheers,
> Aljoscha
> 
> P.S. ProcessFunction is already in the Flink repository but it's called TimelyFlatMapFunction
right now, because I was working on it under that working title.
> 
> On Mon, 14 Nov 2016 at 15:47 kaelumania <stephan.epping@zweitag.de> wrote:
> Hey Fabian,
> 
> thank you very much. 
> 
> - yes, I would window by event time and fire/purge by processing time
> - "Cheaper in the end" meant that having too much state in the Flink cluster would be
more expensive, as we store all data in Cassandra too. I think the fault tolerance would be
okay, as we would do a compare-and-set with Cassandra.
> 
> With the flatMap operator, wouldn't it be like running my own windowing mechanism? I
need to keep the aggregate window per sensor open (with checkpointing and state management)
until I receive an element for a sensor that is later in time than the window's end time, and then
purge the state and emit a new event (which is like having a watermark per sensor). Further,
I need a timer that fires after, say, 24 hours, in case a sensor dies and doesn't send more
data, which might be possible with a window assigner/trigger, right? But not inside normal functions,
e.g. flatMap? We can guarantee that the data for each sensor arrives almost in order (it might
be out of order within a few seconds), but there might be gaps of several hours after network
partitions.
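The purge-on-later-element plus dead-sensor-timeout logic described above could be sketched like this, as a plain-Python stand-in (not Flink code; all names illustrative):

```python
# Sketch (plain Python, not Flink code): close a sensor's open window either
# when a later element arrives, or when a timeout expires because the
# sensor stopped sending. All names are illustrative.

DAY_MS = 24 * 60 * 60 * 1000

class SensorWindow:
    def __init__(self, start, end):
        self.start, self.end = start, end
        self.sum = 0
        self.deadline = None  # processing-time timeout for a dead sensor

def on_element(open_windows, emit, key, ts, value, now_ms, window_size=60_000):
    win = open_windows.get(key)
    if win is not None and ts >= win.end:
        # element later than the open window: purge state, emit the result
        emit((key, win.start, win.sum))
        win = None
    if win is None:
        start = ts - ts % window_size
        win = SensorWindow(start, start + window_size)
        open_windows[key] = win
    win.sum += value
    win.deadline = now_ms + DAY_MS  # reset the dead-sensor timeout

def on_timeout(open_windows, emit, key, now_ms):
    win = open_windows.get(key)
    if win is not None and win.deadline is not None and now_ms >= win.deadline:
        emit((key, win.start, win.sum))  # flush despite no newer element
        del open_windows[key]
```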
> 
> Is there no way to define/redefine the watermark per keyed stream? Or to adjust the window
assigner + trigger to achieve the desired behaviour? I am a bit hesitant to implement the
whole state management myself. Do you plan to support such use cases on keyed streams? Maybe the
WatermarkAssigner could also receive information about the key for which the watermark should
be calculated, etc.
> 
> best, Stephan
> 
> 
> 
>> On 14 Nov 2016, at 15:17, Fabian Hueske <[hidden email]> wrote:
>> 
> 
>> Hi Stephan,
>> 
>> I'm skeptical about two things: 
>> - using processing time will result in inaccurately bounded aggregates (or do you
want to group by event time in a processing time window?)
>> - writing to and reading from Cassandra might be expensive (not sure what you mean
by cheaper in the end) and it is not integrated with Flink's checkpointing mechanism for fault-tolerance.
>> 
>> To me, the stateful FlatMapOperator looks like the best approach. There is an upcoming
feature for registering timers in user functions, i.e., a function is called when the timer
fires. This could be helpful to overcome the problem of closing the window without new data.
>> 
>> Best, 
>> Fabian
> 
>> 
>> 2016-11-14 8:39 GMT+01:00 Stephan Epping <[hidden email]>:
> 
>> Hello Fabian,
>> 
>> Thank you very much. What is your opinion on the following solution:
>> 
>> - window the data per event-time window, e.g. 15 minutes
>> - use processing time as the trigger, e.g. every 15 minutes
>> - which results in an aggregate over sensor values
>> - then use Cassandra to select the previous aggregate (as there can be multiple aggregates
for the same time window due to processing-time triggering)
>> - then update the aggregate and put it into a Cassandra sink again
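The read-update-write step, combined with the compare-and-set mentioned earlier in the thread, could look roughly like this; sketched against an in-memory stand-in rather than a real Cassandra driver, with illustrative names:

```python
# Sketch (plain Python, in-memory stand-in for Cassandra): read the previous
# aggregate for a window, merge in the new partial aggregate, and write back
# with a compare-and-set so concurrent updates for the same window don't
# clobber each other. The store and its API are illustrative, not a driver.

class CasStore:
    """Versioned key-value store with a compare-and-set write, mimicking a
    Cassandra lightweight transaction (UPDATE ... IF)."""
    def __init__(self):
        self._data = {}  # key -> (version, value)

    def get(self, key):
        return self._data.get(key, (0, None))

    def compare_and_set(self, key, expected_version, value):
        version, _ = self._data.get(key, (0, None))
        if version != expected_version:
            return False  # someone else wrote in between
        self._data[key] = (version + 1, value)
        return True

def upsert_aggregate(store, window_key, partial_sum, max_retries=10):
    """Merge partial_sum into the stored aggregate, retrying on CAS conflict."""
    for _ in range(max_retries):
        version, current = store.get(window_key)
        merged = partial_sum if current is None else current + partial_sum
        if store.compare_and_set(window_key, version, merged):
            return merged
    raise RuntimeError("too much contention on " + str(window_key))
```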
>> 
>> The Cassandra select will be a bit slower than using in-memory/Flink state, but
will be cheaper in the end. Further, what consequences does this have?
>> For example, replaying events will be more difficult, right? Also, what about snapshots?
Will they work with the mentioned design?
>> 
>> kind regards,
>> Stephan
> 
>>> On 11 Nov 2016, at 00:39, Fabian Hueske <[hidden email]> wrote:
>>> 
> 
>>> Hi Stephan,
>>> 
>>> I just wrote an answer to your SO question. 
>>> 
>>> Best, Fabian
> 
>>> 
>>> 2016-11-10 11:01 GMT+01:00 Stephan Epping <[hidden email]>:
> 
>>> 
>>> Hello,
>>> 
>>> I found this question in the Nabble archive (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
but was unable to reply there.
>>> 
>>> Here is my question regarding the mentioned thread:
>>> 
>>>> Hello, 
>>>> 
>>>> I have similar requirements (see my StackOverflow question: http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
I am pretty new to Flink; could you elaborate on a possible solution? We can guarantee good
ordering by sensor_id, thus watermarking by key would be the only reasonable way for us (sensorData.keyBy('id').timeWindow(1.minute).sum('value')).
Could I do my own watermarking after sensorData.keyBy('id'), e.g. sensorData.keyBy('id').overwriteWatermarking(), per
key? Or maybe use custom state plus a custom trigger? What happens if a sensor dies or is
removed completely? How can this be detected, given that watermarks would then be ignored for
window garbage collection? Or could we dynamically schedule a job for each sensor? That would
result in 1000 jobs.
>>> 
>>> 
>>> Thanks,
>>> Stephan
>>> 
>>> 
> 
>>> 
>> 
>> 
>> 
>> 
> 

