From: Vishnu Viswanath
Date: Fri, 15 Jul 2016 07:27:53 -0400
Subject: Re: [DISCUSS] Enhance Window Evictor in Flink
To: Dev

Hi all,

How do we create a FLIP page? Is there any permission setup required? After logging in, I don't see any "Create" page option in the header, as mentioned in https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals

Thanks,
Vishnu

On Wed, Jul 13, 2016 at 10:22 PM, Vishnu Viswanath <vishnu.viswanath25@gmail.com> wrote:

Hi Aljoscha,

I agree, the user will know exactly whether they are creating an EventTime-based evictor or a ProcessingTime-based evictor by looking at the code. So do you think it will be OK to have multiple versions of TimeEvictor (one for event time and one for processing time) and also of DeltaEvictor (again two versions, for event time and for processing time)?
Please note that the existing behavior of TimeEvictor and DeltaEvictor does not consider whether the window uses event time or processing time. For example, TimeEvictor takes the timestamp of the last element in the window as the current time:

    long currentTime = Iterables.getLast(elements).getTimestamp();

rather than the highest timestamp of all elements. What I am trying to achieve is something like:

    long currentTime;
    if (ctx.isEventTime()) {
        currentTime = getMaxTimestamp(elements);
    } else {
        currentTime = Iterables.getLast(elements).getTimestamp();
    }

Similarly, in DeltaEvictor `lastElement` is `Iterables.getLast(elements)`, and I think we should treat the element with the maximum timestamp as the last element instead of simply taking the last inserted one.

Do you think this is the right thing to do, or should we leave the evictors' behavior as is with respect to choosing the last element?

Thanks,
Vishnu

On Wed, Jul 13, 2016 at 11:07 AM, Aljoscha Krettek wrote:

I still think it should be explicit in the class. For example, if you have this code:

    input
        .keyBy(...)
        .window(...)
        .trigger(EventTimeTrigger.create())
        .evictor(TimeEvictor.of(...))

the time behavior of the trigger is explicitly specified, while the evictor would dynamically adapt based on internal workings that the user might not be aware of. Having the behavior explicit at the call site is very important, in my opinion.

On Wed, 13 Jul 2016 at 16:28, Vishnu Viswanath <vishnu.viswanath25@gmail.com> wrote:

Hi,

I was hoping to use the isEventTime method in the WindowAssigner to set that information in the EvictorContext. What do you think?
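The DeltaEvictor question comes down to which element serves as the reference. The standalone Java sketch below is not Flink code: `Stamped`, `DeltaEvictionSketch` and the absolute-difference delta are hypothetical, and the rule "evict an element when its delta to the reference is at least the threshold" is an assumption based on the description in this thread. It shows how the outcome changes when elements arrive out of event-time order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical element type (not a Flink class): a value plus its event timestamp.
final class Stamped {
    final long timestamp;
    final double value;

    Stamped(long timestamp, double value) {
        this.timestamp = timestamp;
        this.value = value;
    }
}

final class DeltaEvictionSketch {

    // Existing behaviour as described in the thread: the last inserted element is the reference.
    static Stamped lastInserted(List<Stamped> elements) {
        return elements.get(elements.size() - 1);
    }

    // Proposed behaviour: the element with the highest timestamp is the reference.
    static Stamped maxTimestamp(List<Stamped> elements) {
        Stamped best = elements.get(0);
        for (Stamped e : elements) {
            if (e.timestamp > best.timestamp) {
                best = e;
            }
        }
        return best;
    }

    // Assumed eviction rule: drop every element whose delta to the reference is >= threshold.
    // The delta used here is simply |a.value - b.value|.
    static void evict(List<Stamped> elements, Stamped reference, double threshold) {
        Iterator<Stamped> it = elements.iterator();
        while (it.hasNext()) {
            if (Math.abs(it.next().value - reference.value) >= threshold) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        // The element with timestamp 3 arrived before the one with timestamp 2.
        List<Stamped> byLast = new ArrayList<>(Arrays.asList(
                new Stamped(1, 10.0), new Stamped(3, 50.0), new Stamped(2, 12.0)));
        List<Stamped> byMax = new ArrayList<>(byLast);

        evict(byLast, lastInserted(byLast), 5.0); // reference value 12.0
        evict(byMax, maxTimestamp(byMax), 5.0);   // reference value 50.0

        System.out.println("kept with last-inserted reference: " + byLast.size());
        System.out.println("kept with max-timestamp reference: " + byMax.size());
    }
}
```

With a threshold of 5.0, the last-inserted reference (value 12.0) keeps two elements and evicts the newest one, while the max-timestamp reference (value 50.0) keeps only the newest element.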
Thanks and Regards,
Vishnu Viswanath

On Wed, Jul 13, 2016 at 10:09 AM, Aljoscha Krettek wrote:

Hi,

I think the way to go here is to add both an EventTimeEvictor and a ProcessingTimeEvictor. The problem is that "isEventTime" cannot really be determined. That's also the reason why there is an EventTimeTrigger and a ProcessingTimeTrigger. It was just an oversight that TimeEvictor does not also have these two versions.

About EvictingWindowOperator: yes, I think you can make the two methods non-final in WindowOperator.

Cheers,
Aljoscha

On Wed, 13 Jul 2016 at 14:32, Vishnu Viswanath <vishnu.viswanath25@gmail.com> wrote:

Hi Aljoscha,

I am thinking of adding a method boolean isEventTime() to the EvictorContext, in addition to:

    long getCurrentProcessingTime();
    MetricGroup getMetricGroup();
    long getCurrentWatermark();

This method could let TimeEvictor avoid iterating through all the elements. There will be a few changes to the existing behavior of TimeEvictor and DeltaEvictor (I have described them in the design doc).

Also, is there any specific reason why the open and close methods in WindowOperator are final? Since the EvictorContext will live in the EvictingWindowOperator, I need to override open and close in EvictingWindowOperator to set the EvictorContext reference to null.
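To make Aljoscha's alternative concrete, here is a plain-Java sketch of two explicit evictors instead of one evictor that branches on an isEventTime() flag. All names are hypothetical: `SketchEvictorContext` only borrows getCurrentProcessingTime() and getCurrentWatermark() from the methods listed above, and using the watermark as the event-time reference is an assumption, not something decided in this thread. The time domain is fixed by the class chosen at the call site.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical, reduced context; method names borrowed from the list in the thread.
interface SketchEvictorContext {
    long getCurrentProcessingTime();
    long getCurrentWatermark();
}

// Hypothetical element type (not a Flink class).
final class TimedElement {
    final long timestamp;

    TimedElement(long timestamp) {
        this.timestamp = timestamp;
    }
}

// Shared logic: keep only elements newer than (reference time - keepMillis).
abstract class TimeEvictorSketch {
    private final long keepMillis;

    TimeEvictorSketch(long keepMillis) {
        this.keepMillis = keepMillis;
    }

    // Each subclass states explicitly which clock it evicts against.
    abstract long referenceTime(SketchEvictorContext ctx);

    final void evictBefore(Iterable<TimedElement> elements, SketchEvictorContext ctx) {
        long cutoff = referenceTime(ctx) - keepMillis;
        Iterator<TimedElement> it = elements.iterator();
        while (it.hasNext()) {
            if (it.next().timestamp <= cutoff) {
                it.remove();
            }
        }
    }
}

final class EventTimeEvictorSketch extends TimeEvictorSketch {
    EventTimeEvictorSketch(long keepMillis) {
        super(keepMillis);
    }

    @Override
    long referenceTime(SketchEvictorContext ctx) {
        return ctx.getCurrentWatermark(); // assumption: event time measured by the watermark
    }
}

final class ProcessingTimeEvictorSketch extends TimeEvictorSketch {
    ProcessingTimeEvictorSketch(long keepMillis) {
        super(keepMillis);
    }

    @Override
    long referenceTime(SketchEvictorContext ctx) {
        return ctx.getCurrentProcessingTime();
    }
}

final class ExplicitEvictorDemo {
    static SketchEvictorContext fixedContext(final long processingTime, final long watermark) {
        return new SketchEvictorContext() {
            @Override
            public long getCurrentProcessingTime() {
                return processingTime;
            }

            @Override
            public long getCurrentWatermark() {
                return watermark;
            }
        };
    }

    static List<TimedElement> sample() {
        return new ArrayList<>(Arrays.asList(
                new TimedElement(50), new TimedElement(95), new TimedElement(120)));
    }

    public static void main(String[] args) {
        SketchEvictorContext ctx = fixedContext(1_000L, 100L);

        List<TimedElement> eventTime = sample();
        new EventTimeEvictorSketch(10L).evictBefore(eventTime, ctx);           // cutoff 90

        List<TimedElement> processingTime = sample();
        new ProcessingTimeEvictorSketch(10L).evictBefore(processingTime, ctx); // cutoff 990

        System.out.println("event-time evictor kept " + eventTime.size());
        System.out.println("processing-time evictor kept " + processingTime.size());
    }
}
```

The split mirrors the EventTimeTrigger/ProcessingTimeTrigger pair: the eviction loop is shared in the base class, and only the source of the reference time differs between the two classes.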
Thanks and Regards,
Vishnu Viswanath

On Fri, Jul 8, 2016 at 7:40 PM, Vishnu Viswanath <vishnu.viswanath25@gmail.com> wrote:

My thought process when asking whether we can use the state backend in the window function was: can we add the elements to be evicted to some state, and let evictAfter read them from some context and remove them from the window?

On Fri, Jul 8, 2016 at 7:30 PM, Vishnu Viswanath <vishnu.viswanath25@gmail.com> wrote:

Hi Aljoscha,

Thanks for the explanation, and sorry for the late reply; I was busy with work.

I did think about this scenario. In fact, I thought of posting this question in my previous mail, but then I realized that this problem will be there whichever method we choose (the Trigger looking for the pattern or the window function looking for the pattern).

I do have a pretty good watermark, but my concern is that it changes based on the key of these messages (I don't know if that is possible; I haven't started coding it yet. Maybe you could tell me). Even if it is, some of these watermarks will lag by a long time (days), and I don't want the trigger to wait that long.
It looks like it is not easy to have an evictAfter based on the window function (without introducing coupling), but could the current window apply function be modified so that it can change the elements in the window, maybe using some state backend? (I don't know exactly how the internals work, so this might be the wrong question.)

Thanks and Regards,
Vishnu Viswanath

On Fri, Jul 8, 2016 at 10:20 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:

Hi Vishnu,

how long would these patterns be? The Trigger would not have to sort the elements for every new element; it would just insert the new element into an internal data structure. Only when it sees that the watermark is past a certain point would it check whether the pattern matches and actually fire.

A general note regarding order and event time: I think relying on this for computation is very tricky unless the watermark is 100% correct or you completely discard elements that arrive after the watermark, i.e. elements that would break the watermark's promise that no elements with an earlier timestamp will ever arrive. The reason is that new elements could always arrive that end up between already seen elements. For example, let's say we have this sequence of elements when the trigger fires:

    a-b-a

This is the sequence that you are looking for, and you emit some result from the WindowFunction.
Now, new elements arrive that fall in between the elements we already have:

    a-d-e-b-f-g-a

This is an updated, sorted view of the actual event-time stream; we didn't know before that the stream actually looks like this. Does this still match the original pattern, or should we now consider it non-matching? If it no longer matches, then the earlier successful match for a-b-a was wrong and we should never have processed it, but we couldn't know that at the time. If it still matches, then pattern matching like this can be done in the Trigger by having something like pattern slots: you don't have to store all elements in the Trigger, you just need to store the possible candidates that could match the pattern and ignore the other (in-between) elements.

Cheers,
Aljoscha

On Fri, 8 Jul 2016 at 14:10, Vishnu Viswanath <vishnu.viswanath25@gmail.com> wrote:

Hi Aljoscha,

That is a good idea. Trying to tie it back to the use case: suppose the trigger is looking for the pattern a-b-a. When it sees such a pattern, it fires the window, knowing that the Evictor is now going to evict the element b, so the trigger updates its state to a-a (even before the window function and evictor complete) and looks for the rest of the pattern, i.e., b-a.
But I can think of one problem here:

  - The events can arrive out of order, i.e., the trigger might see the pattern a-a-b while the actual event-time order is a-b-a. Then the trigger would have to sort the elements in the window every time it sees an element. (I was planning to do this sorting in the window, which would happen less often: only when the trigger fires.)

Thanks and Regards,
Vishnu Viswanath

On Fri, Jul 8, 2016 at 6:04 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:

Hi,

come to think of it, the right place to put such checks is actually the Trigger. It would have to be a custom trigger that observes time but also keeps some internal state machine to decide when it has observed the right pattern in the window. Then the window function would just have to do the processing, and you have good separation of concerns. Does that make sense?

I'm ignoring time and sorting by time for now because we probably need another design document for that. To me it seems like a bigger thing.
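The "pattern slots" idea and the custom trigger with an internal state machine can be sketched as follows. `PatternSlots` is a hypothetical helper, not Flink's Trigger API, and it assumes elements are fed in event-time order, so it does not address the out-of-order problem raised above. It keeps only the elements that fill a slot of a fixed pattern such as a-b-a and ignores the in-between elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not a Flink Trigger: a tiny state machine for a fixed pattern.
// It stores only the elements that fill a pattern slot and ignores everything in between.
final class PatternSlots {
    private final List<String> pattern;
    private final List<String> slots = new ArrayList<>();

    PatternSlots(List<String> pattern) {
        this.pattern = new ArrayList<>(pattern);
    }

    // Feeds one element, assumed to arrive in event-time order.
    // Returns true once every slot is filled, i.e. the pattern occurs as a subsequence.
    boolean onElement(String element) {
        int next = slots.size();
        if (next < pattern.size() && pattern.get(next).equals(element)) {
            slots.add(element);
        }
        return slots.size() == pattern.size();
    }

    List<String> filledSlots() {
        return new ArrayList<>(slots);
    }

    // Would be called after the window fires, to start looking for the next match.
    void reset() {
        slots.clear();
    }

    public static void main(String[] args) {
        PatternSlots matcher = new PatternSlots(Arrays.asList("a", "b", "a"));
        for (String e : Arrays.asList("a", "d", "e", "b", "f", "g", "a")) {
            if (matcher.onElement(e)) {
                System.out.println("fire on '" + e + "', slots = " + matcher.filledSlots());
                matcher.reset();
            }
        }
    }
}
```

Greedy left-to-right slot filling is enough to decide whether a fixed pattern occurs as a subsequence; on the sorted stream a-d-e-b-f-g-a from the example, it fires on the final `a`.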
Cheers,
Aljoscha

On Thu, 7 Jul 2016 at 23:56, Vishnu Viswanath <vishnu.viswanath25@gmail.com> wrote:

Hi,

Regarding the evictAfter function that evicts based on some decision made by the window function: I think it would be nice if we could come up with something that is LESS coupled, because I can think of several use cases that depend on it.

This matters especially when there are late-arriving messages. Only after the window function is applied can we tell what to do with the elements in the window. You could apply your business logic there to determine whether the window function was able to do what it is supposed to do: if yes, evict those elements; otherwise (since the elements you are looking for haven't arrived yet), wait and try again the next time the trigger fires.
Thanks and Regards,
Vishnu Viswanath

On Thu, Jul 7, 2016 at 9:19 AM, Radu Tudoran <radu.tudoran@huawei.com> wrote:

Hi,

@Aljoscha - I can understand why you are hesitant to introduce "slower" windows, such as ones that maintain sorted items, or windows with bindings between the different entities (evictor, trigger, window, apply function). However, I think it's possible simply to create more types of windows. The existing ones (time windows, global windows, ...) can remain, and we just add some more flavors of windows where more features or more functionality are enabled (e.g., access to each element in the evictor; the possibility to delete or mark elements for eviction in the function...).

Regarding the specific case of sorted windows, I think the N log N worst-case complexity of sorting is very unlikely. In fact, you have almost sorted items/arrays. Moreover, if you consider that in iteration X all elements were sorted, then in iteration X+1 you will need to sort just the newly arrived elements (M).
I would expect this number M to be significantly smaller than N (the number of existing elements). Then, using insertion sort for the new elements, you would have M * N complexity, and if M << N the complexity is O(N). Alternatively, you can use binary search to find each insertion point, which reduces the search cost to O(log N) per element. If M is proportional to N, then you can sort the M new elements and combine them with the existing ones using a merge step.

Dr. Radu Tudoran
Research Engineer - Big Data Expert
IT R&D Division

HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudoran@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, which is intended only for the person or entity whose address is listed above. Any use of the information contained herein in any way (including, but not limited to, total or partial disclosure, reproduction, or dissemination) by persons other than the intended recipient(s) is prohibited. If you receive this e-mail in error, please notify the sender by phone or email immediately and delete it!

-----Original Message-----
From: 吕文龙(吕文龙) [mailto:wenlong.lwl@alibaba-inc.com]
Sent: Thursday, July 07, 2016 11:59 AM
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Enhance Window Evictor in Flink

Hi,

I think it is necessary to support sorted windows, which would avoid scanning all the elements of a window when trying to evict elements; that scan may cost many I/O operations, such as querying DBs to get elements from state. What's more, when a window aggregation function is invertible, such as sum, which can be updated by adding or removing a single record, window results can be calculated incrementally.
In this kind of case, we can dramatically improve the performance of window aggregation if the evictor can trigger an update of the window aggregation state by some mechanism.

Best Wishes!
---
wenlong.lwl

-----Original Message-----
From: Aljoscha Krettek [mailto:aljoscha@apache.org]
Sent: July 7, 2016 17:32
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Enhance Window Evictor in Flink

Hi,

regarding "sorting the window by event time": I also considered this, but in the end I don't think it's necessary. Sorting is rather expensive, and making decisions based on the order of elements can be tricky. An extreme example of why this can be problematic is the case where all elements in the window have the same timestamp. Now, if you decide to evict the first 5 elements based on timestamp order, you basically evict 5 arbitrary elements.
I think the better solution for time-based eviction is to do one pass over the elements to get an overview of the timestamp distribution, then do a second pass and evict elements based on what was learned in the first pass. This has complexity 2*n, compared to n*log(n) (plus the work of actually deciding what to evict) for the sort-based strategy.

I might be wrong, though, and there could be a valid use case not covered by the above idea.

Regarding Vishnu's other use case of evicting based on some decision in the WindowFunction: could this be solved by doing the check for the pattern in the evictor itself instead of in the window function? I'm very hesitant to introduce coupling between the different components of the windowing system, i.e. assigner, trigger, evictor and window function. The reason is that using an evictor has a huge performance impact, since the system always has to keep all elements and cannot do incremental aggregation of window results, and I therefore don't want to put eviction-specific features into the other components.
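The two-pass strategy can be sketched in plain Java. Here the "overview of the timestamp distribution" is reduced to the maximum timestamp, and the retention rule (evict everything older than the maximum minus a fixed span) is an assumption chosen for illustration; `TwoPassEviction` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

final class TwoPassEviction {

    // Pass 1 summarises the timestamp distribution (here only its maximum);
    // pass 2 evicts every element older than (max - retention).
    // About 2n element visits in total, and no sort is needed.
    static void evictOlderThan(List<Long> timestamps, long retention) {
        if (timestamps.isEmpty()) {
            return;
        }
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {          // pass 1
            max = Math.max(max, ts);
        }
        long cutoff = max - retention;
        Iterator<Long> it = timestamps.iterator();
        while (it.hasNext()) {                // pass 2
            if (it.next() < cutoff) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Long> window = new ArrayList<>(Arrays.asList(7L, 3L, 12L, 9L, 1L));
        evictOlderThan(window, 5L);
        System.out.println(window);
    }
}
```

Because the decision depends only on timestamp values, elements sharing a timestamp are always kept or evicted together, which avoids the arbitrary choice described above for the all-equal-timestamps case.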
Cheers,
Aljoscha

On Thu, 7 Jul 2016 at 10:00, Radu Tudoran <radu.tudoran@huawei.com> wrote:

Hi,

I think the situation Vishnu raised is something that should be accounted for. It can indeed happen that you want to condition what you evict from the window on the result of the function being applied.

My 2 cents...
I would suggest adding a list for the elements of the stream where you can MARK them to be deleted. Alternatively, the iterator can be extended with a function Iterator.markForEviction(int). These could also be made available in the apply function. Moreover, we can use this to extend the functionality so that you add MARKs either for eviction after the function has finished triggering or for eviction in the next iteration.

Dr. Radu Tudoran
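Radu's marking idea can be sketched as a buffer that the window function marks by position and that is cleaned up afterwards. `MarkableWindow` and its methods are hypothetical; `markForEviction(int)` only borrows the name from the proposal, and the sketch leaves the timing (after this firing or in the next iteration) to whoever calls `evictMarked()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
import java.util.List;

// Hypothetical window buffer, not a Flink API. The window function marks elements by
// position while it runs; marked elements are removed only in a later evict step.
final class MarkableWindow<T> {
    private final List<T> elements;
    private final BitSet marked = new BitSet();

    MarkableWindow(List<T> elements) {
        this.elements = new ArrayList<>(elements);
    }

    // Read-only view handed to the window function.
    List<T> view() {
        return Collections.unmodifiableList(elements);
    }

    // Modelled on the proposed markForEviction(int); the name is illustrative only.
    void markForEviction(int index) {
        if (index < 0 || index >= elements.size()) {
            throw new IndexOutOfBoundsException("index " + index);
        }
        marked.set(index);
    }

    // Drops all marked elements, clears the marks, and returns how many were removed.
    int evictMarked() {
        List<T> kept = new ArrayList<>();
        for (int i = 0; i < elements.size(); i++) {
            if (!marked.get(i)) {
                kept.add(elements.get(i));
            }
        }
        int removed = elements.size() - kept.size();
        elements.clear();
        elements.addAll(kept);
        marked.clear();
        return removed;
    }

    public static void main(String[] args) {
        MarkableWindow<String> window = new MarkableWindow<>(Arrays.asList("a", "b", "a", "b"));

        // Stand-in for the window function: inspect the elements and mark every "b".
        List<String> view = window.view();
        for (int i = 0; i < view.size(); i++) {
            if (view.get(i).equals("b")) {
                window.markForEviction(i);
            }
        }

        int removed = window.evictMarked();
        System.out.println("removed " + removed + ", remaining " + window.view());
    }
}
```

In the example, both `b` elements are marked during the "window function" loop, and `evictMarked()` then removes them, leaving `[a, a]`.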
-----Original Message-----
From: Vishnu Viswanath [mailto:vishnu.viswanath25@gmail.com]
Sent: Thursday, July 07, 2016 1:28 AM
To: Dev
Subject: Re: [DISCUSS] Enhance Window Evictor in Flink

Thank you Maxim and Aljoscha.

Yes, beforeEvict and afterEvict should be able to address point 3.

I have one more use case in mind (which I might have to handle in the later stages of the POC): what if `evictAfter` should behave differently based on the window function?

For example, I have a window that got triggered, and my evict function is called after the apply function. In such cases I should be able to decide what to evict based on the window function. E.g., let the window have elements of type

    case class Item(id: String, `type`: String)

and let the types be `type1` and `type2`. If the window function finds the sequence `type1 type2 type1`, then evict all elements of type `type2`; or if the window function finds the sequence `type2 type2 type1`, then evict all elements of type `type1`; otherwise don't evict any elements.
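Vishnu's example can be expressed as a decision returned by the window function plus an evict-after step that acts on it. This is a hypothetical sketch: `Item` is a Java stand-in for the Scala case class, `PatternDrivenEviction` is an illustrative name, and "sequence" is interpreted as consecutive elements in the window, which is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Java stand-in for the Scala case class Item(id, `type`) from the mail.
final class Item {
    final String id;
    final String type;

    Item(String id, String type) {
        this.id = id;
        this.type = type;
    }
}

final class PatternDrivenEviction {

    // True if some run of consecutive elements has exactly the given types.
    static boolean containsRun(List<Item> items, String... pattern) {
        for (int start = 0; start + pattern.length <= items.size(); start++) {
            boolean match = true;
            for (int k = 0; k < pattern.length && match; k++) {
                match = items.get(start + k).type.equals(pattern[k]);
            }
            if (match) {
                return true;
            }
        }
        return false;
    }

    // Decision taken by the window function; null means "evict nothing, wait for the next firing".
    static String typeToEvict(List<Item> items) {
        if (containsRun(items, "type1", "type2", "type1")) {
            return "type2";
        }
        if (containsRun(items, "type2", "type2", "type1")) {
            return "type1";
        }
        return null;
    }

    // Evict-after step: removes every element of the chosen type.
    static void evictAfter(List<Item> items, String type) {
        if (type == null) {
            return;
        }
        Iterator<Item> it = items.iterator();
        while (it.hasNext()) {
            if (it.next().type.equals(type)) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Item> window = new ArrayList<>(Arrays.asList(
                new Item("1", "type1"), new Item("2", "type2"),
                new Item("3", "type1"), new Item("4", "type2")));
        String decision = typeToEvict(window);
        evictAfter(window, decision);
        System.out.println("evicted " + decision + ", " + window.size() + " items left");
    }
}
```

Returning `null` corresponds to the "otherwise don't evict any elements" branch, i.e. waiting for the next firing.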
Is this possible? Or could we at least let the window function choose between two Evictor functions (one for the success case and one for the failure case)?

@Maxim:
regarding the sorted window, I actually wanted my elements to be sorted not for eviction but while applying the window function (so I thought this could be done easily). But it would be good to have the window sorted by event time.

Thanks and Regards,
Vishnu Viswanath

On Wed, Jul 6, 2016 at 3:55 PM, Maxim <mfateev@gmail.com> wrote:

Actually, for such an evictor to be useful, the window should be sorted by some field, usually event time. What do you think about adding a sorted window abstraction?

On Wed, Jul 6, 2016 at 11:36 AM, Aljoscha Krettek wrote:

@Maxim: That's perfect, I didn't think about using Iterator.remove() for that. I'll update the doc. What do you think, Vishnu? This should also cover your before/after case nicely.
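Radu's argument for keeping a window sorted incrementally (sort only the M newly arrived elements, then merge them with the N already sorted ones) suggests one way a sorted window abstraction like the one Maxim asks about could be maintained. The sketch below is hypothetical (`IncrementalSortedBuffer` is an illustrative name) and works on bare timestamps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class IncrementalSortedBuffer {

    // Keeps a window sorted by timestamp across firings:
    // sort only the M new timestamps (O(M log M)), then merge them
    // with the N already sorted ones in one linear pass (O(N + M)).
    static List<Long> mergeNewBatch(List<Long> sorted, List<Long> newBatch) {
        List<Long> batch = new ArrayList<>(newBatch);
        Collections.sort(batch);

        List<Long> out = new ArrayList<>(sorted.size() + batch.size());
        int i = 0;
        int j = 0;
        while (i < sorted.size() && j < batch.size()) {
            // <= keeps already present elements ahead of new ones with equal timestamps
            if (sorted.get(i) <= batch.get(j)) {
                out.add(sorted.get(i++));
            } else {
                out.add(batch.get(j++));
            }
        }
        while (i < sorted.size()) {
            out.add(sorted.get(i++));
        }
        while (j < batch.size()) {
            out.add(batch.get(j++));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> sorted = Arrays.asList(1L, 4L, 9L);
        List<Long> arrived = Arrays.asList(7L, 2L, 10L, 4L);
        System.out.println(mergeNewBatch(sorted, arrived));
    }
}
```

The merge is linear in N + M, so when M is much smaller than N the per-firing cost is dominated by a single pass over the existing elements rather than a full re-sort.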
@Vishnu: The steps would be these:
- Converge on a design in this discussion
- Add a Jira issue here: https://issues.apache.org/jira/browse/FLINK
- Work on the code and create a pull request on GitHub

The steps are also outlined here http://flink.apache.org/how-to-contribute.html and here http://flink.apache.org/contribute-code.html.

Aljoscha

On Wed, 6 Jul 2016 at 19:45 Maxim <mfateev@gmail.com> wrote:

The new API forces iteration through every element of the buffer even if only a single value is to be evicted. What about implementing the Iterator.remove() method for elements? The API would look like:

public interface Evictor extends Serializable {

    /**
     * Optionally evicts elements. Called before windowing function.
     *
     * @param elements The elements currently in the pane. Use Iterator.remove to evict.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     */
    void evictBefore(Iterable elements, int size, EvictorContext ctx);

    /**
     * Optionally evicts elements. Called after windowing function.
     *
     * @param elements The elements currently in the pane. Use Iterator.remove to evict.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     */
    void evictAfter(Iterable elements, int size, EvictorContext ctx);
}

Such an API allows aborting the iteration at any point and evicting elements in any order.

Thanks,

Maxim.

On Wed, Jul 6, 2016 at 9:04 AM, Vishnu Viswanath <vishnu.viswanath25@gmail.com> wrote:

Hi Aljoscha,

Thanks. Yes, the new interface seems to address points 1 and 2.
1) I have a use case where I have to create a custom Evictor that will evict elements from the window based on their value (e.g., if I have elements of case class Item(id: Int, type: String), then evict the elements that have type="a"). I believe this is not currently possible.
2) This is somewhat related to 1): there should be an option to evict elements from anywhere in the window, not only from the beginning of the window (e.g., apply the delta function to all elements and remove all those that don't pass). I checked the code: the evict method just returns the number of elements to be removed, and processTriggerResult just skips that many elements from the beginning.
3) Add an option that enables the user to decide whether the eviction should happen before or after the apply function. Currently it is before the apply function, but I have a use case where I need to apply the function first and evict afterward.

I would be interested in contributing to the code base. Please let me know the steps.

Thanks and Regards,
Vishnu Viswanath

On Wed, Jul 6, 2016 at 11:49 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:

Hi,
as mentioned in the thread on improving the Windowing API, I also have a design doc just for improving WindowEvictors. I had this in my head for a while but was hesitant to publish it; since people are asking about this now, it might be a good time to post it.
Here's the doc:
https://docs.google.com/document/d/1Rr7xzlItYqvFXLyyy-Yv0vvw8f29QYAjm5i9E4A_JlU/edit?usp=sharing

Feedback/Suggestions are very welcome! Please let me know what you think.

@Vishnu: Are you interested in contributing a solution for this to the Flink code base? I'd be very happy to work with you on this.

Cheers,
Aljoscha

P.S. I think it would be best to keep discussions to the ML because comments on the doc will not be visible here for everyone.
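Points 1 and 2 of Vishnu's original mail contrast the count-based eviction he found in the code (the evictor returns a number and processTriggerResult skips that many elements from the beginning) with value-based eviction from anywhere in the window. A minimal sketch of the difference, assuming a hypothetical `Element` class standing in for `Item(id: Int, type: String)` and hypothetical helpers `skipFromBeginning` and `evictMatching`; this is an illustration, not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical Java stand-in for `case class Item(id: Int, type: String)` from point 1.
final class Element {
    final int id;
    final String type;

    Element(int id, String type) {
        this.id = id;
        this.type = type;
    }

    @Override
    public String toString() {
        return id + ":" + type;
    }
}

// Hypothetical helpers contrasting the two eviction styles; neither is Flink code.
final class EvictionStyles {

    // Count-based style: the evictor only reports how many elements to drop,
    // and the caller skips that many from the beginning of the buffer.
    static <T> List<T> skipFromBeginning(List<T> buffer, int toEvict) {
        int from = Math.min(toEvict, buffer.size());
        return new ArrayList<>(buffer.subList(from, buffer.size()));
    }

    // Value-based style (points 1 and 2): remove every matching element, wherever it is.
    static <T> void evictMatching(List<T> buffer, Predicate<? super T> shouldEvict) {
        buffer.removeIf(shouldEvict);
    }

    public static void main(String[] args) {
        List<Element> buffer = new ArrayList<>(Arrays.asList(
                new Element(1, "b"), new Element(2, "a"),
                new Element(3, "b"), new Element(4, "a")));
        // Count-based: can only drop a prefix, regardless of the elements' values.
        System.out.println(skipFromBeginning(buffer, 2)); // prints [3:b, 4:a]
        // Value-based: drops exactly the type="a" elements.
        evictMatching(buffer, e -> e.type.equals("a"));
        System.out.println(buffer); // prints [1:b, 3:b]
    }
}
```

`removeIf` has the same effect as looping with `Iterator.remove()`, the style Maxim proposes for the new `Evictor` interface.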