flink-dev mailing list archives

From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: [DISCUSS] Allowed Lateness in Flink
Date Thu, 28 Jul 2016 17:13:54 GMT
Hi all,

Yes I can do that. 
I believe that there are a lot of interesting ideas to discuss
but also some important performance-related issues that
we have to consider.

More on the upcoming FLIP ;)

Thanks,
Kostas

> On Jul 28, 2016, at 6:28 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
> 
> Hi,
> yes, this is also what I hinted at in my earlier email about the
> "SimpleTrigger" interface. We should keep the interface we currently have
> and maybe extend it a bit while adding a new DSL of simpler/composable
> triggers that can be executed inside one of the classic Triggers.
> 
> For now, we kept this under the umbrella of the "Improve Windowing in
> Flink" doc but you're right, it might be time to move this to its own FLIP.
> Kostas, would you like to do this since you already started on a
> proof-of-concept implementation? (at least that's what I gathered from the
> earlier email)
> 
> Cheers,
> Aljoscha
> 
> On Thu, 28 Jul 2016 at 16:28 Radu Tudoran <radu.tudoran@huawei.com> wrote:
> 
>> Hi,
>> 
>> IMHO we should still maintain user-specific triggers, and I think there
>> will always be corner cases where a very specific trigger needs to be
>> constructed. That said, I think the idea of also supporting a state
>> machine generated for the trigger is very good. Will you start a FLIP
>> document for this?
>> 
>> 
>> 
>> -----Original Message-----
>> From: Aljoscha Krettek [mailto:aljoscha@apache.org]
>> Sent: Thursday, July 28, 2016 3:47 PM
>> To: dev@flink.apache.org
>> Subject: Re: [DISCUSS] Allowed Lateness in Flink
>> 
>> Another (maybe completely crazy) idea is to regard the triggers really as
>> a DSL and use compiler techniques to derive a state machine that you use to
>> do the actual triggering.
>> 
>> With this, the "trigger" objects that make up the tree of triggers would
>> not contain any logic themselves. A trigger specification such as
>> And(AfterWatermark, Count(5)) would simply be an AST of our "trigger
>> language" and from this we derive that our trigger waits on the watermark
>> and also until the element count is at least 5. We would generate a compact
>> state machine for this that is updated with incoming elements and sometimes
>> (that's the somewhat tricky part) checked for whether we should fire.
>> 
>> The advantages of this are:
>> - Possibly very tight state representation that is known at job
>> specification time.
>> - No dealing with user specified triggers since our DSL is strictly
>> specified by us
>> 
>> The disadvantages are:
>> - No user specified triggers inside the DSL
>> 
>> The last part would be mitigated by still allowing users to write triggers
>> for the current Trigger API if they want/need all the power that that
>> provides.
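
The "triggers as a DSL" idea above can be sketched roughly as follows. This is only an illustration under stated assumptions: the node classes (AfterWatermark, Count, And), TriggerState, and TriggerMachine are hypothetical names that are not part of Flink, and the "state machine" here is just a tiny interpreter over a compact per-window state. The specification nodes carry no logic; all evaluation lives in one place.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical trigger specification nodes: pure data, no logic.
interface Spec {}

final class AfterWatermark implements Spec {}

final class Count implements Spec {
    final long threshold;
    Count(long threshold) { this.threshold = threshold; }
}

final class And implements Spec {
    final List<Spec> children;
    And(Spec... children) { this.children = Arrays.asList(children); }
}

// Compact per-window state, known once the specification is known.
final class TriggerState {
    long count;              // elements seen so far
    boolean watermarkPassed; // watermark has reached the end of the window
}

// Interprets a specification against the compact state (assumed design).
final class TriggerMachine {
    private final Spec spec;
    final TriggerState state = new TriggerState();

    TriggerMachine(Spec spec) { this.spec = spec; }

    void onElement() { state.count++; }

    // Simplification: the window is "done" once watermark >= windowEnd.
    void onWatermark(long watermark, long windowEnd) {
        if (watermark >= windowEnd) state.watermarkPassed = true;
    }

    boolean shouldFire() { return eval(spec); }

    private boolean eval(Spec s) {
        if (s instanceof AfterWatermark) return state.watermarkPassed;
        if (s instanceof Count) return state.count >= ((Count) s).threshold;
        if (s instanceof And) {
            for (Spec c : ((And) s).children) if (!eval(c)) return false;
            return true;
        }
        throw new IllegalArgumentException("unknown spec node: " + s);
    }
}

final class TriggerDslDemo {
    public static void main(String[] args) {
        TriggerMachine m = new TriggerMachine(new And(new AfterWatermark(), new Count(5)));
        for (int i = 0; i < 5; i++) m.onElement();
        System.out.println(m.shouldFire()); // false: count reached, watermark not yet
        m.onWatermark(100L, 100L);
        System.out.println(m.shouldFire()); // true: both conditions hold
    }
}
```

Because the specification is fixed at job-specification time, the fields of TriggerState could in principle be derived from it up front, which is the "tight state representation" advantage mentioned above.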
>> 
>> Just some thoughts...
>> 
>> Cheers,
>> Aljoscha
>> 
>> On Tue, 26 Jul 2016 at 14:31 Kostas Kloudas <k.kloudas@data-artisans.com>
>> wrote:
>> 
>>> Also, I think that shouldFire() has to take the time as an additional
>>> argument. This will help differentiate between ON_TIME, EARLY, and LATE
>>> firings.
>>> 
>>>> On Jul 26, 2016, at 11:02 AM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
>>>> 
>>>> Hello,
>>>> 
>>>> This is a nice proposal that I think covers most of the cases.
>>>> The only thing that is missing would be a method:
>>>> 
>>>> void onFire(Window window, TriggerContext ctx)
>>>> 
>>>> that will be responsible for doing whatever is necessary if the
>>>> windowOperator decides to fire. You can imagine resetting the
>>>> counter of a countTrigger to 0.
>>>> 
>>>> As a recap, the SimpleTrigger interface should be:
>>>> 
>>>> class SimpleTrigger {
>>>>   void onElement(T element, long timestamp, W window, TriggerContext ctx);
>>>>   boolean shouldFire(W window, TriggerContext ctx);
>>>>   void onMerge(W window, OnMergeContext ctx);
>>>>   void onFire(Window window, TriggerContext ctx);
>>>>   void clear(W window, TriggerContext ctx);
>>>> }
>>>> 
>>>> The onMerge and onFire methods can be seen as callbacks and will be
>>>> applied upon merge (in case of Session windows) and upon firing.
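
A minimal sketch of why a separate onFire() callback helps with composition, assuming a heavily simplified version of the proposed interface. The window, TriggerContext, onMerge() and clear() parts are left out, and CountTrigger, EndOfWindowTrigger and AllTrigger here are hypothetical stand-ins, not the Flink classes. The count is reset only when the composite actually fires, not when the count trigger alone would have fired.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified form of the proposed interface.
interface SimpleTrigger {
    void onElement(long timestamp);
    boolean shouldFire(long currentTime);
    void onFire();
}

// Would fire once at least `limit` elements have arrived; onFire() resets the counter.
final class CountTrigger implements SimpleTrigger {
    private final long limit;
    private long count;
    CountTrigger(long limit) { this.limit = limit; }
    public void onElement(long timestamp) { count++; }
    public boolean shouldFire(long currentTime) { return count >= limit; }
    public void onFire() { count = 0; }
    long count() { return count; }
}

// Would fire once the current time has reached the end of the window.
final class EndOfWindowTrigger implements SimpleTrigger {
    private final long windowEnd;
    EndOfWindowTrigger(long windowEnd) { this.windowEnd = windowEnd; }
    public void onElement(long timestamp) { }
    public boolean shouldFire(long currentTime) { return currentTime >= windowEnd; }
    public void onFire() { }
}

// Composite: fires only if every child would fire; children's onFire() runs only then.
final class AllTrigger implements SimpleTrigger {
    private final List<SimpleTrigger> children;
    AllTrigger(SimpleTrigger... children) { this.children = Arrays.asList(children); }
    public void onElement(long timestamp) {
        for (SimpleTrigger c : children) c.onElement(timestamp);
    }
    public boolean shouldFire(long currentTime) {
        for (SimpleTrigger c : children) if (!c.shouldFire(currentTime)) return false;
        return true;
    }
    public void onFire() {
        for (SimpleTrigger c : children) c.onFire();
    }
}

final class OnFireDemo {
    public static void main(String[] args) {
        CountTrigger count = new CountTrigger(2);
        SimpleTrigger all = new AllTrigger(count, new EndOfWindowTrigger(100L));
        all.onElement(10L);
        all.onElement(20L);
        // The operator (not shown) would call onFire() only after shouldFire() returns true.
        if (all.shouldFire(100L)) all.onFire();
        System.out.println(count.count()); // 0: reset happened when the composite fired
    }
}
```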
>>>> 
>>>> What do you think?
>>>> 
>>>> Kostas
>>>> 
>>>>> On Jul 25, 2016, at 3:34 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>> 
>>>>> Hi,
>>>>> yes, this is essentially the solution I had in my head but I went a
>>>>> bit further and generalized it.
>>>>> 
>>>>> Basically, to make triggers composable they should have this
>>>>> interface, let's call it SimpleTrigger for now:
>>>>> 
>>>>> class SimpleTrigger {
>>>>>   void onElement(T element, long timestamp, W window, TriggerContext ctx);
>>>>>   boolean shouldFire(W window, TriggerContext ctx);
>>>>>   void onMerge(W window, OnMergeContext ctx);
>>>>>   void clear(W window, TriggerContext ctx);
>>>>> }
>>>>> 
>>>>> Notice how onElement() cannot return a TriggerResult anymore and how
>>>>> onEventTime() and onProcessingTime() of the currently existing Trigger
>>>>> interface were folded into shouldFire(). Each trigger essentially becomes
>>>>> a predicate that says at any given time whether they would fire the
>>>>> window. Having just one method that can decide whether to fire or not
>>>>> makes these easily composable to form complex triggers, thus enabling the
>>>>> trigger DSL we want to implement.
>>>>> 
>>>>> The way to go about implementing this is either to replace our
>>>>> current Trigger interface by this new interface or to keep our more
>>>>> powerful interface with all the customization options and have one
>>>>> SimpleTriggerTrigger that can execute a tree of SimpleTriggers. A
>>>>> rough sketch of this would be this:
>>>>> https://gist.github.com/aljoscha/66b0fcab89cd2b6190a63899f461067f
>>>>> 
>>>>> Cheers,
>>>>> Aljoscha
>>>>> 
>>>>> 
>>>>> 
>>>>> On Mon, 25 Jul 2016 at 14:33 Kostas Kloudas <k.kloudas@data-artisans.com>
>>>>> wrote:
>>>>> 
>>>>>> Hi Aljoscha,
>>>>>> 
>>>>>> This was exactly one of the problems I also found.
>>>>>> 
>>>>>> The way I was thinking about it is the following:
>>>>>> 
>>>>>> Conceptually, time (event and processing) advances but state is a
>>>>>> fixed property of the window.
>>>>>> 
>>>>>> Given this, I modified the Count trigger to also ask for the
>>>>>> current state (count) of the window in all methods (e.g.
>>>>>> onEventTime and onProcessingTime). This way the trigger can be
>>>>>> composed and play well with the other triggers.
>>>>>> 
>>>>>> If you have any more ideas on that and the rest of the problems I
>>>>>> sent in the previous email, please let me know.
>>>>>> 
>>>>>> Kostas
>>>>>> 
>>>>>>> On Jul 25, 2016, at 2:22 PM, Aljoscha Krettek <aljoscha@apache.org>
>>>>>>> wrote:
>>>>>>> 
>>>>>>> These are some very interesting thoughts! I have some more, based on
>>>>>>> these:
>>>>>>> 
>>>>>>> What happens if you have for example this Trigger:
>>>>>>> All(Watermark.pastEndOfWindow(), Count.atLeast(10))
>>>>>>> 
>>>>>>> When would this even fire, i.e. what are the steps that lead to this
>>>>>>> combined trigger firing with the Trigger system that we currently have
>>>>>>> in place?
>>>>>>> 
>>>>>>> I have some thoughts but they are not compatible with the way we
>>>>>>> currently handle triggers. I have to think some more, but please shoot
>>>>>>> if you have any ideas.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Aljoscha
>>>>>>> 
>>>>>>> On Fri, 22 Jul 2016 at 13:10 Kostas Kloudas <k.kloudas@data-artisans.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>>> Forgot to say that the signature for the onFire() that I think fits
>>>>>>>> should be:
>>>>>>>> 
>>>>>>>> void onFire(Window window, TriggerContext ctx) throws Exception;
>>>>>>>> 
>>>>>>>>> On Jul 22, 2016, at 12:47 PM, Kostas Kloudas <k.kloudas@data-artisans.com> wrote:
>>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> 
>>>>>>>>> I started working on the new triggers proposed here and so far I can
>>>>>>>>> see two shortcomings in the current state of the triggers that do not
>>>>>>>>> play well with the new proposals, and more specifically the composite
>>>>>>>>> triggers All and Any.
>>>>>>>>> 
>>>>>>>>> So here it goes:
>>>>>>>>> 
>>>>>>>>> 1) In the document posted above, there are some new useful trigger
>>>>>>>>> combinations (like Any and All) which allow for combining primitive
>>>>>>>>> triggers. This decouples the TriggerResult of each individual trigger
>>>>>>>>> from the action that is actually going to be executed. For example, an
>>>>>>>>> All trigger may have one child proposing FIRE while the other proposes
>>>>>>>>> CONTINUE, and the final result will be CONTINUE.
>>>>>>>>> 
>>>>>>>>> In this case, any action that should be taken by each individual
>>>>>>>>> trigger upon firing, e.g. cleaning its state as in the case of
>>>>>>>>> CountTrigger, should be postponed until the parent trigger (All)
>>>>>>>>> decides to fire.
>>>>>>>>> 
>>>>>>>>> For this, there should be an onFire() method in each trigger that does
>>>>>>>>> exactly that. This method should be called in the fireOrCleanup() of
>>>>>>>>> the windowOperator, when the firing is successful.
>>>>>>>>> 
>>>>>>>>> 2) In the current implementation, when stateful triggers, like the
>>>>>>>>> CountTrigger, are combined in a composite Trigger (with Any or All),
>>>>>>>>> their state is shared because the stateHandle is the same for both. To
>>>>>>>>> solve this, the handle should become unique, BUT consistent for the
>>>>>>>>> same Trigger. The latter implies that the handle for the same trigger
>>>>>>>>> after a node failure should be the same as that of its predecessor
>>>>>>>>> (before the failure).
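
One possible way to get state handles that are unique per trigger yet stable across restarts is to derive each name from the trigger's position in the composite tree. The sketch below only illustrates that idea; TriggerNode and StateNames are hypothetical helpers, and this is not necessarily how Flink names trigger state.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trigger tree node; `kind` is e.g. "All", "Any" or "Count".
final class TriggerNode {
    final String kind;
    final List<TriggerNode> children = new ArrayList<>();
    TriggerNode(String kind, TriggerNode... kids) {
        this.kind = kind;
        for (TriggerNode k : kids) children.add(k);
    }
}

final class StateNames {
    // Returns one state name per node, in pre-order. A name depends only on the
    // node's kind and its path from the root, so two Count triggers under the
    // same parent get different names, and rebuilding the same specification
    // (for example after a failure) yields the same names again.
    static List<String> assign(TriggerNode root) {
        List<String> names = new ArrayList<>();
        assign(root, "0", names);
        return names;
    }

    private static void assign(TriggerNode node, String path, List<String> names) {
        names.add(node.kind + "@" + path);
        for (int i = 0; i < node.children.size(); i++) {
            assign(node.children.get(i), path + "." + i, names);
        }
    }
}

final class StateNamesDemo {
    public static void main(String[] args) {
        TriggerNode spec = new TriggerNode("All",
                new TriggerNode("Count"), new TriggerNode("Count"));
        System.out.println(StateNames.assign(spec)); // [All@0, Count@0.0, Count@0.1]
    }
}
```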
>>>>>>>>> 
>>>>>>>>> Let me know your thoughts on these.
>>>>>>>>> 
>>>>>>>>> Kostas
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>>> On Jul 21, 2016, at 10:24 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>>>>>> 
>>>>>>>>>> I'm proposing to get this small change into 1.1:
>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-4239
>>>>>>>>>> This will make our lives easier with the future proposed changes.
>>>>>>>>>> 
>>>>>>>>>> What do you think?
>>>>>>>>>> On Tue, 19 Jul 2016 at 11:41 Aljoscha Krettek <aljoscha@apache.org> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi,
>>>>>>>>>>> these new features should make it into the 1.2 release. We are
>>>>>>>>>>> already working on releasing 1.1 so it won't make it for that one,
>>>>>>>>>>> unfortunately.
>>>>>>>>>>> 
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Aljoscha
>>>>>>>>>>> 
>>>>>>>>>>> On Mon, 18 Jul 2016 at 23:19 Chen Qin <qinnchen@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> BTW, do you have a rough timeline in terms of rolling it out to
>>>>>>>>>>>> production?
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Chen
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On Mon, Jul 18, 2016 at 2:46 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>> Chen commented this on the doc (I'm mirroring here so everyone can
>>>>>>>>>>>>> follow):
>>>>>>>>>>>>> "It would be cool to be able to access the last snapshot of window
>>>>>>>>>>>>> state before it gets purged. The pipeline author might consider
>>>>>>>>>>>>> putting it in external storage and dealing with late-arriving events
>>>>>>>>>>>>> by restoring the corresponding window."
>>>>>>>>>>>>> 
>>>>>>>>>>>>> My answer:
>>>>>>>>>>>>> This is partially covered by the section called "What Happens at
>>>>>>>>>>>>> Window-Cleanup Time, Who Decides When to Purge". What I want to
>>>>>>>>>>>>> introduce is that the window can have one final emission if there is
>>>>>>>>>>>>> new data in the buffers at cleanup time.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The work on this will also depend on this proposal:
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
>>>>>>>>>>>>> With this, the WindowFunction can get metadata about the window
>>>>>>>>>>>>> firing so it could be informed that this is the last firing before a
>>>>>>>>>>>>> cleanup and that there already was an earlier, on-time firing.
>>>>>>>>>>>>> Does this cover your concerns, Chen?
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On Sun, 10 Jul 2016 at 21:24 Chen Qin <qinnchen@gmail.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Sure. Currently, it looks like any element assigned to a too-late
>>>>>>>>>>>>>> window will be dropped silently 😓?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Having a late window stream implies that Flink somehow needs to add
>>>>>>>>>>>>>> one more state to the window and split window state cleanup from
>>>>>>>>>>>>>> window retirement. I would suggest something as simple as adding a
>>>>>>>>>>>>>> function in the trigger called OnLateElement that always fires and
>>>>>>>>>>>>>> purges; it would make the developer aware of this case and able to
>>>>>>>>>>>>>> handle it.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Chen
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Fri, Jul 8, 2016 at 1:00 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> @Chen I added a section at the end of the document regarding access
>>>>>>>>>>>>>>> to the elements that are dropped as late. Right now, the section
>>>>>>>>>>>>>>> just mentions that we have to do this but there is no real proposal
>>>>>>>>>>>>>>> yet for how to do it. Only a rough sketch so that we don't forget
>>>>>>>>>>>>>>> about it.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On Fri, 8 Jul 2016 at 07:47 Chen Qin <qinnchen@gmail.com> wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> +1 for allowedLateness scenario.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> The rationale behind it is that backfills or data issues can hold
>>>>>>>>>>>>>>>> in-window data until the watermark passes the window end time. This
>>>>>>>>>>>>>>>> causes the sink to write partial output.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Allowing a high allowedLateness threshold makes it easier to merge
>>>>>>>>>>>>>>>> those results and overwrite partial output with correct output at
>>>>>>>>>>>>>>>> the sink. But yeah, the pipeline author is at risk of blowing up
>>>>>>>>>>>>>>>> the state backend with huge state.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Alternatively, in some cases, if the sink allows a read-check-merge
>>>>>>>>>>>>>>>> operation, the window can explicitly call out events ingested after
>>>>>>>>>>>>>>>> allowedLateness. It asks the pipeline author to mitigate these
>>>>>>>>>>>>>>>> events in a way outside of the Flink ecosystem. The catch is that
>>>>>>>>>>>>>>>> since everywhere in a Flink job can replay and checkpoint, the
>>>>>>>>>>>>>>>> notification should somehow include this info as well.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>> Chen
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> On Thu, Jul 7, 2016 at 12:14 AM, Kostas Kloudas
>>>>>>>>>>>>>>>> <k.kloudas@data-artisans.com> wrote:
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> In the effort to move the discussion to the mailing list, rather
>>>>>>>>>>>>>>>>> than the doc, there was a comment in the doc:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> "It seems this proposal marries the allowed lateness of events and
>>>>>>>>>>>>>>>>> the discarding of window state. In most use cases this should be
>>>>>>>>>>>>>>>>> sufficient, but there are instances where having independent
>>>>>>>>>>>>>>>>> control of these may be useful.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> For instance, you may have a job that computes some aggregate, like
>>>>>>>>>>>>>>>>> a sum. You may want to keep the window state around for a while,
>>>>>>>>>>>>>>>>> but not too long. Yet you may want to continue processing late
>>>>>>>>>>>>>>>>> events after you discarded the window state. It is possible that
>>>>>>>>>>>>>>>>> your stream sinks can make use of this data. For instance, they may
>>>>>>>>>>>>>>>>> be writing to a data store that returns an error if a row already
>>>>>>>>>>>>>>>>> exists, which allows the sink to read the existing row and update
>>>>>>>>>>>>>>>>> it with the new data."
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> To which I would like to reply:
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> If I understand your use-case correctly, I believe that the
>>>>>>>>>>>>>>>>> proposed binding of the allowed lateness to the state purging does
>>>>>>>>>>>>>>>>> not pose any problem. The lateness specifies the upper time bound,
>>>>>>>>>>>>>>>>> after which the state will be discarded. Between the start of a
>>>>>>>>>>>>>>>>> window and its (end + allowedLateness) you can write custom
>>>>>>>>>>>>>>>>> triggers that fire, purge the state, or do nothing. Given this, I
>>>>>>>>>>>>>>>>> suppose that, in the most extreme case, you can specify an allowed
>>>>>>>>>>>>>>>>> lateness of Long.MaxValue and do the purging of the state
>>>>>>>>>>>>>>>>> "manually". By doing this, you remove the safeguard of letting the
>>>>>>>>>>>>>>>>> system purge the state at some point in time, and you can do your
>>>>>>>>>>>>>>>>> own custom state management that fits your needs.
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>> Kostas
>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Jul 6, 2016, at 5:43 PM, Aljoscha Krettek <aljoscha@apache.org>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> @Vishnu Funny you should ask that because I have a design doc
>>>>>>>>>>>>>>>>>> lying around. I'll open a new mail thread to not hijack this one.
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>> On Wed, 6 Jul 2016 at 17:17 Vishnu Viswanath
>>>>>>>>>>>>>>>>>> <vishnu.viswanath25@gmail.com> wrote:
>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I was going through the suggested improvements in window, and I
>>>>>>>>>>>>>>>>>>> have a few questions/suggestions on improvements regarding the
>>>>>>>>>>>>>>>>>>> Evictor.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> 1) I have a use case where I have to create a custom Evictor that
>>>>>>>>>>>>>>>>>>> will evict elements from the window based on their value (e.g.,
>>>>>>>>>>>>>>>>>>> if the elements are of case class Item(id: Int, type: String),
>>>>>>>>>>>>>>>>>>> then evict elements that have type="a"). I believe this is not
>>>>>>>>>>>>>>>>>>> currently possible.
>>>>>>>>>>>>>>>>>>> 2) This is somewhat related to 1): there should be an option to
>>>>>>>>>>>>>>>>>>> evict elements from anywhere in the window, not only from the
>>>>>>>>>>>>>>>>>>> beginning of the window (e.g., apply the delta function to all
>>>>>>>>>>>>>>>>>>> elements and remove all those that don't pass). I checked the
>>>>>>>>>>>>>>>>>>> code and the evict method just returns the number of elements to
>>>>>>>>>>>>>>>>>>> be removed, and processTriggerResult just skips that many
>>>>>>>>>>>>>>>>>>> elements from the beginning.
>>>>>>>>>>>>>>>>>>> 3) Add an option that enables the user to decide if the eviction
>>>>>>>>>>>>>>>>>>> should happen before the apply function or after the apply
>>>>>>>>>>>>>>>>>>> function. Currently it is before the apply function, but I have
>>>>>>>>>>>>>>>>>>> a use case where I need to first apply the function and evict
>>>>>>>>>>>>>>>>>>> afterward.
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> I am doing these for a POC so I think I can modify the Flink code
>>>>>>>>>>>>>>>>>>> base to make these changes and build, but I would appreciate any
>>>>>>>>>>>>>>>>>>> suggestion on whether these are viable changes or whether there
>>>>>>>>>>>>>>>>>>> will be any performance issues if these are done. Also, any
>>>>>>>>>>>>>>>>>>> pointer on where to start (e.g., do I create a new class similar
>>>>>>>>>>>>>>>>>>> to EvictingWindowOperator that extends WindowOperator?)
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> Thanks and Regards,
>>>>>>>>>>>>>>>>>>> Vishnu Viswanath,
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 9:39 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> I did:
>>>>>>>>>>>>>>>>>>>> https://mail-archives.apache.org/mod_mbox/flink-dev/201606.mbox/%3cCANMXwW0AbTTjjg9EWdxRUGxkjM7jscBeNmVRZOHPt2qO3pQMwA@mail.gmail.com%3e
>>>>>>>>>>>>>>>>>>>> ;-)
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>> On Wed, 6 Jul 2016 at 15:31 Ufuk Celebi <uce@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> On Wed, Jul 6, 2016 at 3:19 PM, Aljoscha Krettek
>>>>>>>>>>>>>>>>>>>>> <aljoscha@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>> In the future, it might be good to do discussions directly on
>>>>>>>>>>>>>>>>>>>>>> the ML and then change the document accordingly. This way
>>>>>>>>>>>>>>>>>>>>>> everyone can follow the discussion on the ML. I also feel that
>>>>>>>>>>>>>>>>>>>>>> Google Doc comments often don't give enough space for
>>>>>>>>>>>>>>>>>>>>>> expressing more complex opinions.
>>>>>>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>>>>>>> I agree! Would you mind raising this point as a separate
>>>>>>>>>>>>>>>>>>>>> discussion on dev@?

