From: Shaoxuan Wang
Date: Wed, 25 Jan 2017 16:19:13 +0800
Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows for streaming tables
To: dev@flink.apache.org

Hi Liuxinchun,

I am not sure where you got the impression that anyone has suggested "to
process Event time window in Sliding Row Window". If you were referring to my
post, there may be some misunderstanding. I think you are asking a question
similar to Hongyuhong's, and I have just replied to him. Please take a look
and let me know if that makes sense to you.

"Retraction" is an important building block for computing correct incremental
results in streaming. It is another big topic, and we should discuss it in
a separate thread.

Regards,
Shaoxuan

On Wed, Jan 25, 2017 at 3:44 PM, liuxinchun wrote:

> I don't think it is a good idea to process Event time window in Sliding
> Row Window. In a Sliding Time window, when an element is late, we can trigger
> the recalculation of the related windows. The sliding period is
> coarse-grained, so we only need to recalculate size/sliding number of windows.
> But in a Sliding Row Window, the calculation is triggered whenever an element
> arrives, so the sliding period becomes fine-grained. When an element is
> late, a great many "windows" are affected. Even if we store all the
> raw data, the computation is very large.
>
> I wonder whether it is possible to set a standard for the sliding Event Time
> Row Window: when certain elements are late, we recalculate only some of the
> windows and permit some error.
> For example, we could recalculate only the
> windows that end in the range (lateElement.timestamp - leftDelta,
> lateElement.timestamp] and the windows that begin in the range
> [lateElement.timestamp, lateElement.timestamp + rightDelta).
> ////////////////////////////////////////////////////////////
> //////////////////////////
> Hi everyone,
> Thanks for this great discussion, and glad to see more and more people are
> interested in stream SQL & tableAPI.
>
> IMO, the key problems for the OVER window design are the SQL semantics and the
> runtime design. I totally agree with Fabian that we should skip the design
> of TumbleRows and SessionRows windows for now, as they are not well defined
> in SQL semantics.
>
> Runtime design is the most crucial part, the one we are interested in and
> have volunteered to contribute to. We have thousands of machines running Flink
> streaming jobs. The costs in terms of CPU, memory, and state are the vital
> factors that we have to take into account. We have been working on the
> design of the OVER window over the past months, and are planning to send out a
> detailed design doc to DEV quite soon. But since Fabian started a good
> discussion on the OVER window, I would like to share our ideas/thoughts about
> the runtime design for the OVER window.
>
>    1. As SunJincheng pointed out earlier, the sliding window does not work for
>    unbounded preceding, so we need an alternative approach for the unbounded
>    OVER window.
>    2. Though the sliding window may work for some cases of bounded windows,
>    it is not very efficient and therefore should not be used in production.
>    To the best of my understanding, the current runtime implementation of the
>    sliding window has not leveraged the concept of state panes yet. This means
>    that if we use the sliding window for the OVER window, there will be a
>    backend state created for each group (partition by) and each row, and
>    whenever a new record arrives, it will be accumulated into all the existing
>    windows that have not been closed.
>    This would cause quite a lot of overhead in terms of both
>    CPU and memory & state.
>    3. Fabian has mentioned an approach of leveraging “ProcessFunction” and
>    a “sortedState”. I like this idea. The design details on this are not
>    quite clear yet, so I would like to add more thoughts on it. Regardless of
>    which DataStream API we are going to use (it is very likely that we need
>    a new API), we should come up with an optimal approach. The purpose of
>    grouping windows and OVER windows is to partition the data such that we
>    can generate the aggregate results. So when we talk about the design of
>    the OVER window, we have to think about the aggregates. As we proposed in
>    our recent UDAGG doc https://goo.gl/6ntclB, the user-defined accumulator
>    will be stored in the aggregate state. Besides the accumulator, we have
>    also introduced a retract API for UDAGG. With the aggregate accumulator
>    and the retract API, I am proposing a runtime approach to implement the
>    OVER window as follows.
>    4.
>       - We first implement a sorted state interface.
>       - For each group, we create just one sorted state. When a new record
>       arrives, it is inserted into this sorted state; in the meantime it
>       is accumulated into the aggregate accumulator.
>       - For the OVER window, we keep the aggregate accumulator for the entire
>       lifetime of the job. This differs from the case where we delete the
>       accumulator for each group/window when a grouping window is finished.
>       - When an OVER window is due to trigger, we grab the
>       previous accumulator from the state, accumulate onto it all
>       the records up to the upperBoundary of the current window, and
>       retract all the out-of-scope records up to its lowerBoundary. We emit
>       the aggregate result and save the accumulator for the next window.
>
>
> Hello Fabian,
> I would suggest that we first start working on the runtime design of the OVER
> window and the aggregates.
> Once we have a good design there, one can easily add
> the support for SQL as well as tableAPI. What do you think?
>
> Regards,
> Shaoxuan
>
> On Tue, Jan 24, 2017 at 10:42 PM, Fabian Hueske wrote:
>
> > Hi Radu,
> >
> > thanks for your comments!
> >
> > Yes, my intention is to open new JIRA issues to structure the
> > development process. Everybody is very welcome to pick up issues and
> > discuss the design proposals.
> > At the moment I see the following six issues to start with:
> >
> > - streaming SQL OVER ROW for processing time
> >   - bounded PRECEDING
> >   - unbounded PRECEDING
> >
> > - streaming SQL OVER RANGE for processing time
> >   - bounded PRECEDING
> >   - unbounded PRECEDING
> >
> > - streaming SQL OVER RANGE for event time
> >   - bounded PRECEDING
> >   - unbounded PRECEDING
> >
> > For each of these windows we need corresponding translation rules and
> > execution code.
> >
> > Subsequent JIRAs would be
> > - extending the Table API for supported SQL windows
> > - add support for FOLLOWING
> > - etc.
> >
> > Regarding the requirement for a sorted state: I am not sure if the
> > OVER windows should be implemented using Flink's DataStream window
> > framework. We need a good design document to figure out what the best
> > approach is. A ProcessFunction with a sorted state might be a good
> > solution as well.
> >
> > Best, Fabian
> >
> >
> > 2017-01-24 10:41 GMT+01:00 Radu Tudoran:
> >
> > > Hi all,
> > >
> > > Thanks for starting this discussion - it is very useful.
> > > It does indeed make sense to refactor all these and coordinate the
> > > efforts a bit, so that we do not end up with overlapping
> > > implementations and incompatible solutions.
> > >
> > > If you close the 3 JIRA issues you mentioned - do you plan to
> > > redesign them and open new ones? Do you need help from our side - we
> > > can also pick up the redesign of some of these new JIRA issues. For
> > > example, we already have
> > > an implementation for this and we can help with the design.
> > > Nevertheless, let's coordinate the effort.
> > >
> > > Regarding the support for the different types of windows - I think
> > > the best option is to split the implementation into small units. We
> > > can easily do this from the transformation rule class, and with this
> > > each particular type of window (session/sliding/sliderows/processing
> > > time/...) will have a clear implementation and a corresponding
> > > architecture within the JIRA issue. What
> > > do you think about such a granularity?
> > >
> > > Regarding the issue of "Q4: The implementation of SlideRows still
> > > needs a custom operator that collects records in a priority queue
> > > ordered by the "rowtime", which is similar to the design we
> > > discussed in FLINK-4697, right?"
> > > Why would you need this operator? The window buffer can act to some
> > > extent as a priority queue as long as the trigger and evictor are set
> > > to work based on the rowtime - or maybe I am missing something... Can
> > > you please clarify this?
> > >
> > >
> > > Dr. Radu Tudoran
> > > Senior Research Engineer - Big Data Expert IT R&D Division
> > >
> > >
> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > European Research Center
> > > Riesstrasse 25, 80992 München
> > >
> > > E-mail: radu.tudoran@huawei.com
> > > Mobile: +49 15209084330
> > > Telephone: +49 891588344173
> > >
> > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> > > Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
> > >
> > >
> > > -----Original Message-----
> > > From: Jark Wu [mailto:wuchong.wc@alibaba-inc.com]
> > > Sent: Tuesday, January 24, 2017 6:53 AM
> > > To: dev@flink.apache.org
> > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows
> > > for streaming tables
> > >
> > > Hi Fabian,
> > >
> > > Thanks for bringing up this discussion and the nice approach to avoid
> > > overlapping contributions.
> > >
> > > All of these make sense to me. But I have some questions.
> > >
> > > Q1: If I understand correctly, we will not support TumbleRows and
> > > SessionRows at the beginning, but may support them as syntactic sugar
> > > (in the Table API) once SlideRows is supported in the future. Right?
> > >
> > > Q2: How do we support SessionRows based on SlideRows? I don't get how to
> > > partition on "gap-separated".
> > >
> > > Q3: Should we break down the approach into smaller tasks for streaming
> > > tables and batch tables?
> > >
> > > Q4: The implementation of SlideRows still needs a custom operator that
> > > collects records in a priority queue ordered by the "rowtime", which is
> > > similar to the design we discussed in FLINK-4697, right?
> > >
> > > +1 to not supporting OVER ROW for event time at this point.
> > >
> > > Regards, Jark
> > >
> > >
> > > > On Jan 24, 2017, at 10:28 AM, Hongyuhong wrote:
> > > >
> > > > Hi,
> > > > We are also interested in streaming SQL and very willing to
> > > > participate and contribute.
> > > >
> > > > Our work is in progress, and we will also contribute to Calcite to
> > > > push forward the window and stream-join support.
> > > >
> > > >
> > > >
> > > > --------------
> > > > From: Fabian Hueske [mailto:fhueske@gmail.com]
> > > > Sent: January 24, 2017 5:55
> > > > To: dev@flink.apache.org
> > > > Subject: Re: [DISCUSS] Development of SQL OVER / Table API Row Windows
> > > > for streaming tables
> > > >
> > > > Hi Haohui,
> > > >
> > > > our plan was in fact to piggy-back on Calcite and use the TUMBLE
> > > > function [1] once it is available (CALCITE-1345 [2]).
> > > > Unfortunately, this issue does not seem to be very active, so I don't
> > > > know what the progress is.
> > > >
> > > > I would suggest moving the discussion about group windows to a
> > > > separate thread and keeping this one focused on the organization of
> > > > the SQL OVER windows.
> > > >
> > > > Best,
> > > > Fabian
> > > >
> > > > [1] http://calcite.apache.org/docs/stream.html
> > > > [2] https://issues.apache.org/jira/browse/CALCITE-1345
> > > >
> > > > 2017-01-23 22:42 GMT+01:00 Haohui Mai:
> > > >
> > > >> Hi Fabian,
> > > >>
> > > >> FLINK-4692 has added the support for tumbling window and we are
> > > >> excited to try it out and expose it as a SQL construct.
> > > >>
> > > >> Just curious -- what's your thought on the SQL syntax for tumbling
> > > >> windows?
> > > >>
> > > >> Implementation-wise, it might make sense to think of the tumbling
> > > >> window as a special case of the sliding window.
> > > >>
> > > >> The problem I see is that the OVER construct might be insufficient to
> > > >> support all the use cases of tumbling windows. For example, it fails
> > > >> to express tumbling windows that have fractional time units (as
> > > >> pointed out in http://calcite.apache.org/docs/stream.html).
> > > >>
> > > >> It looks to me that Calcite / Azure Stream Analytics have
> > > >> introduced a new construct (TUMBLE / TUMBLINGWINDOW) to address this
> > > >> issue.
> > > >>
> > > >> Do you think it is a good idea to follow the same conventions? Your
> > > >> ideas are appreciated.
> > > >>
> > > >> Regards,
> > > >> Haohui
> > > >>
> > > >>
> > > >> On Mon, Jan 23, 2017 at 1:02 PM Haohui Mai wrote:
> > > >>
> > > >>> +1
> > > >>>
> > > >>> We are also quite interested in these features and would love to
> > > >>> participate and contribute.
> > > >>>
> > > >>> ~Haohui
> > > >>>
> > > >>> On Mon, Jan 23, 2017 at 7:31 AM Fabian Hueske wrote:
> > > >>>
> > > >>>> Hi everybody,
> > > >>>>
> > > >>>> it seems that currently several contributors are working on new
> > > >>>> features for the streaming Table API / SQL around row windows (as
> > > >>>> defined in FLIP-11 [1]) and SQL OVER-style windows (FLINK-4678,
> > > >>>> FLINK-4679, FLINK-4680, FLINK-5584).
> > > >>>> Since these efforts overlap quite a bit, I spent some time thinking
> > > >>>> about how we can approach these features and how to avoid
> > > >>>> overlapping contributions.
> > > >>>>
> > > >>>> The challenge here is the following. Some of the Table API row
> > > >>>> windows as defined by FLIP-11 [1] are basically SQL OVER windows,
> > > >>>> while others cannot be easily expressed as such (TumbleRows for
> > > >>>> row-count intervals, SessionRows).
> > > >>>> However, since Calcite already supports SQL OVER windows, we can
> > > >>>> reuse the optimization logic for some of the Table API row windows.
> > > >>>> I also thought about the semantics of the TumbleRows and
> > > >>>> SessionRows windows as defined in FLIP-11 and came to the
> > > >>>> conclusion that these are not well defined in FLIP-11 and should
> > > >>>> rather be defined as SlideRows windows with a special PARTITION BY
> > > >>>> clause.
> > > >>>>
> > > >>>> I propose to approach SQL OVER windows and Table API row windows as
> > > >>>> follows:
> > > >>>>
> > > >>>> We start with three simple cases for SQL OVER windows (not Table
> > > >>>> API yet):
> > > >>>>
> > > >>>> * OVER RANGE for event time
> > > >>>> * OVER RANGE for processing time
> > > >>>> * OVER ROW for processing time
> > > >>>>
> > > >>>> All cases fulfill the following restrictions:
> > > >>>> - All aggregations in SELECT must refer to the same window.
> > > >>>> - PARTITION BY may not contain the rowtime attribute.
> > > >>>> - ORDER BY must be on the rowtime attribute (for event time) or on
> > > >>>> a marker function that indicates processing time. Additional sort
> > > >>>> attributes are not supported initially.
> > > >>>> - Only "BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW" and "BETWEEN x
> > > >>>> PRECEDING AND CURRENT ROW" are supported.
> > > >>>>
> > > >>>> OVER ROW for event time cannot be easily supported. With event
> > > >>>> time, we may have late records which need to be injected into the
> > > >>>> order of records.
> > > >>>> When a record is injected into the order at a position where a
> > > >>>> row-count window has already been computed, this and all following
> > > >>>> windows will change. We could either drop the record or send out
> > > >>>> many retraction records. I think it is best to not open this can of
> > > >>>> worms at this point.
> > > >>>>
> > > >>>> The rationale for all of the above restrictions is to have first
> > > >>>> versions of OVER windows soon.
> > > >>>> Once we have the above cases covered, we can extend and remove
> > > >>>> limitations as follows:
> > > >>>>
> > > >>>> - Table API SlideRow windows (with the same restrictions as above).
> > > >>>> This will be mostly API work since the execution part has been
> > > >>>> solved before.
> > > >>>> - Add support for FOLLOWING (except UNBOUNDED FOLLOWING)
> > > >>>> - Add support for different windows in SELECT. All windows must be
> > > >>>> partitioned and ordered in the same way.
> > > >>>> - Add support for additional ORDER BY attributes (besides time).
> > > >>>>
> > > >>>> As I said before, TumbleRows and SessionRows windows as in FLIP-11
> > > >>>> are not well defined, IMO.
> > > >>>> They can be expressed as SlideRows windows with special
> > > >>>> partitioning (partitioning on fixed, non-overlapping time ranges
> > > >>>> for TumbleRows, and gap-separated, non-overlapping time ranges for
> > > >>>> SessionRows). I would not start to work on those yet.
> > > >>>>
> > > >>>> I would like to close all related JIRA issues (FLINK-4678,
> > > >>>> FLINK-4679, FLINK-4680, FLINK-5584) and restructure the development
> > > >>>> of these features as outlined above with corresponding JIRA issues.
> > > >>>>
> > > >>>> What do others think? (I cc'ed the contributors assigned to the
> > > >>>> above JIRA issues)
> > > >>>>
> > > >>>> Best, Fabian
> > > >>>>
> > > >>>> [1]
> > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
> > > >>>>
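
For reference, the accumulate/retract approach described in point 4 of the quoted runtime proposal can be sketched roughly as follows. This is only a minimal illustration in plain Python, not Flink code: the names SumAccumulator, OverRangeState, insert and trigger are hypothetical, the window is assumed to be a bounded RANGE window [upper - range_size, upper] over a single group, and late rows are simply rejected rather than handled with retractions. Unlike the proposal, the sketch defers accumulation until the window triggers.

```python
import bisect


class SumAccumulator:
    """Hypothetical accumulator exposing accumulate/retract, as in the UDAGG idea."""

    def __init__(self):
        self.total = 0

    def accumulate(self, value):
        self.total += value

    def retract(self, value):
        self.total -= value


class OverRangeState:
    """Per-group state: rows kept sorted by timestamp plus one long-lived accumulator."""

    def __init__(self, range_size):
        self.range_size = range_size  # window covers [upper - range_size, upper]
        self.timestamps = []          # sorted timestamps (the "sorted state")
        self.values = []              # values aligned with self.timestamps
        self.acc = SumAccumulator()   # kept across windows, never reset
        self.lo = 0                   # index of the first row still inside the window
        self.hi = 0                   # index of the first row not yet accumulated

    def insert(self, ts, value):
        """Insert a row in timestamp order; rows behind the emitted boundary are rejected."""
        idx = bisect.bisect_right(self.timestamps, ts)
        if idx < self.hi:
            raise ValueError("late row: would change already emitted windows")
        self.timestamps.insert(idx, ts)
        self.values.insert(idx, value)

    def trigger(self, upper):
        """Emit the aggregate for the window ending at `upper`."""
        # Accumulate every not-yet-seen row up to the upper boundary.
        while self.hi < len(self.timestamps) and self.timestamps[self.hi] <= upper:
            self.acc.accumulate(self.values[self.hi])
            self.hi += 1
        # Retract every accumulated row that fell below the lower boundary.
        lower = upper - self.range_size
        while self.lo < self.hi and self.timestamps[self.lo] < lower:
            self.acc.retract(self.values[self.lo])
            self.lo += 1
        return self.acc.total
```

Each row is accumulated once and retracted once, so the cost per emitted window is proportional to the number of rows entering and leaving it rather than to the window size. The sketch never prunes retracted rows from the sorted lists; a real state backend would need to do so.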