flink-user mailing list archives

From kaelumania <stephan.epp...@zweitag.de>
Subject Re: Maintaining watermarks per key, instead of per operator instance
Date Wed, 23 Nov 2016 10:25:42 GMT
Sounds good to me. But I still need some kind of side output (Cassandra) that stores the
accumulating aggregates at each time scale (minute, hour). Thus I would need something
like this

var hourly = stream.window(1.hour).apply(..)
//write to cassandra
hourly.trigger(accumulating).addSink(cassandra)
//forward to next acc step
var daily = hourly.trigger(discarding).window(1.day).apply(…)
//write to cassandra
daily.trigger(accumulating).addSink(cassandra)

Would this be possible?
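
A rough sketch of how this cascade could look in the Java DataStream API, assuming a discarding
trigger along the lines of the one outlined in the reply below, and reusing the Reading /
ReadingAggregate / AggregateReadings / AggregateReadingAggregates classes from the code further
down in this thread; CassandraAggregateSink is only a hypothetical placeholder for the actual
Cassandra sink:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<Reading> readings = input
        .assignTimestampsAndWatermarks(new StrictWatermarkAssigner());

// hourly aggregates, accumulating (re-fires on late data), written to Cassandra
DataStream<ReadingAggregate> hourlyAccumulating = readings
        .keyBy("id")
        .timeWindow(Time.hours(1))
        .allowedLateness(Time.hours(2))
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
hourlyAccumulating.addSink(new CassandraAggregateSink());

// hourly aggregates, discarding (FIRE_AND_PURGE), forwarded to the daily window;
// late firings then only carry the delta, so nothing is counted twice downstream
DataStream<ReadingAggregate> hourlyDiscarding = readings
        .keyBy("id")
        .timeWindow(Time.hours(1))
        .allowedLateness(Time.hours(2))
        .trigger(new DiscardingEventTimeTrigger())
        .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());

// daily aggregates, accumulating, also written to Cassandra
DataStream<ReadingAggregate> dailyAccumulating = hourlyDiscarding
        .keyBy("id")
        .timeWindow(Time.days(1))
        .allowedLateness(Time.days(1))
        .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());
dailyAccumulating.addSink(new CassandraAggregateSink());

The obvious cost of this sketch is that the hourly window is evaluated twice, once per trigger
variant, so the hourly window state is kept twice as well.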

best, Stephan
> On 23 Nov 2016, at 11:16, Aljoscha Krettek wrote:
> 
> You can implement discarding behaviour by writing a custom trigger (based on EventTimeTrigger)
> that returns FIRE_AND_PURGE when firing. With this you could maybe implement a cascade of
> windows where the first aggregates for the smallest time interval and is discarding and where
> the other triggers take these "pre-aggregated" values and accumulate.
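
For reference, a minimal sketch of such a trigger, modeled on Flink's EventTimeTrigger but
returning FIRE_AND_PURGE so that the window contents are dropped after every firing (a late
element then fires again with only the new delta):

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Event-time trigger that fires when the watermark passes the end of the window,
// but purges the window state on every firing so that each pane (including late
// deltas) is emitted exactly once.
public class DiscardingEventTimeTrigger extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // late element: fire immediately with the current (delta) contents
            return TriggerResult.FIRE_AND_PURGE;
        }
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp()
                ? TriggerResult.FIRE_AND_PURGE
                : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

It would be used like the built-in trigger, e.g.
.timeWindow(Time.hours(1)).trigger(new DiscardingEventTimeTrigger()).apply(...).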
> 
> On Tue, 22 Nov 2016 at 08:11, Stephan Epping wrote:
> Hey Aljoscha,
> 
> the first solution did not work out as expected. When late elements arrive, the first
> window is triggered again and emits a new (accumulated) event, which is then counted
> twice (once in the on-time accumulation and once in the late accumulation) in the second window.
> I could implement my own discarding strategy, like in Apache Beam, but the output stream should
> contain the accumulated events that are stored in Cassandra. The second solution just gave a
> compiler error, so I think it is not possible right now.
> 
> best Stephan
> 
> 
> 
>> On 21 Nov 2016, at 17:56, Aljoscha Krettek wrote:
>> 
>> Hi,
>> why did you settle for the last solution?
>> 
>> Cheers,
>> Aljoscha
>> 
>> On Thu, 17 Nov 2016 at 15:57, kaelumania wrote:
>> Hi Fabian,
>> 
>> your proposed solution for:
>>  
>> Multiple window aggregations
>> You can construct a data flow of cascading window operators and fork off (to emit or for
>> further processing) the result after each window.
>> 
>> Input -> window(15 secs) -> window(1 min) -> window(15 min) -> ...
>>                         \-> out_1        \-> out_2         \-> out_3
>> does not work, am I missing something?
>> 
>> First I tried the following
>> DataStream<Reading> values = input.assignTimestampsAndWatermarks(new StrictWatermarkAssigner()); // force lateness
>> 
>> DataStream<ReadingAggregate> aggregatesPerMinute = values
>>         .keyBy("id")
>>         .timeWindow(Time.minutes(1))
>>         .allowedLateness(Time.minutes(2))
>>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>> 
>> DataStream<ReadingAggregate> aggregatesPerHour = aggregatesPerMinute
>>         .keyBy("id")
>>         .timeWindow(Time.hours(1))
>>         .allowedLateness(Time.hours(2))
>>         .apply(new AggregateReadingAggregates(), new AggregateReadingAggregates());
>> but due to late data the first fold function would emit two rolling aggregates (one with
>> and one without the late element), which would be counted twice within the second reducer.
>> Therefore I tried:
>> WindowedStream<Reading, Tuple, TimeWindow> readingsPerMinute = input
>>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>>         .keyBy("id")
>>         .timeWindow(Time.minutes(1))
>>         .allowedLateness(Time.hours(2));
>> 
>> WindowedStream<Reading, Tuple, TimeWindow> readingsPerHours = readingsPerMinute
>>         .timeWindow(Time.hours(1))
>>         .allowedLateness(Time.hours(2));
>> 
>> DataStream<ReadingAggregate> aggregatesPerMinute = readingsPerMinute.apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>> DataStream<ReadingAggregate> aggregatesPerHour = readingsPerHours.apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>> which gives me a compiler error as WindowedStream does not provide a timeWindow method.
>> 
>> Finally I settled on this:
>> KeyedStream<Reading, Tuple> readings = input
>>         .assignTimestampsAndWatermarks(new StrictWatermarkAssigner()) // force lateness
>>         .keyBy("id");
>> 
>> DataStream<ReadingAggregate> aggregatesPerMinute = readings
>>         .timeWindow(Time.minutes(1))
>>         .allowedLateness(Time.hours(2))
>>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>> 
>> DataStream<ReadingAggregate> aggregatesPerHour = readings
>>         .timeWindow(Time.hours(1))
>>         .allowedLateness(Time.hours(2))
>>         .apply(new ReadingAggregate(), new AggregateReadings(), new AggregateReadings());
>> 
>> 
>> Feedback is very welcome.
>> 
>> best, Stephan
>> 
>> 
>> 
>> 
>>> On 11 Nov 2016, at 00:29, Fabian Hueske wrote:
>>> 
>> 
>>> Hi Stephan,
>>> 
>>> I just wrote an answer to your SO question. 
>>> 
>>> Best, Fabian
>> 
>>> 
>>> 2016-11-10 11:01 GMT+01:00 Stephan Epping:
>> 
>>> 
>>> Hello,
>>> 
>>> I found this question in the Nabble archive (http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Maintaining-watermarks-per-key-instead-of-per-operator-instance-tp7288.html)
>>> but did not know how to reply there.
>>> 
>>> Here is my question regarding the mentioned thread:
>>> 
>>>> Hello, 
>>>> 
>>>> I have similar requirements (see Stack Overflow: http://stackoverflow.com/questions/40465335/apache-flink-multiple-window-aggregations-and-late-data).
>>>> I am pretty new to Flink; could you elaborate on a possible solution? We can guarantee good
>>>> ordering by sensor_id, so watermarking by key would be the only reasonable way for us
>>>> (sensorData.keyBy('id').timeWindow(1.minute).sum('value')). Could I do my own watermarking
>>>> after sensorData.keyBy('id').overwriteWatermarking()... per key? Or maybe use custom state plus
>>>> a custom trigger? What happens if a sensor dies or is removed completely? How could this be
>>>> detected, given that watermarks would be ignored for window garbage collection? Or could we
>>>> dynamically schedule a job for each sensor? That would result in 1000 jobs.
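
For orientation, the windowed sum mentioned inline would look roughly like this in the Java
DataStream API (SensorReading is a hypothetical POJO with public "id" and "value" fields; note
this still uses the operator-wide watermark, not a per-key one):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

DataStream<SensorReading> sensorData = ...; // source with timestamps/watermarks assigned

DataStream<SensorReading> minuteSumsPerSensor = sensorData
        .keyBy("id")                    // partition by sensor
        .timeWindow(Time.minutes(1))    // one-minute event-time windows per sensor
        .sum("value");                  // sum the value field within each window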
>>> 
>>> 
>>> Thanks,
>>> Stephan
>>> 
>>> 
>> 



