From: Gyula Fóra
Date: Tue, 14 Jul 2015 10:40:09 +0000
Subject: Re: Design documents for consolidated DataStream API
To: "dev@flink.apache.org"

I see your point, reduceByKey is much clearer. The question is whether we
want to introduce this inconsistency across the two api-s or stick with what
we have.

On Tue, Jul 14, 2015 at 10:57 AM Aljoscha Krettek wrote:

> I agree, the groupBy, in the batch API is misleading, since a
> ds.groupBy().reduce() does not really build any groups, it is really a
> ds.keyBy().reduceByKey(). In the streaming API we can still fix this, IMHO.
>
> On Tue, 14 Jul 2015 at 10:56 Stephan Ewen wrote:
>
> > It is not a bit different than the batch API, because streaming semantics
> > are a bit different ;-)
> >
> > One good thing is that we can make things better that were sub-optimal in
> > the Batch API.
> >
> > On Tue, Jul 14, 2015 at 10:55 AM, Stephan Ewen wrote:
> >
> > > keyBy() does not do any grouping.
> > > Grouping in streams is not defined without windows.
> > >
> > > On Tue, Jul 14, 2015 at 10:48 AM, Gyula Fóra wrote:
> > >
> > > > If we only want to have either keyBy or groupBy, why not keep groupBy?
> > > > That would be more consistent with the batch api.
> > > >
> > > > On Tue, Jul 14, 2015 at 10:35 AM Stephan Ewen wrote:
> > > >
> > > > > Concerning your comments:
> > > > >
> > > > > 1) In the new design, there is no grouping without windowing. The
> > > > > KeyedDataStream subsumes the grouping and key-ing for partitioned state.
> > > > >
> > > > >   - keyBy() + window() makes a parallel grouped window.
> > > > >   - keyBy() alone allows access to partitioned state.
> > > > >
> > > > > My thought was that this is simpler, because it does not need both
> > > > > groupBy() and keyBy(), but one construct to handle both cases.
> > > > >
> > > > > 2) The discretization is a rough thought and is nothing for the short
> > > > > term. It totally needs more thought. I put it there to have it as a
> > > > > sketch for how to evolve this.
> > > > >
> > > > > The idea is of course to not have a single data set, but a series of
> > > > > data sets. In each discrete time slice, the data set can be treated
> > > > > like a regular data set.
> > > > >
> > > > > Let's kick off a separate design for the discretization. Joins are
> > > > > good to talk about (data sets can be joined with data sets), and I am
> > > > > sure there are more questions coming up.
> > > > >
> > > > > Does that make sense?
> > > > >
> > > > > On Tue, Jul 14, 2015 at 10:05 AM, Gyula Fóra wrote:
> > > > >
> > > > > > I think Marton has some good points here.
> > > > > >
> > > > > > 1) Is KeyedDataStream a better name if this is only a renaming?
> > > > > >
> > > > > > 2) The discretize semantics is unclear indeed. Are we operating on a
> > > > > > single dataset or a sequence of datasets?
> > > > > > If the latter, why not call it something else (dstream)? How are
> > > > > > joins and other binary operators defined for different
> > > > > > discretizations, etc.?
> > > > > >
> > > > > > On Mon, Jul 13, 2015 at 7:37 PM Márton Balassi <mbalassi@apache.org>
> > > > > > wrote:
> > > > > >
> > > > > > > Generally I agree with the new design. Two concerns:
> > > > > > >
> > > > > > > 1) Does KeyedDataStream replace GroupedDataStream or is the latter a
> > > > > > > special case of the former?
> > > > > > >
> > > > > > > The KeyedDataStream as described in the design document is a bit
> > > > > > > unclear for me. It lists the following usages:
> > > > > > >   a) It is the first step in building a window stream, on top of
> > > > > > >      which the grouped/windowed aggregation and reduce-style function
> > > > > > >      can be applied.
> > > > > > >   b) It allows using the "by-key" state of functions. Here, every
> > > > > > >      record has access to a state that is scoped by its key.
> > > > > > >      Key-scoped state can be automatically redistributed and
> > > > > > >      repartitioned.
> > > > > > >
> > > > > > > The code snippet describes a use case where the computation and the
> > > > > > > access of the state is used the way the GroupedDataStream currently
> > > > > > > should work. I suppose this is the example for case b). Would case a)
> > > > > > > also window elements by key? If yes, then this is practically a
> > > > > > > renaming and enhancement of the GroupedDataStream functionality with
> > > > > > > keyed state. Then the
> > > > > > > StreamExecutionEnvironment.createKeyedStream(Partitioner, KeySelector)
> > > > > > > construction does not make much sense, as the user only operates
> > > > > > > within the scope of the keyselector and not the partitioner anyway.
> > > > > > >
> > > > > > > I personally think KeyedDataStream as a name does not necessarily
> > > > > > > suggest that the records are grouped by key, it only suggests
> > > > > > > partitioning by key - at least for me. :)
> > > > > > >
> > > > > > > 2) The API for discretization is not convenient IMHO
> > > > > > >
> > > > > > > The discretization part declares that the output of
> > > > > > > DataStream.discretize() is a sequence of DataSets. I love this
> > > > > > > approach, but then in the code snippet the return value of this
> > > > > > > function is simply a DataSet and is used as such. The take-home
> > > > > > > message of that code is the following: this is actually the way you
> > > > > > > would like to program on these sequences of DataSets, most probably
> > > > > > > you would like to do the same with each of them. If that is the case
> > > > > > > we should provide a nice utility for that. I think Spark Streaming's
> > > > > > > DStream.foreachRDD() is fairly useful for this purpose.
> > > > > > >
> > > > > > > On Mon, Jul 13, 2015 at 6:30 PM, Gyula Fóra <gyula.fora@gmail.com>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > +1
> > > > > > > >
> > > > > > > > On Mon, Jul 13, 2015 at 6:23 PM Stephan Ewen <sewen@apache.org>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > If naming is the only concern, then we should go ahead, because
> > > > > > > > > we can change names easily (before the release).
> > > > > > > > >
> > > > > > > > > In fact, I don't think it leaves a bad impression. Global windows
> > > > > > > > > are non-parallel windows. There are also parallel windows. Pick
> > > > > > > > > what you need and what works.
> > > > > > > > >
> > > > > > > > > On Mon, Jul 13, 2015 at 6:13 PM, Gyula Fóra <gyula.fora@gmail.com>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > I think we agree on everything, it's more of a naming issue :)
> > > > > > > > > >
> > > > > > > > > > I thought it might be misleading that global time windows are
> > > > > > > > > > "non-parallel" windows. We don't want to give a bad impression.
> > > > > > > > > > (Also we don't want them to think that every global window is
> > > > > > > > > > parallel but that's not a problem here)
> > > > > > > > > >
> > > > > > > > > > Gyula
> > > > > > > > > >
> > > > > > > > > > On Mon, Jul 13, 2015 at 5:22 PM Stephan Ewen <sewen@apache.org>
> > > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Okay, what is missing about the windowing in your opinion?
> > > > > > > > > > >
> > > > > > > > > > > The core points of the document are:
> > > > > > > > > > >
> > > > > > > > > > >   - The parallel windows are per group only.
> > > > > > > > > > >
> > > > > > > > > > >   - The implementation of the parallel windows holds window
> > > > > > > > > > >     data in the group buffers.
> > > > > > > > > > >
> > > > > > > > > > >   - The global windows are non-parallel. May have parallel
> > > > > > > > > > >     pre-aggregation, if they are time windows.
> > > > > > > > > > >
> > > > > > > > > > >   - Time may be operator time (timer thread), or watermark
> > > > > > > > > > >     time. Watermark time can refer to ingress or event time.
> > > > > > > > > > >
> > > > > > > > > > >   - Windows that do not pre-aggregate may require elements in
> > > > > > > > > > >     order. Not part of the first prototype.
> > > > > > > > > > >
> > > > > > > > > > > Do we agree on those points?
> > > > > > > > > > >
> > > > > > > > > > > On Mon, Jul 13, 2015 at 4:50 PM, Gyula Fóra <gyula.fora@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > In general I like it, although the main difference between the
> > > > > > > > > > > > current and the new one is the windowing and that is still not
> > > > > > > > > > > > very clear.
> > > > > > > > > > > >
> > > > > > > > > > > > Where do we have the full stream time windows, for instance?
> > > > > > > > > > > > (which is parallel but not keyed)
> > > > > > > > > > > >
> > > > > > > > > > > > On Mon, Jul 13, 2015 at 4:28 PM Aljoscha Krettek
> > > > > > > > > > > > <aljoscha@apache.org> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > +1 I like it as well.
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Mon, 13 Jul 2015 at 16:17 Kostas Tzoumas
> > > > > > > > > > > > > <ktzoumas@apache.org> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > +1 from my side
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Mon, Jul 13, 2015 at 4:15 PM, Stephan Ewen
> > > > > > > > > > > > > > <sewen@apache.org> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Do we have consensus on these designs?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > If we have, we should get to implementing this soon,
> > > > > > > > > > > > > > > because basically all streaming patches will have to be
> > > > > > > > > > > > > > > revisited in light of this...
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Tue, Jul 7, 2015 at 3:41 PM, Gyula Fóra
> > > > > > > > > > > > > > > <gyula.fora@gmail.com> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > You are right, that's an important issue.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > And I think we should also do some renaming with the
> > > > > > > > > > > > > > > > "iterations" because they are not really iterations like
> > > > > > > > > > > > > > > > in the batch case and it might confuse some users.
> > > > > > > > > > > > > > > > Maybe we can call them loops or cycles and rename the api
> > > > > > > > > > > > > > > > calls to make it more intuitive what happens. It is
> > > > > > > > > > > > > > > > really just a cyclic dataflow.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Aljoscha Krettek wrote (on Tue, Jul 7, 2015, 15:35):
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi,
> > > > > > > > > > > > > > > > > I just noticed that we don't have anything about how
> > > > > > > > > > > > > > > > > iterations and timestamps/watermarks should interact.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > > > Aljoscha
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Mon, 6 Jul 2015 at 23:56 Stephan Ewen
> > > > > > > > > > > > > > > > > <sewen@apache.org> wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi all!
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > As many of you know, there are ongoing efforts to
> > > > > > > > > > > > > > > > > > consolidate the streaming API for the next release,
> > > > > > > > > > > > > > > > > > and then graduate it (from beta status).
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > In the process of this consolidation, we want to
> > > > > > > > > > > > > > > > > > achieve the following goals.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >  - Make the code more robust and simplify it in parts
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >  - Clearly define the semantics of the constructs.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >  - Prepare it for support of more advanced concepts,
> > > > > > > > > > > > > > > > > >    like partitionable state, and event time.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >  - Cut support for certain corner cases that were
> > > > > > > > > > > > > > > > > >    prototyped, but turned out to be not efficiently
> > > > > > > > > > > > > > > > > >    doable
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Based on prior discussions on the mailing list,
> > > > > > > > > > > > > > > > > > Aljoscha and I drafted the design documents below,
> > > > > > > > > > > > > > > > > > which outline how the consolidated API would look.
> > > > > > > > > > > > > > > > > > We focused on constructs, time, and window semantics.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Design document on how to restructure the Streaming
> > > > > > > > > > > > > > > > > > API:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/Streams+and+Operations+on+Streams
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Design document on definitions of time, order, and
> > > > > > > > > > > > > > > > > > the resulting semantics:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/Time+and+Order+in+Streams
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Note: The design of the interfaces and concepts for
> > > > > > > > > > > > > > > > > > advanced state in functions is not in here. That is
> > > > > > > > > > > > > > > > > > part of a separate design discussion and orthogonal
> > > > > > > > > > > > > > > > > > to the designs drafted here.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Please have a look and voice questions and concerns.
> > > > > > > > > > > > > > > > > > Since we should not break the streaming API more than
> > > > > > > > > > > > > > > > > > once, we should make sure this consolidation brings
> > > > > > > > > > > > > > > > > > it into the shape we want it to be in.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Greetings,
> > > > > > > > > > > > > > > > > > Stephan
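
The distinction drawn in this thread (keyBy() alone gives key-scoped state
without building groups; a group only exists together with a window) can be
sketched in plain Python. This is an illustrative model only, not Flink code:
keyed_rolling_reduce and keyed_count_window are hypothetical helper names, and
the count-based window is an assumption chosen for brevity.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")


def keyed_rolling_reduce(
    stream: Iterable[T],
    key: Callable[[T], Hashable],
    reduce_fn: Callable[[T, T], T],
) -> Iterator[T]:
    """Model of keyBy() followed by a reduce: one running value per key.

    No group is ever materialized. Each element updates the state scoped
    to its key, and the updated value is emitted immediately.
    """
    state: Dict[Hashable, T] = {}
    for element in stream:
        k = key(element)
        state[k] = element if k not in state else reduce_fn(state[k], element)
        yield state[k]


def keyed_count_window(
    stream: Iterable[T],
    key: Callable[[T], Hashable],
    size: int,
) -> Iterator[Tuple[Hashable, List[T]]]:
    """Model of keyBy() + window(): a group is emitted only when a window closes."""
    buffers: Dict[Hashable, List[T]] = defaultdict(list)
    for element in stream:
        k = key(element)
        buffers[k].append(element)
        if len(buffers[k]) == size:
            # The window for this key is full: emit it and clear its buffer.
            yield k, buffers.pop(k)


events = [("a", 1), ("b", 10), ("a", 2), ("b", 20), ("a", 3)]
by_name = lambda e: e[0]
add = lambda x, y: (x[0], x[1] + y[1])

rolling = list(keyed_rolling_reduce(events, by_name, add))
windows = list(keyed_count_window(events, by_name, size=2))
```

With these five input events, the rolling variant emits five results (one per
input), while the windowed variant emits two complete groups and leaves the
last ("a", 3) buffered until its window fills.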