From: Aljoscha Krettek
Date: Thu, 25 Jun 2015 10:31:19 +0000
Subject: Re: Thoughts About Streaming
To: dev@flink.apache.org

Hi,
I also ran the tests on top of PR 856 (inverse reducer) now. The results
seem incorrect.
When I insert a Thread.sleep(1) in the tuple source, all the previous tests
reported around 3600 tuples (size 5 sec, slide 1 sec). (Theoretically there
would be 5000 tuples in 5 seconds; the shortfall is due to overhead.) These
are the results for the inverse-reduce optimisation:

(Tuple 0,38)
(Tuple 0,829)
(Tuple 0,1625)
(Tuple 0,2424)
(Tuple 0,3190)
(Tuple 0,3198)
(Tuple 0,-339368)
(Tuple 0,-1315725)
(Tuple 0,-2932932)
(Tuple 0,-5082735)
(Tuple 0,-7743256)
(Tuple 0,75701046)
(Tuple 0,642829470)
(Tuple 0,2242018381)
(Tuple 0,5190708618)
(Tuple 0,10060360311)
(Tuple 0,-94254951)
(Tuple 0,-219806321293)
(Tuple 0,-1258895232699)
(Tuple 0,-4074432596329)

One line is one emitted window count. This is what happens when I remove the
Thread.sleep(1):

(Tuple 0,660676)
(Tuple 0,2553733)
(Tuple 0,3542696)
(Tuple 0,1)
(Tuple 0,1107035)
(Tuple 0,2549491)
(Tuple 0,4100387)
(Tuple 0,-8406583360092)
(Tuple 0,-8406582150743)
(Tuple 0,-8406580427190)
(Tuple 0,-8406580427190)
(Tuple 0,-8406580427190)
(Tuple 0,6847279255682044995)
(Tuple 0,6847279255682044995)
(Tuple 0,-5390528042713628318)
(Tuple 0,-5390528042711551780)
(Tuple 0,-5390528042711551780)

So at some point the pre-reducer seems to go haywire and does not recover
from it. The good thing is that it does produce results now, where the
previous Current/Reduce would simply hang and not produce any output.

On Thu, 25 Jun 2015 at 12:02 Gábor Gévay wrote:

> Hello,
>
> Aljoscha, can you please try the performance test of Current/Reduce
> with the InversePreReducer in PR 856? (If you just call sum, it will
> use an InversePreReducer.) It would be an interesting test, because
> the inverse function optimization really depends on the stream being
> ordered, and I think it has the potential of being faster than
> Next/Reduce. Especially if the window size is much larger than the
> slide size.
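For context, the inverse-reduce optimisation under test can be sketched as follows. This is a minimal standalone illustration with invented names, not the PR 856 code: a running sum is kept, arriving elements are folded in with the reduce function, and evicted elements are removed with its inverse, so the window never has to be re-reduced from scratch.

```java
import java.util.ArrayDeque;

// Sketch of an inverse-reduce pre-reducer for a sliding-window sum
// (count-based window for simplicity). Names are illustrative only.
public class InverseSumPreReducer {
    private final int windowSize; // window size, counted in elements
    private final ArrayDeque<Long> window = new ArrayDeque<>();
    private long sum = 0;

    public InverseSumPreReducer(int windowSize) {
        this.windowSize = windowSize;
    }

    public void add(long value) {
        window.addLast(value);
        sum += value;                        // reduce: fold new element in
        if (window.size() > windowSize) {
            sum -= window.removeFirst();     // inverse reduce on eviction
        }
    }

    public long result() {
        return sum;
    }
}
```

Note that this stays correct only if every subtracted element was added exactly once before; once insertion and eviction get out of step (for example under reordering), the running aggregate drifts and never recovers, which is consistent with the diverging counts above.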
> Best regards,
> Gabor
>
> 2015-06-25 11:36 GMT+02:00 Aljoscha Krettek:
> > I think I'll have to elaborate a bit, so I created a proof-of-concept
> > implementation of my ideas and ran some throughput measurements to
> > alleviate concerns about performance.
> >
> > First, though, I want to highlight again why the current approach does not
> > work with out-of-order elements (which, again, occur constantly due to the
> > distributed nature of the system). This is the example I posted earlier:
> > https://gist.github.com/aljoscha/a367012646ab98208d27. The plan looks like
> > this:
> >
> >    +--+
> >    |  | Source
> >    +--+
> >      |
> >   +-----+
> >   |     |
> >   |   +--+
> >   |   |  | Identity Map
> >   |   +--+
> >   |     |
> >   +-----+
> >      |
> >    +--+
> >    |  | Window
> >    +--+
> >      |
> >      |
> >    +--+
> >    |  | Sink
> >    +--+
> >
> > So all it does is pass the elements through an identity map and then merge
> > them again before the window operator. The source emits ascending integers,
> > and the window operator has a custom timestamp extractor that uses the
> > integer itself as the timestamp and should create windows of size 4 (that
> > is, elements with timestamps 0-3 are one window, the next window holds the
> > elements with timestamps 4-7, and so on). Since the topology basically
> > doubles the elements from the source, I would expect to get these windows:
> > Window: 0, 0, 1, 1, 2, 2, 3, 3
> > Window: 4, 4, 5, 5, 6, 6, 7, 7
> >
> > The output is this, however:
> > Window: 0, 1, 2, 3,
> > Window: 4, 0, 1, 2, 3, 4, 5, 6, 7,
> > Window: 8, 9, 10, 11,
> > Window: 12, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15,
> > Window: 16, 17, 18, 19,
> > Window: 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23,
> > Window: 24, 25, 26, 27,
> > Window: 28, 29, 30, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31,
> >
> > The reason is that the elements simply arrive out-of-order. Imagine what
> > would happen if the elements actually arrived with some delay from
> > different operations.
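A window assignment driven purely by the element timestamp does not suffer from this, because arrival order never enters the computation. A minimal sketch of tumbling-window assignment by timestamp (illustrative only, not the proof-of-concept code):

```java
// Maps a timestamp to its tumbling window, identified by the window's
// start. With windowSize = 4, timestamps 0-3 map to window 0, 4-7 to
// window 4, and so on, regardless of the order in which elements arrive.
public class TumblingWindows {
    public static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }
}
```

With this, the two copies of each integer in the example above always land in the same window, whatever path they took through the topology.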
> >
> > Now, on to the performance numbers. The proof-of-concept I created is
> > available here:
> > https://github.com/aljoscha/flink/tree/event-time-window-fn-mock. The basic
> > idea is that sources assign the current timestamp when emitting elements.
> > They also periodically emit watermarks that tell us that no elements with
> > an earlier timestamp will be emitted. The watermarks propagate through the
> > operators. The window operator looks at the timestamp of an element and
> > puts it into the buffer that corresponds to that window. When the window
> > operator receives a watermark, it will look at the in-flight windows
> > (basically the buffers) and emit those windows where the window-end is
> > before the watermark.
> >
> > For measuring throughput I did the following: The source emits tuples of
> > the form ("tuple", 1) in an infinite loop. The window operator sums up the
> > tuples, thereby counting how many tuples the window operator can handle in
> > a given time window. There are two different implementations for the
> > summation: 1) simply summing up the values in a mapWindow(), where you get
> > a List of all tuples and simply iterate over it; 2) using sum(1), which is
> > implemented as a reduce() (that uses the pre-reducer optimisations).
> >
> > These are the performance numbers (Current is the current implementation,
> > Next is my proof-of-concept):
> >
> > Tumbling (1 sec):
> >  - Current/Map: 1.6 mio
> >  - Current/Reduce: 2 mio
> >  - Next/Map: 2.2 mio
> >  - Next/Reduce: 4 mio
> >
> > Sliding (5 sec, slide 1 sec):
> >  - Current/Map: ca. 3 mio (fluctuates a lot)
> >  - Current/Reduce: no output
> >  - Next/Map: ca. 4 mio (fluctuates)
> >  - Next/Reduce: 10 mio
> >
> > The Next/Reduce variant can basically scale indefinitely with window size
> > because the internal state does not rely on the number of elements (it is
> > just the current sum).
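The buffer-plus-watermark scheme described in that quote can be modelled roughly like this. A simplified standalone sketch with invented names, not the code on the branch; the per-window state here is just the running sum, which is what makes the state size independent of the element count:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified event-time window operator: each element is added to the
// in-flight window its timestamp belongs to; a watermark t promises that
// no element with timestamp < t is still in flight, so every window
// whose end is <= t can be emitted.
public class EventTimeSumWindows {
    private final long windowSize;
    private final TreeMap<Long, Long> inFlight = new TreeMap<>(); // start -> sum
    private final List<long[]> emitted = new ArrayList<>();       // {start, sum}

    public EventTimeSumWindows(long windowSize) {
        this.windowSize = windowSize;
    }

    public void element(long timestamp, long value) {
        long start = timestamp - (timestamp % windowSize);
        inFlight.merge(start, value, Long::sum);
    }

    public void watermark(long t) {
        // Flush all windows that end at or before the watermark.
        while (!inFlight.isEmpty() && inFlight.firstKey() + windowSize <= t) {
            Map.Entry<Long, Long> w = inFlight.pollFirstEntry();
            emitted.add(new long[] { w.getKey(), w.getValue() });
        }
    }

    public List<long[]> emitted() {
        return emitted;
    }
}
```

Out-of-order elements are unproblematic here: they simply land in whichever in-flight window their timestamp selects, and emission is deferred until the watermark says the window is complete.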
> > The pre-reducer for sliding windows cannot handle
> > the amount of tuples; it produces no output. For the two Map variants the
> > performance fluctuates because they always keep all the elements in an
> > internal buffer before emission; this seems to tax the garbage collector a
> > bit and leads to random pauses.
> >
> > One thing that should be noted is that I had to disable the fake-element
> > emission thread, otherwise the Current versions would deadlock.
> >
> > So, I started working on this because I thought that out-of-order
> > processing would be necessary for correctness. And it certainly is, but the
> > proof-of-concept also shows that performance can be greatly improved.
> >
> > On Wed, 24 Jun 2015 at 09:46 Gyula Fóra wrote:
> > >
> > > I agree, let's separate these topics from each other so we can get
> > > faster resolution.
> > >
> > > There is already a state discussion in the thread we started with Paris.
> > >
> > > On Wed, Jun 24, 2015 at 9:24 AM Kostas Tzoumas wrote:
> > >
> > > > I agree with supporting out-of-order out of the box :-), even if this
> > > > means a major refactoring. This is the right time to refactor the
> > > > streaming API before we pull it out of beta. I think that this is more
> > > > important than new features in the streaming API, which can be
> > > > prioritized once the API is out of beta (meaning that IMO this is the
> > > > right time to stall PRs until we agree on the design).
> > > >
> > > > There are three sections in the document: windowing, state, and API.
> > > > How convoluted are those with each other? Can we separate the
> > > > discussion, or do we need to discuss those all together? I think part
> > > > of the difficulty is that we are discussing three design choices at
> > > > once.
> > > >
> > > > On Tue, Jun 23, 2015 at 7:43 PM, Ted Dunning wrote:
> > > >
> > > > > Out of order is ubiquitous in the real world.
> > > > > Typically, what happens is
> > > > > that businesses will declare a maximum allowable delay for delayed
> > > > > transactions and will commit to results when that delay is reached.
> > > > > Transactions that arrive later than this cutoff are collected
> > > > > specially as corrections, which are reported/used when possible.
> > > > >
> > > > > Clearly, ordering can also be violated during processing, but if the
> > > > > data is originally out of order the situation can't be repaired by
> > > > > any protocol fixes that prevent transactions from becoming
> > > > > disordered; it has to be handled at the data level.
> > > > >
> > > > > On Tue, Jun 23, 2015 at 9:29 AM, Aljoscha Krettek wrote:
> > > > >
> > > > > > I also don't like big changes, but sometimes they are necessary.
> > > > > > The reason why I'm so adamant about out-of-order processing is
> > > > > > that out-of-order elements are not some exception that occurs once
> > > > > > in a while; they occur constantly in a distributed system. For
> > > > > > example, in this:
> > > > > > https://gist.github.com/aljoscha/a367012646ab98208d27 the
> > > > > > resulting windows are completely bogus because the current
> > > > > > windowing system assumes elements to globally arrive in order,
> > > > > > which is simply not true. (The example has a source that generates
> > > > > > increasing integers. Then these pass through a map and are unioned
> > > > > > with the original DataStream before a window operator.) This
> > > > > > simulates elements arriving from different operators at a
> > > > > > windowing operator. The example is also DOP=1; I imagine this to
> > > > > > get worse with higher DOP.
> > > > > >
> > > > > > What do you mean by costly? As I said, I have a proof-of-concept
> > > > > > windowing operator that can handle out-of-order elements.
> > > > > > This is an example using
> > > > > > the current Flink API:
> > > > > > https://gist.github.com/aljoscha/f8dce0691732e344bbe8
> > > > > > (It is an infinite source of tuples and a 5-second window operator
> > > > > > that counts the tuples.) The first problem is that this code
> > > > > > deadlocks because of the thread that emits fake elements. If I
> > > > > > disable the fake-element code it works, but the throughput using
> > > > > > my mockup is 4 times higher. The gap widens dramatically if the
> > > > > > window size increases.
> > > > > >
> > > > > > So, it actually increases performance (unless I'm making a mistake
> > > > > > in my explorations) and can handle elements that arrive
> > > > > > out-of-order (which happens basically always in real-world
> > > > > > windowing use cases).
> > > > > >
> > > > > > On Tue, 23 Jun 2015 at 12:51 Stephan Ewen wrote:
> > > > > >
> > > > > > > What I like a lot about Aljoscha's proposed design is that we
> > > > > > > need no different code for "system time" vs. "event time". It
> > > > > > > only differs in where the timestamps are assigned.
> > > > > > >
> > > > > > > The OOP approach also gives you the semantics of total ordering
> > > > > > > without imposing merges on the streams.
> > > > > > >
> > > > > > > On Tue, Jun 23, 2015 at 10:43 AM, Matthias J. Sax
> > > > > > > <mjsax@informatik.hu-berlin.de> wrote:
> > > > > > >
> > > > > > > > I agree that there should be multiple alternatives the user(!)
> > > > > > > > can choose from. Partial out-of-order processing works for
> > > > > > > > many/most aggregates. However, if you consider
> > > > > > > > Event-Pattern-Matching, global ordering is necessary (even if
> > > > > > > > the performance penalty might be high).
> > > > > > > >
> > > > > > > > I would also keep "system-time windows" as an alternative to
> > > > > > > > "source-assigned ts-windows".
> > > > > > > >
> > > > > > > > It might also be interesting to consider the following paper
> > > > > > > > for overlapping windows: "Resource sharing in continuous
> > > > > > > > sliding-window aggregates"
> > > > > > > >
> > > > > > > > https://dl.acm.org/citation.cfm?id=1316720
> > > > > > > >
> > > > > > > > -Matthias
> > > > > > > >
> > > > > > > > On 06/23/2015 10:37 AM, Gyula Fóra wrote:
> > > > > > > > > Hey,
> > > > > > > > >
> > > > > > > > > I think we should not block PRs unnecessarily if your
> > > > > > > > > suggested changes might touch them at some point.
> > > > > > > > >
> > > > > > > > > Also, I still think we should not put everything in the
> > > > > > > > > DataStream because it will be a huge mess.
> > > > > > > > >
> > > > > > > > > Also, we need to agree on the out-of-order processing,
> > > > > > > > > whether we want it the way you proposed it (which is quite
> > > > > > > > > costly). Another alternative approach, which fits in the
> > > > > > > > > current windowing, is to filter out out-of-order events and
> > > > > > > > > apply a special handling operator on them. This would be
> > > > > > > > > fairly lightweight.
> > > > > > > > >
> > > > > > > > > My point is that we need to consider some alternative
> > > > > > > > > solutions. And we should not block contributions along the
> > > > > > > > > way.
> > > > > > > > >
> > > > > > > > > Cheers,
> > > > > > > > > Gyula
> > > > > > > > >
> > > > > > > > > On Tue, Jun 23, 2015 at 9:55 AM Aljoscha Krettek wrote:
> > > > > > > > >
> > > > > > > > > > The reason I posted this now is that we need to think
> > > > > > > > > > about the API and windowing before proceeding with the PRs
> > > > > > > > > > of Gabor (inverse reduce) and Gyula (removal of
> > > > > > > > > > "aggregate" functions on DataStream).
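The lightweight alternative Gyula mentions, filtering out out-of-order events so the main window path can assume order, can be sketched as follows. A minimal standalone illustration with invented names (not an actual Flink operator): the splitter tracks the highest timestamp seen and routes anything older to a side path for special handling.

```java
import java.util.ArrayList;
import java.util.List;

// Splits a timestamped stream into in-order elements and "late" ones,
// so the main windowing path can assume order and late elements get a
// special handling operator. Names invented for illustration.
public class OrderSplitter {
    private long maxSeen = Long.MIN_VALUE;
    public final List<Long> inOrder = new ArrayList<>();
    public final List<Long> late = new ArrayList<>();

    public void offer(long timestamp) {
        if (timestamp >= maxSeen) {
            maxSeen = timestamp;
            inOrder.add(timestamp);   // main path: timestamps non-decreasing
        } else {
            late.add(timestamp);      // out-of-order: route to special handling
        }
    }
}
```

The trade-off debated in the thread is visible even in this sketch: the main path stays cheap, but every late element drops out of the normal windowing results and needs separate correction logic.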
> > > > > > > > > >
> > > > > > > > > > For the windowing, I think that the current model does not
> > > > > > > > > > work for out-of-order processing. Therefore, the whole
> > > > > > > > > > windowing infrastructure will basically have to be redone,
> > > > > > > > > > meaning also that any work on the pre-aggregators or
> > > > > > > > > > optimizations that we do now becomes useless.
> > > > > > > > > >
> > > > > > > > > > For the API, I proposed to restructure the interactions
> > > > > > > > > > between all the different *DataStream classes and
> > > > > > > > > > grouping/windowing. (See the API section of the doc I
> > > > > > > > > > posted.)
> > > > > > > > > >
> > > > > > > > > > On Mon, 22 Jun 2015 at 21:56 Gyula Fóra
> > > > > > > > > > <gyula.fora@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Aljoscha,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the nice summary, this is a very good
> > > > > > > > > > > initiative.
> > > > > > > > > > >
> > > > > > > > > > > I added some comments to the respective sections (where
> > > > > > > > > > > I didn't fully agree :) ).
> > > > > > > > > > > At some point I think it would be good to have a public
> > > > > > > > > > > hangout session on this, which could make for a more
> > > > > > > > > > > dynamic discussion.
> > > > > > > > > > >
> > > > > > > > > > > Cheers,
> > > > > > > > > > > Gyula
> > > > > > > > > > >
> > > > > > > > > > > Aljoscha Krettek wrote (on Mon, 22 Jun 2015 at 21:34):
> > > > > > > > > > >
> > > > > > > > > > > > Hi,
> > > > > > > > > > > > with people proposing changes to the streaming part I
> > > > > > > > > > > > also wanted to throw my hat into the ring.
> > > > > > > > > > > > :D
> > > > > > > > > > > >
> > > > > > > > > > > > During the last few months, while I was getting
> > > > > > > > > > > > acquainted with the streaming system, I wrote down
> > > > > > > > > > > > some thoughts I had about how things could be
> > > > > > > > > > > > improved. Hopefully, they are in somewhat coherent
> > > > > > > > > > > > shape now, so please have a look if you are interested
> > > > > > > > > > > > in this:
> > > > > > > > > > > >
> > > > > > > > > > > > https://docs.google.com/document/d/1rSoHyhUhm2IE30o5tkR8GEetjFvMRMNxvsCfoPsW6_4/edit?usp=sharing
> > > > > > > > > > > >
> > > > > > > > > > > > This mostly covers:
> > > > > > > > > > > >  - Timestamps assigned at sources
> > > > > > > > > > > >  - Out-of-order processing of elements in window
> > > > > > > > > > > >    operators
> > > > > > > > > > > >  - API design
> > > > > > > > > > > >
> > > > > > > > > > > > Please let me know what you think. Comment in the
> > > > > > > > > > > > document or here on the mailing list.
> > > > > > > > > > > >
> > > > > > > > > > > > I have a PR in the making that would introduce source
> > > > > > > > > > > > timestamps and watermarks for keeping track of them. I
> > > > > > > > > > > > also hacked a proof-of-concept of a windowing system
> > > > > > > > > > > > that is able to process out-of-order elements using a
> > > > > > > > > > > > FlatMap operator. (It uses panes to perform efficient
> > > > > > > > > > > > pre-aggregations.)
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > > Aljoscha