flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Streaming time window
Date Thu, 10 Dec 2015 14:28:16 GMT
You are right, WindowFunctions collect all data in a window and are
evaluated at once.
Although FoldFunctions could be directly applied on each element that
enters a window, this is not done at the moment.
Only ReduceFunctions are eagerly applied.

If you port your code to a ReduceFunction, you can do

.apply(ReduceFunction, WindowFunction)

This will first call the ReduceFunction and finally call the WindowFunction
with the result of the ReduceFunction.
In principle, this is also possible for fold, but not yet implemented.
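
To make the memory argument concrete, here is a minimal plain-Java sketch of the reduce-then-window pattern. It does not use the Flink API: `Window`, `WindowFn`, and `reduceThenApply` are hypothetical stand-ins invented for illustration only. The point is that the reducer is applied as each element arrives, so only one running value is held, and the final function receives that value together with the window bounds.

```java
import java.util.List;
import java.util.function.BinaryOperator;

public class EagerWindowSketch {

    /** Hypothetical stand-in for a time window: only its bounds in milliseconds. */
    public static final class Window {
        public final long start;
        public final long end;

        public Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** Hypothetical stand-in for the final per-window step; it sees only the pre-aggregated value. */
    public interface WindowFn<T, R> {
        R apply(Window window, T aggregate);
    }

    /**
     * Applies the reducer element by element, keeping a single running value,
     * then hands that value and the window to the final function.
     * Assumes at least one element; with none, the aggregate passed on is null.
     */
    public static <T, R> R reduceThenApply(
            Window window,
            Iterable<T> elements,
            BinaryOperator<T> reducer,
            WindowFn<T, R> windowFn) {
        T running = null;
        for (T element : elements) {
            // Only the running value is retained, never the elements themselves.
            running = (running == null) ? element : reducer.apply(running, element);
        }
        return windowFn.apply(window, running);
    }

    public static void main(String[] args) {
        Window window = new Window(0L, 5000L);
        String result = reduceThenApply(
                window,
                List.of(3, 4, 5),
                Integer::sum,
                (w, sum) -> "end=" + w.end + " sum=" + sum);
        System.out.println(result);
    }
}
```

In this model the memory held per window is one value of type `T`, regardless of how many elements arrive, which is the property the fold currently provides.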

Best, Fabian

2015-12-10 15:16 GMT+01:00 Martin Neumann <mneumann@sics.se>:

> I will give this a try.
>
> Though I'm not sure I can switch over to a WindowFunction.
> I work with potentially huge windows, and the fold gives me a minimal and
> constant memory footprint. Switching to a WindowFunction will require keeping
> the whole window in memory before it can be processed (at least to my
> understanding), which will lead to problems. Any idea how to get around this?
>
> cheers Martin
>
>
>
> On Thu, Dec 10, 2015 at 2:59 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Sure. You don't need a trigger, but a WindowFunction instead of the
>> FoldFunction.
>> Only the WindowFunction has access to the Window object.
>>
>> Something like this:
>>
>> poissHostStreams
>>         .timeWindow(Time.of(WINDOW_SIZE, TimeUnit.MILLISECONDS))
>>         .apply(new WindowFunction<IN, OUT, KEY, TimeWindow>() {
>>
>>           @Override
>>           public void apply(KEY key, TimeWindow window, Iterable<IN> vals, Collector<OUT> out) {
>>             // YOUR CODE
>>             // the window's end timestamp is available here
>>             window.getEnd();
>>           }
>>         })
>>
>> Best, Fabian
>>
>> 2015-12-10 14:41 GMT+01:00 Martin Neumann <mneumann@sics.se>:
>>
>>> Hi Fabian,
>>>
>>> thanks for your answer. Can I do the same in java using normal time
>>> windows (without additional trigger)?
>>>
>>> My current code looks like this:
>>>
>>> poissHostStreams
>>>         .timeWindow(Time.of(WINDOW_SIZE, TimeUnit.MILLISECONDS))
>>>         .fold(new Tuple2<>("", new HashMap<>()), new MultiValuePoissonPreProcess())
>>>
>>> How can I get access to the time window object in the fold function?
>>>
>>>
>>> cheers Martin
>>>
>>>
>>> On Thu, Dec 10, 2015 at 12:20 PM, Fabian Hueske <fhueske@gmail.com>
>>> wrote:
>>>
>>>> Hi Martin,
>>>>
>>>> you can get the start and end time of a window from the TimeWindow
>>>> object.
>>>> The following Scala code snippet shows how to access the window end
>>>> time (the start time is accessed the same way, via getStart):
>>>>
>>>> .timeWindow(Time.minutes(5))
>>>> .trigger(new EarlyCountTrigger(earlyCountThreshold))
>>>> .apply { (
>>>>   key: Int,
>>>>   window: TimeWindow,
>>>>   vals: Iterable[(Int, Short)],
>>>>   out: Collector[(Int, Long, Int)]) =>
>>>>     out.collect( ( key, window.getEnd, vals.map( _._2 ).sum ) )
>>>> }
>>>>
>>>> Cheers, Fabian
>>>>
>>>> 2015-12-10 12:04 GMT+01:00 Martin Neumann <mneumann@sics.se>:
>>>>
>>>>> Hej,
>>>>>
>>>>> Is it possible to extract the start and end window time stamps from
>>>>> within a window operator?
>>>>>
>>>>> I have an event time based window that does a simple fold function. I
>>>>> want to put the output into elasticsearch and want to preserve the start
>>>>> and end timestamp of the data so I can directly compare it with related
>>>>> data. The only idea I had so far was to manually keep track of the minimum
>>>>> and maximum timestamp found in a window and pass them along with the
>>>>> output. This is quite a poor approximation, since the window I see depends
>>>>> a lot on how the values are spaced out. Does anyone have an idea how to do this?
>>>>>
>>>>> cheers Martin
>>>>>
>>>>
>>>>
>>>
>>
>
