flink-user mailing list archives

From Ahmad Hassan <ahmad.has...@gmail.com>
Subject Re: Keyed windows with single sink
Date Thu, 22 Jun 2017 16:49:55 GMT
Hi Fabian,

How will the process function be called at 12:00:01 if there is no window output or event after 12:00:00?

Thanks 

> On 22 Jun 2017, at 17:07, Fabian Hueske <fhueske@gmail.com> wrote:
> 
> Let's say window A and window B end at 12:00:00 and window C at 13:00:00.
> When the ProcessFunction receives a watermark of 12:00:01, it knows that windows A and B have finished.
> When it receives a watermark of 13:00:01, it knows that all results of window C have been received. If there were no records with timestamp 13:00:00, window C did not receive any data and therefore did not compute anything.
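Fabian's completeness rule can be checked mechanically. The following is a plain-Java model, not Flink code: the class `WindowCompleteness` and its method `finishedWindows` are hypothetical names, and window end times are represented as `java.time.LocalTime` values purely for readability. It treats a window as finished once the watermark is strictly later than the window's end time, as in the 12:00:01 and 13:00:01 examples.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (hypothetical, not the Flink API) of the rule
// "a window is finished once a watermark later than its end time arrives".
class WindowCompleteness {

    // Returns the names of all windows whose end time lies strictly before the watermark.
    static List<String> finishedWindows(Map<String, LocalTime> windowEnds, LocalTime watermark) {
        List<String> finished = new ArrayList<>();
        // Copy into a TreeMap so window names are visited in sorted order (deterministic output).
        for (Map.Entry<String, LocalTime> e : new TreeMap<String, LocalTime>(windowEnds).entrySet()) {
            if (e.getValue().isBefore(watermark)) {
                finished.add(e.getKey());
            }
        }
        return finished;
    }
}
```

With A and B ending at 12:00:00 and C at 13:00:00, a watermark of 12:00:01 yields `[A, B]`, a watermark of 13:00:01 yields all three windows, and a watermark of exactly 12:00:00 yields none.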
> 
> 
> 2017-06-22 17:44 GMT+02:00 Ahmad Hassan <ahmad.hassan@gmail.com>:
>> Thanks for the answers. My scenario is:
>> 
>> | Window A |
>> | Window B |
>>                    | Window C |
>> 
>> If no events are received for window C, how would the process function know that both window A and window B have finished and that their results need to be aggregated before the sink is called?
>> 
>> Thanks
>> 
>> 
>>> On 22 June 2017 at 16:27, Fabian Hueske <fhueske@gmail.com> wrote:
>>> Hi Ahmad,
>>> 
>>> Flink's watermark mechanism guarantees that when you receive a watermark for time t, all records with a timestamp smaller than t have been received as well.
>>> Records emitted from a window have the timestamp of their end time. So, once the ProcessFunction receives a watermark for 12:00:00, you can be sure that it has also received all records for windows that closed before 12:00:00.
>>> The function should buffer all records it receives between watermarks as state and, once it receives a watermark (i.e., when a registered event-time timer fires), it should write the buffered records out.
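A minimal sketch of this buffer-then-flush logic, written as plain Java so it runs without a Flink dependency. `WindowResultBuffer`, `add` and `onWatermark` are hypothetical names: `add` stands in for `processElement` and `onWatermark` for the event-time timer callback. Results are grouped by window end time (assumed here to be epoch milliseconds), so each flush yields one batch per end time, which is what a single, collective sink write per window would consume.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model (hypothetical, not the Flink API) of a function that buffers
// window results and releases them only once a watermark proves they are complete.
class WindowResultBuffer<T> {

    // Buffered results grouped by window end time, sorted ascending.
    private final TreeMap<Long, List<T>> pending = new TreeMap<>();

    // Stand-in for processElement: buffer one incoming window result.
    void add(long windowEnd, T result) {
        pending.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(result);
    }

    // Stand-in for the event-time timer callback: returns one batch per finished
    // window end time (end < watermark), oldest first, and drops them from the buffer.
    List<List<T>> onWatermark(long watermark) {
        // headMap is a live view of all entries with a key strictly below the watermark.
        Map<Long, List<T>> finished = pending.headMap(watermark);
        List<List<T>> batches = new ArrayList<>(finished.values());
        finished.clear(); // clearing the view also removes the entries from 'pending'
        return batches;
    }
}
```

In an actual Flink job the buffer would live in Flink-managed state so that it survives failures, and Flink only allows registering timers when the ProcessFunction is applied to a keyed stream; neither detail is modeled here.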
>>> 
>>> Btw, this only works for event-time windows, not for processing-time windows.
>>> 
>>> Cheers, Fabian
>>> 
>>> 2017-06-22 16:44 GMT+02:00 Ahmad Hassan <ahmad.hassan@gmail.com>:
>>>> Hi Stefan,
>>>> 
>>>> How would the process function know that the last window result has arrived? The sliding windows slide every 5 minutes, which means that a window for a new time range, or a new watermark, will only arrive after 5 minutes.
>>>> 
>>>> Thanks
>>>> 
>>>> 
>>>>> On 22 June 2017 at 15:10, Stefan Richter <s.richter@data-artisans.com> wrote:
>>>>> The process function has the signature
>>>>> 
>>>>> void processElement(I value, Context ctx, Collector<O> out) throws Exception
>>>>> where the context provides access to the current watermark, and you can also register timer callbacks that fire when a certain watermark is reached. You can simply monitor the watermark through the context for each incoming window result. The start time is not important, because you know that you have collected the results for all windows with an end time smaller than the watermark you currently see in the context; this is Flink's notion of completeness. This means you can prepare those windows, aggregate their results, and send them downstream to the sink.
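Stefan's per-result check can be modeled the same way. In this plain-Java sketch (hypothetical names, not the Flink API), `onResult` receives each window result together with the watermark that the context would report at that moment (in Flink, `ctx.timerService().currentWatermark()`), buffers the result, and releases every window whose end time is smaller than that watermark.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Plain-Java model (hypothetical, not the Flink API) of checking the current
// watermark on every incoming window result instead of using timers.
class PollingResultBuffer {

    // Buffered results per window end time (assumed epoch millis), sorted ascending.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    // Buffers one result, then returns (and removes) every window whose end time
    // is smaller than the watermark observed when this result arrived.
    List<List<String>> onResult(long windowEnd, String result, long currentWatermark) {
        pending.computeIfAbsent(windowEnd, k -> new ArrayList<>()).add(result);
        List<List<String>> ready = new ArrayList<>(pending.headMap(currentWatermark).values());
        pending.headMap(currentWatermark).clear();
        return ready;
    }
}
```

Because the check only runs when a result arrives, a finished window is released together with the next incoming result; with 5-minute slides that may be up to one slide later, which is the gap that the timer-based variant described in Fabian's reply avoids.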
>>>>> 
>>>>>> On 22.06.2017 at 15:46, Ahmad Hassan <ahmad.hassan@gmail.com> wrote:
>>>>>> 
>>>>>> Thanks Stefan, but how will the process function get these watermarks? I have sliding windows like the following:
>>>>>> 
>>>>>> // 100-minute windows sliding every 5 minutes, evaluated in processing time
>>>>>> final DataStream<WindowStats> eventStream = inputStream
>>>>>> 				.keyBy(TENANT, CATEGORY)
>>>>>> 				.window(SlidingProcessingTimeWindows.of(Time.minutes(100), Time.minutes(5)))
>>>>>> 				.fold(new WindowStats(), new ProductAggregationMapper(), new ProductAggregationWindowFunction());
>>>>>> 
>>>>>> Window results arrive every 5 minutes after the first window output. How would the process function know that all the windows for a tenant have finished for a given start and end time?
>>>>>> 
>>>>>> Thanks for the help.
>>>>>> 
>>>>>> Cheers,
>>>>>> 
>>>>>>> On 22 June 2017 at 14:37, Stefan Richter <s.richter@data-artisans.com> wrote:
>>>>>>> Hi,
>>>>>>> 
>>>>>>> One possible approach is to put a process function before the sink. A process function is aware of watermarks, so it can collect and buffer window results until it sees a watermark. The watermark is the signal that all results for windows with an end time smaller than the watermark are complete. They can then be aggregated and sent to the sink.
>>>>>>> 
>>>>>>> Best,
>>>>>>> Stefan
>>>>>>> 
>>>>>>> > On 22.06.2017 at 15:15, Ahmad Hassan <ahmad.hassan@gmail.com> wrote:
>>>>>>> >
>>>>>>> > Hi All,
>>>>>>> >
>>>>>>> > I am using categoryID as a keyBy attribute to create a keyed stream from a product event stream. The keyed stream then creates time windows for each category. However, when the window time expires, I want to write the output data of all the products in all categories collectively, in a single atomic operation. Is there a way to call a single sink function for all the windows with the same start and end time? Or is there a way in Flink to know that all windows with the same end time have finished processing their sink function?
>>>>>>> >
>>>>>>> > Currently, each window calls the sink function individually.
>>>>>>> >
>>>>>>> > cheers,
>>>>>>> >
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
