flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Continuing from the stackoverflow post
Date Mon, 30 Nov 2015 14:50:50 GMT
Sorry, I have to correct myself. The windowing semantics are not easy ;-)

2015-11-30 15:34 GMT+01:00 Fabian Hueske <fhueske@gmail.com>:

> Hi Nirmalya,
> thanks for the detailed description of your understanding of Flink's
> window semantics.
> Most of it is correct, but a few things need a bit of correction ;-)
> Please see my comments inline.
> 2015-11-28 4:36 GMT+01:00 Nirmalya Sengupta <sengupta.nirmalya@gmail.com>:
>> Hello Fabian,
>> A little long mail; please have some patience.
>> From your response: ' Let's start by telling me what you actually want
>> to do ;-) '
>> At a broad level, I want to write a (series of, perhaps) tutorial of
>> Flink, where these concepts are brought out by a mix of definition,
>> elaboration, illustration and of course, code snippets. If that helps the
>> community, I will be very happy. In the least, I will understand the
>> principles and their application, much better. So, I am a bit selfish here
>> perhaps. You also mention that you are preparing some such material. If I
>> can complement your effort, I will be delighted.
> That sounds great! We are almost done with the blog post and will publish
> it soon. Looking forward to your feedback :-)
>> One never knows: going further, I may become a trainer / evangelist of
>> Apache Flink, if I show enough grasp of the subject! :-)
>> Now to this particular question (from SOF):
>> When I began, my intention was to find maximum temperature, *every 5
>> successive records* (to answer your question).
>> As I have said before, I am learning and hence, trying with various
>> operator combinations on the same set of data to see what happens and then,
>> trying to explain why that happens.
>> Let's refer to the code again:
>> val readings =
>>       readIncomingReadings(env,"./sampleIOTTiny.csv")
>>       .map(e => (e.sensorUUID,e.ambientTemperature))
>>       .timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS))
>>       .trigger(CountTrigger.of(5))
>>       .evictor(CountEvictor.of(4))
>>       .maxBy(1)
>> So, what I understand is this:
>> timeWindowAll defines a pane of 5 msecs. When this time expires, the
>> timeWindowAll fires a kind of *onExpirationOf* trigger (I have
>> fabricated the name, quite likely it doesn't exist). This perhaps does
>> nothing other than passing to the function (here, *maxBy()*) the
>> contents of the window (whatever number of elements have been collected in
>> last 5 msecs) and clearing the pane, readying it for the next 5 msecs (not
>> exactly, but more of it later).
> This is correct. timeWindowAll(5 msecs) (without additional trigger
> definitions) will create a new window every 5 msec, trigger after 5 msecs
> (call the user function), purge the window, and create a new window.
> Independent of the trigger, when a window expires, it is removed (including
> all elements it contains) and a new window is created.

THIS IS NOT CORRECT --> "Independent of the trigger, when a window expires,
it is removed (including all elements it contains) and a new window is
created."
In fact, a window is only removed if a trigger returns FIRE_AND_PURGE or
PURGE. The default time windows (without an additional Trigger) purge their
contents at their end time. If you apply a trigger that does *not* purge the
contents of the window after it expires, the window will consume memory forever.
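
To make the FIRE vs. FIRE_AND_PURGE distinction concrete, here is a minimal sketch in plain Scala, with no Flink dependency. It is only a toy model: the names PurgeSketch, Result and afterFiring are made up for illustration and are not Flink's classes or API.

```scala
// Toy model only: these types mimic the idea behind Flink's TriggerResult.
// They are hypothetical names, not the Flink classes themselves.
object PurgeSketch {
  sealed trait Result
  case object Fire extends Result          // evaluate the window, keep its elements
  case object FireAndPurge extends Result  // evaluate the window, then drop its elements

  // Returns the window contents that remain once the trigger result is applied.
  def afterFiring(window: Vector[Double], result: Result): Vector[Double] =
    result match {
      case Fire         => window
      case FireAndPurge => Vector.empty
    }

  def main(args: Array[String]): Unit = {
    val window = Vector(20.5, 21.0, 19.8)
    println(afterFiring(window, Fire).size)         // 3: elements stay in memory
    println(afterFiring(window, FireAndPurge).size) // 0: elements are released
  }
}
```

In this model, a trigger that only ever returns Fire never shrinks the window, which is the memory concern described above.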

>> However, I provide a CountTrigger (5 elements). According to the rules
>> of Flink, this trigger replaces the aforementioned default onExpirationOf
>> trigger. Therefore, when timeWindowAll is ready after 5 msecs have
>> passed, what it finds available to fire is this CountTrigger. However, a
>> CountTrigger can fire only if its count-related (here, 5) criterion is
>> satisfied. So, after 5 msecs have passed, if the number of elements
>> collected in timeWindowAll pane is >= 5, *only* then CountTrigger will
>> fire; otherwise, CountTrigger will not stir and timeWindowAll will shrug
>> its shoulders and go back to wait for the next 5 msecs period.
> If you define a CountTrigger(5), it will trigger exactly once, when
> exactly 5 elements are in the window, even if 2 msecs of the window
> are still left. It also replaces the current trigger, which would fire
> at 5 msecs, i.e., the window is evaluated only once, after the 5th element
> has been inserted. What happens to the elements in the pane after the
> function has been called depends on the trigger. If you look at the Trigger
> interface, you'll find that TriggerResult might be FIRE or FIRE_AND_PURGE
> (among others). FIRE will call the user function and leave the elements in
> the window. FIRE_AND_PURGE will call the user function, purge (delete) the
> window, and create a new window within the same time bounds.
>> Going further, I provide a CountEvictor. According to the rules of
>> Flink, an Evictor is allowed to act only when its associated trigger
>> (here, CountTrigger) is fired. Because of its presence, a further check
>> is made on the contents of the pane. If CountTrigger is fired, the number
>> of elements collected in the pane must be >= 5. However, the evictor is
>> interested only in first 4 of them. The evictor *takes away* these 4
>> from timeWindowAll's pane and gives them to the function. The 5th
>> element still remains in the pane. timeWindowAll readies itself for next
>> 5 msecs, but its pane is not empty this time. It still has that solitary
>> element there.
> When the CountTrigger(5) fires, exactly 5 elements are in the window pane.
> A CountEvictor(4) will remove the first element from the pane such that
> only 4 elements remain, before it calls the user function to evaluate the
> window. What happens to the four elements after the user function has
> been invoked depends on the TriggerResult. A CountTrigger keeps the
> elements in the window.
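
The eviction step described here can be sketched in plain Scala, again without Flink. This is a simplified model that assumes the pane is an ordered buffer with the oldest element first. EvictorSketch and evict are hypothetical names, not Flink's CountEvictor.

```scala
// Toy model of "keep the last N elements" eviction; not Flink's CountEvictor.
object EvictorSketch {
  // Keep only the most recent `keep` elements, dropping the oldest ones first.
  def evict[A](pane: Vector[A], keep: Int): Vector[A] = pane.takeRight(keep)

  def main(args: Array[String]): Unit = {
    val pane = Vector(18.2, 21.7, 19.4, 23.1, 20.0) // five readings, oldest first
    val kept = evict(pane, 4)
    println(kept)     // Vector(21.7, 19.4, 23.1, 20.0): the first reading is gone
    println(kept.max) // 23.1: the value a max function would see
  }
}
```

Note that the evicted element never reaches the function, so a maximum held by the oldest reading would be missed.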
>> This much seems straightforward but there is a twist in the tale.
>> A very important point about timeWindowAll's pane is its ephemeral
>> nature. When I specify timeWindowAll(Time.of(5,TimeUnit.MILLISECONDS)),
>> Flink understands this as an instruction to create a pane every successive
>> 5 msecs period. Flink doesn't create one pane which is cleared after every
>> 5 msecs (this is what I inexactly mentioned earlier), and readied for the
>> next 5 msecs. Instead, it brings into existence a *freshly minted pane,*
>> every 5 msecs. The contents of the preceding pane are subjected to trigger
>> (if it exists) and evictor (if it exists) and finally, function (if it is
>> provided). Then, Flink *dumps the preceding pane* along with its
>> contents, if any and readies the new pane, awaiting elements during the
>> next 5 msecs.
> This is correct.
>> In my case above, the criteria of timeWindowAll and Trigger/Evictor are
>> not locked in step as precisely as they should have been. It is quite
>> possible that while CountTrigger fires because 5 elements are already in
>> the pane, 5 msecs are *yet to lapse*. So, the current pane of
>> timeWindowAll is still alive and is collecting subsequent elements
>> arriving. The evictor takes away 4 elements. The remaining element is
>> joined by a few more *before* 5 msecs lapse. After 5 msecs have lapsed,
>> Flink extirpates the pane - along with its current contents - and creates a
>> fresh pane. In effect, some elements which arrive and are collected in the
>> pane *never reach* the trigger/evictor pair. These unfortunate elements
>> are destroyed along with the pane in which they reside. Obviously, this
>> affects the output that I see. The calculation of maximum temperature is
>> inaccurate simply because some of the temperature readings are never
>> available to the _maxBy_ function.
>> Have I got it almost correct? Will be keen to hear from you.
> In fact, there is quite an easy solution for your issue. You should not use
> a time window but a count window instead:
> val readings =
>       readIncomingReadings(env,"./sampleIOTTiny.csv")
>       .map(e => (e.sensorUUID,e.ambientTemperature))
>       .countWindowAll(5)
>       .maxBy(1)
> This will give you a tumbling count window that calls maxBy() whenever 5
> elements have arrived.
> See the documentation for details [1]
> Please let me know if you have further questions,
> Fabian
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-0.10/apis/streaming_guide.html#windows-on-unkeyed-data-streams
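
The behaviour expected from the count-window suggestion above can be mimicked on an in-memory list in plain Scala. This is a sketch under stated assumptions, not the Flink implementation: CountWindowSketch and maxPerCountWindow are hypothetical names, the readings are invented sample values, and a trailing group with fewer than 5 elements is assumed to produce no output.

```scala
// Toy model of a tumbling count window followed by a max over field 1.
object CountWindowSketch {
  // Split the input into consecutive groups of `size` and, for each complete
  // group, return the (id, temperature) pair with the highest temperature.
  def maxPerCountWindow(readings: Seq[(String, Double)], size: Int): List[(String, Double)] =
    readings.grouped(size).filter(_.size == size).map(_.maxBy(_._2)).toList

  def main(args: Array[String]): Unit = {
    // Invented sample data: six readings, so only the first five form a window.
    val readings = Seq(
      ("s1", 18.2), ("s2", 21.7), ("s3", 19.4), ("s4", 23.1), ("s5", 20.0),
      ("s6", 30.0)
    )
    println(maxPerCountWindow(readings, 5)) // List((s4,23.1))
  }
}
```

Unlike the time window with a count trigger and an evictor, every reading in a complete group of five takes part in the comparison here.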
>> -- Nirmalya
>> --
>> Software Technologist
>> http://www.linkedin.com/in/nirmalyasengupta5
>> "If you have built castles in the air, your work need not be lost. That
>> is where they should be.
>> Now put the foundation under them."
