flink-dev mailing list archives

From Zhangrucong <zhangruc...@huawei.com>
Subject re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
Date Thu, 13 Oct 2016 11:04:09 GMT
Hi Fabian:
        What is the strategy for new syntax that Calcite does not support? Will Calcite
support it? For example, the row window syntax.

Thank you very much!



-----Original Message-----
From: Fabian Hueske [mailto:fhueske@gmail.com]
Sent: October 13, 2016 18:17
To: dev@flink.apache.org
Cc: Sean Wang; Timo Walther
Subject: Re: Reply: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations

Hi Zhangrucong,

yes, we want to use Calcite's SQL parser including its window syntax, i.e.,

- the standard SQL OVER windows (in streaming with a few restrictions such as no different
partitionings or orders)
- the GroupBy window functions (TUMBLE, HOP, SESSION).

The GroupBy window functions are not implemented in Calcite yet. There is
CALCITE-1345 [1] to track the issue.

As Shaoxuan mentioned, we are not using the STREAM keyword to be SQL compliant.

Best, Fabian

[1] https://issues.apache.org/jira/browse/CALCITE-1345
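
As a rough illustration of the two window kinds listed above, here is a minimal sketch in plain Scala. It does not use any Flink or Calcite API; the Order record and the helpers tumble and overRange are hypothetical names chosen only for this example. It is meant to show the semantic difference, not an implementation: a GroupBy window emits one row per key and window, while an OVER window emits one result per input row.

```scala
// Plain-Scala sketch; no Flink or Calcite APIs are used.
// Order, tumble and overRange are hypothetical names for illustration only.
final case class Order(rowtime: Long, productId: String, units: Int)

object WindowKinds {
  // Group window, similar in spirit to
  //   GROUP BY TUMBLE(rowtime, size), productId
  // Every row falls into exactly one window, and each (window start, key)
  // group collapses into a single result row.
  def tumble(orders: Seq[Order], size: Long): Map[(Long, String), Int] =
    orders
      .groupBy(o => (o.rowtime - o.rowtime % size, o.productId))
      .map { case (key, rows) => key -> rows.map(_.units).sum }

  // OVER window, similar in spirit to
  //   SUM(units) OVER (PARTITION BY productId ORDER BY rowtime
  //                    RANGE <range> PRECEDING)
  // One result per input row, computed from the rows of the same partition
  // whose rowtime lies in [rowtime - range, rowtime].
  def overRange(orders: Seq[Order], range: Long): Seq[(Order, Int)] =
    orders.map { o =>
      val sum = orders
        .filter(p => p.productId == o.productId &&
                     p.rowtime <= o.rowtime &&
                     p.rowtime >= o.rowtime - range)
        .map(_.units)
        .sum
      (o, sum)
    }
}
```

With four input rows, tumble returns one entry per (window, productId) pair, whereas overRange always returns four results, one per input row. The second form is the one that needs a single partitioning and ordering to be evaluated in parallel.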

2016-10-13 12:05 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:

> Hi everybody,
>
> happy to see a good discussion here :-) I'll reply to Shaoxuan's mail
> first and comment on Zhangrucong's question in a separate mail.
>
> Shaoxuan, thanks for the suggestions! I think we all agree that for 
> SQL we should definitely follow the standard (batch) SQL syntax.
> In my opinion, the Table API does not necessarily have to be as close 
> as possible to SQL but should try to make a few things easier and also 
> safer (easier is of course subjective).
>
> - GroupBy without windows: These are currently intentionally not
> supported and also not part of FLIP-11. Our motivation for not
> supporting this is to guard the user from defining a query that fails
> when being executed due to a very memory-consuming operation. FLIP-11
> provides a way to define such a query as a sliding row window with
> unbounded preceding rows. With the upcoming SQL proposal, queries that
> consume unbounded memory should be identified and rejected. I would be
> in favor of allowing groupBy without windows once the guarding mechanisms are in place.
>
> - GroupBy with window: I think this is a question of taste. Having a
> window() call makes the feature more explicit in my opinion. However,
> I'm not opposed to moving the windows into the groupBy clause.
> Implementation-wise it should be easy to move the window definition
> into the groupBy clause for the Scala Table API. For the Java Table API
> we would need to extend the parser quite a bit because windows would
> need to be defined as Strings and not via objects.
>
> - RowWindows: The rowWindow() call mimics the standard SQL WINDOW
> clause (implemented by PostgreSQL and Calcite) which allows "reusable"
> window definitions. I think this is a desirable feature. In the
> FLIP-11 proposal the over() clause in select() refers to the
> predefined windows with aliases. In case only one window is defined,
> the over() clause is optional and the same (and only) window is
> applied to all aggregates. I think we can make the over() call
> mandatory to have the windowing more explicit. It should also be
> possible to extend the over clause to directly accept RowWindows
> instead of window aliases. I would not make this a priority at the
> moment, but a feature that could be added later, because
> rowWindow() and over() cover all cases. As with GroupBy with
> windows, we would need to extend the parser for the Java Table API though.
>
> Finally, I have a suggestion of my own:
> In FLIP-11, groupBy() is used to define the partitioning of
> RowWindows. I think this should be changed to partitionBy() because
> groupBy() groups data and applies an aggregation to all rows of a
> group, which is not happening here. In original SQL, the OVER clause
> features a PARTITION BY clause. We are moving this out of the window
> definition, i.e., the OVER clause, to enforce the same partitioning for
> all windows (different partitionings would be a challenge to execute in a parallel system).
>
> @Timo and all: What do you think about:
>
> - moving windows into the groupBy() call
> - making over() mandatory for rowWindow() with a single window definition
> - additionally allowing window definitions in over()
> - using partitionBy() instead of groupBy() for row windows?
>
> Best, Fabian
>
> 2016-10-13 11:10 GMT+02:00 Zhangrucong <zhangrucong@huawei.com>:
>
>> Hi shaoxuan:
>>
>> I think the StreamSQL must be executed in a table environment, so I
>> call this the Table API's StreamSQL. What do you call it: stream
>> Table API or StreamSQL? It is fu
>>
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> val tblEnv = TableEnvironment.getTableEnvironment(env)
>> val ds: DataStream[(String, Long, Long)] =
>>   env.readTextFile("/home/demo")
>>     .map(f => (f, 1L, 1L))
>> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
>> val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE userID='A'")
>>
>> So in my opinion, the grammar which is marked red should be
>> compatible with Calcite's StreamSQL grammar.
>>
>> By the way, thanks very much for telling me about the changes in
>> Flink StreamSQL. I will look at the new proposal.
>>
>> Thanks!
>> From: Sean Wang [mailto:wshaoxuan@gmail.com]
>> Sent: October 13, 2016 16:29
>> To: dev@flink.apache.org; Zhangrucong
>> Subject: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>>
>> Hi zhangrucong,
>> I am not sure what you mean by "table API'S StreamSQL". I guess you
>> mean "stream TableAPI"?
>> TableAPI should be compatible with Calcite SQL. (By compatible, my
>> understanding is that both TableAPI and SQL will be translated to the
>> same logical plan - the same set of REL and REX).
>> BTW, please note that we recently merged a change to remove the
>> STREAM keyword for Flink stream SQL (FLINK-4546). In our opinion,
>> batch and stream do not necessarily need to be differentiated at the SQL
>> level. The major difference between batch and stream is "WHEN and HOW to emit the
>> result".
>> We have been working on a new proposal with Fabian on this change. I
>> guess it will be sent out for review very soon.
>>
>> Regards,
>> Shaoxuan
>>
>>
>> On Thu, Oct 13, 2016 at 2:29 PM, Zhangrucong <zhangrucong@huawei.com> wrote:
>> Hi shaoxuan:
>> Is the Table API's StreamSQL grammar compatible with Calcite's
>> StreamSQL grammar?
>>
>>
>> 1. In Calcite, the tumble window is realized by using the function TUMBLE
>> or HOP. The function must be used with GROUP BY, like this:
>>
>> SELECT
>>   TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,
>>   productId,
>>   COUNT(*) AS c,
>>   SUM(units) AS units
>> FROM Orders
>> GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
>>   productId;
>>
>> 2. The sliding window uses the keywords "WINDOW" and "OVER", like this:
>>
>> SELECT  *
>> FROM (
>>   SELECT STREAM rowtime,
>>     productId,
>>     units,
>>     AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,
>>     AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7
>>   FROM Orders
>>   WINDOW product AS (
>>     ORDER BY rowtime
>>     PARTITION BY productId))
>>
>>
>>
>> Thanks!
>>
>> -----Original Message-----
>> From: 王绍翾 (大沙) [mailto:shaoxuan.wsx@alibaba-inc.com]
>> Sent: October 13, 2016 2:03
>> To: dev@flink.apache.org
>> Subject: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>>
>> Hi Fabian, Timo, and Jark.
>>
>> Thanks for kicking off this FLIP. This is a really great and promising
>> proposal. I have a few comments on the "window" operator proposed in this
>> FLIP (I am hoping it is not too late to bring this up).
>>
>> First, a window is not always needed for stream aggregation. There are
>> cases where we want to do an aggregation on a stream, while the query/emit
>> strategy decides when to emit a streaming output.
>>
>> Second, a window is needed when we want to do an aggregation over a certain
>> range, but a window is not an operator. We basically use a window to define
>> the range for the aggregation. In TableAPI, a window should be defined
>> together with the "groupby" and "select" operators, either inside a
>> "groupby" operator or after an "over" clause in the "select" operator. This
>> will make the TableAPI similar in manner to SQL. For instance:
>>
>> [A groupby without window]
>> <Table API>
>> val res = tab
>>   .groupBy('a)
>>   .select('a, 'b.sum)
>> <SQL>
>> SELECT a, SUM(b)
>> FROM tab
>> GROUP BY a
>>
>> [A tumble window inside groupby]
>> <Table API>
>> val res = tab
>>   .groupBy('a, tumble(10.minutes, 'rowtime))
>>   .select('a, 'b.sum)
>> <SQL>
>> SELECT a, SUM(b)
>> FROM tab
>> GROUP BY a, TUMBLE(10.minutes, 'rowtime)
>>
>> [A row tumble window after OVER]
>> <Table API>
>> .groupby('a) //optional
>> .select('a, 'b.count over rowTumble(10.minutes, 'rowtime))
>> <SQL>
>> SELECT a, COUNT(b) OVER ROWTUMBLE(10.minutes, 'rowtime)
>> FROM tab
>> GROUP BY a
>>
>> Please let me know what you think.
>> Regards, Shaoxuan
>> ------------------------------------------------------------------
>> From: Fabian Hueske <fhueske@gmail.com>
>> Sent: Monday, September 26, 2016 21:13
>> To: dev@flink.apache.org
>> Subject: Re: [DISCUSS] FLIP-11: Table API Stream Aggregations
>>
>> Hi everybody,
>>
>> Timo proposed our FLIP-11 a bit more than three weeks ago.
>> I will update the status of the FLIP to accepted.
>>
>> Thanks,
>> Fabian
>>
>> 2016-09-19 9:16 GMT+02:00 Timo Walther <twalthr@apache.org>:
>>
>> > Hi Jark,
>> >
>> > yes I think enough time has passed. We can start implementing the
>> > changes. What do you think Fabian?
>> >
>> > If there are no objections, I will create the subtasks in Jira today.
>> > For FLIP-11/1 I already have implemented a prototype, I just have to do
>> > some refactoring/documentation before opening a PR.
>> >
>> > Timo
>> >
>> >
>> > On 18/09/16 at 04:46, Jark Wu wrote:
>> >
>> > Hi all,
>> >>
>> >> It seems that there are no objections to the window design. So could
>> >> we open subtasks to start working on it now?
>> >>
>> >> - Jark Wu
>> >>
>> >> On September 7, 2016, at 4:29 PM, Jark Wu <wuchong.wc@alibaba-inc.com> wrote:
>> >>>
>> >>> Hi Fabian,
>> >>>
>> >>> Thanks for sharing your ideas.
>> >>>
>> >>> They all make sense to me. Regarding reassigning timestamps, I do
>> >>> not have a use case. I came up with this because DataStream has a
>> >>> TimestampAssigner :)
>> >>>
>> >>> +1 for this FLIP.
>> >>>
>> >>> - Jark Wu
>> >>>
>> >>> On September 7, 2016, at 2:59 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>> >>>>
>> >>>> Hi,
>> >>>>
>> >>>> thanks for your comments and questions!
>> >>>> Actually, you are bringing up the points that Timo and I discussed
>> >>>> the most when designing the FLIP ;-)
>> >>>>
>> >>>> - We also thought about the syntactic shortcut for running aggregates
>> >>>> like you proposed (table.groupBy('a).select(…)). Our motivation to not
>> >>>> allow this shortcut is to prevent users from accidentally performing a
>> >>>> "dangerous" operation. The problem with unbounded sliding row-windows
>> >>>> is that their state never expires. If you have an evolving key space,
>> >>>> you will likely run into problems at some point because the operator
>> >>>> state grows too large. IMO, a row-window session is a better approach,
>> >>>> because it defines a timeout after which state can be discarded.
>> >>>> groupBy.select is a very common operation in batch but its semantics
>> >>>> in streaming are very different. In my opinion it makes sense to make
>> >>>> users aware of these differences through the API.
>> >>>>
>> >>>> - Reassigning timestamps and watermarks is a very delicate issue. You
>> >>>> are right that Calcite exposes this field, which is necessary due to
>> >>>> the semantics of SQL. However, also in Calcite you cannot freely
>> >>>> choose the timestamp attribute for streaming queries (it must be a
>> >>>> monotone or quasi-monotone attribute), which is hard to reason about
>> >>>> (and guarantee) after a few operators have been applied. Streaming
>> >>>> tables in Flink will likely have a time attribute which is identical
>> >>>> to the initial rowtime.
>> >>>> However, Flink does modify timestamps internally, e.g., for records
>> >>>> that are emitted from time windows, in order to ensure that
>> >>>> consecutive windows perform as expected. Modifying or reassigning
>> >>>> timestamps in the middle of a job can lead to unexpected results
>> >>>> which are very hard to reason about. Do you have a concrete use case
>> >>>> in mind for reassigning timestamps?
>> >>>>
>> >>>> - The idea to represent rowtime and systime as objects is good. Our
>> >>>> motivation to go for reserved Scala symbols was to have a uniform
>> >>>> syntax with windows over streaming and batch tables. On batch tables
>> >>>> you can compute time windows basically over every time attribute
>> >>>> (they are treated similar to grouping attributes with a bit of extra
>> >>>> logic to extract the grouping key for sliding and session windows).
>> >>>> If you write window(Tumble over 10.minutes on 'rowtime) on a
>> >>>> streaming table, 'rowtime would indicate event-time. On a batch table
>> >>>> with a 'rowtime attribute, the same operator would be internally
>> >>>> converted into a group by. By going for the object approach we would
>> >>>> lose this compatibility (or would need to introduce an additional
>> >>>> column attribute to specify the window attribute for batch tables).
>> >>>>
>> >>>> As usual some of the design decisions are based on preferences.
>> >>>> Do they make sense to you? Let me know what you think.
>> >>>>
>> >>>> Best, Fabian
>> >>>>
>> >>>>
>> >>>> 2016-09-07 5:12 GMT+02:00 Jark Wu <wuchong.wc@alibaba-inc.com>:
>> >>>>
>> >>>> Hi all,
>> >>>>>
>> >>>>> I'm on vacation for about five days, sorry to have missed this
>> >>>>> great FLIP.
>> >>>>>
>> >>>>> Yes, the non-windowed aggregate is a special case of row-window, and
>> >>>>> the proposal looks really good. Can we have a simplified form for the
>> >>>>> special case? For example,
>> >>>>> table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(…)
>> >>>>> can be simplified to table.groupBy('a).select(…). The latter will
>> >>>>> actually call the former.
>> >>>>>
>> >>>>> Another question is about the rowtime. As the FLIP said, DataStream
>> >>>>> and StreamTableSource are responsible for assigning timestamps and
>> >>>>> watermarks; furthermore, "rowtime" and "systemtime" are not real
>> >>>>> columns. IMO, this is different from Calcite's rowtime, which is a
>> >>>>> real column in the table. With the FLIP's approach, we will lose some
>> >>>>> flexibility, because the timestamp column may be created after some
>> >>>>> transformations or join operations, not at the beginning. So why do
>> >>>>> we have to define rowtime at the beginning? (Because of watermarks?)
>> >>>>> Can we have a way to define rowtime after the source table, like
>> >>>>> TimestampAssigner?
>> >>>>>
>> >>>>> Regarding the "allowLateness" method: I came up with a trick where we
>> >>>>> can make 'rowtime and 'systemtime Scala objects, not symbol
>> >>>>> expressions. The API will look like this:
>> >>>>>
>> >>>>> window(Tumble over 10.minutes on rowtime allowLateness as 'w)
>> >>>>>
>> >>>>> The implementation will look like this:
>> >>>>>
>> >>>>> class TumblingWindow(size: Expression) extends Window {
>> >>>>>   def on(time: rowtime.type): TumblingEventTimeWindow =
>> >>>>>     new TumblingEventTimeWindow(alias, 'rowtime, size) // has allowLateness() method
>> >>>>>
>> >>>>>   def on(time: systemtime.type): TumblingProcessingTimeWindow =
>> >>>>>     new TumblingProcessingTimeWindow(alias, 'systemtime, size) // hasn't allowLateness() method
>> >>>>> }
>> >>>>> object rowtime
>> >>>>> object systemtime
>> >>>>>
>> >>>>> What do you think about this?
>> >>>>>
>> >>>>> - Jark Wu
>> >>>>>
>> >>>>> On September 6, 2016, at 11:00 PM, Timo Walther <twalthr@apache.org> wrote:
>> >>>>>>
>> >>>>>> Hi all,
>> >>>>>>
>> >>>>>> I thought about the API of the FLIP again. If we allow the
>> >>>>>> "systemtime" attribute, we cannot implement a nice method chaining
>> >>>>>> where the user can define an "allowLateness" only on event time. So
>> >>>>>> even if the user expressed that "systemtime" is used, we have to
>> >>>>>> offer an "allowLateness" method because we have to assume that this
>> >>>>>> attribute can also be the batch event time column, which is not
>> >>>>>> very nice.
>> >>>>>>
>> >>>>>
>> >>>>>> class TumblingWindow(size: Expression) extends Window {
>> >>>>>>   def on(timeField: Expression): TumblingEventTimeWindow =
>> >>>>>>     new TumblingEventTimeWindow(alias, timeField, size) // has allowLateness() method
>> >>>>>> }
>> >>>>>>
>> >>>>>> What do you think?
>> >>>>>>
>> >>>>>> Timo
>> >>>>>>
>> >>>>>>
>> >>>>>> On 05/09/16 at 10:41, Fabian Hueske wrote:
>> >>>>>>
>> >>>>>>> Hi Jark,
>> >>>>>>>
>> >>>>>>> you had asked for non-windowed aggregates in the Table API a few
>> >>>>>>> times. FLIP-11 proposes row-window aggregates which are a
>> >>>>>>> generalization of running aggregates (SlideRow unboundedPreceding).
>> >>>>>>>
>> >>>>>>> Can you have a look at the FLIP and give feedback whether this is
>> >>>>>>> what you are looking for?
>> >>>>>>> Improvement suggestions are very welcome as well.
>> >>>>>>>
>> >>>>>>> Thank you,
>> >>>>>>> Fabian
>> >>>>>>>
>> >>>>>>> 2016-09-01 16:12 GMT+02:00 Timo Walther <twalthr@apache.org>:
>> >>>>>>>
>> >>>>>>> Hi all!
>> >>>>>>>>
>> >>>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in the
>> >>>>>>>> Table API. You can find the FLIP-11 here:
>> >>>>>>>>
>> >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
>> >>>>>>>>
>> >>>>>>>> Motivation for the FLIP:
>> >>>>>>>>
>> >>>>>>>> The Table API is a declarative API to define queries on static
>> >>>>>>>> and streaming tables. So far, only projection, selection, and
>> >>>>>>>> union are supported operations on streaming tables.
>> >>>>>>>>
>> >>>>>>>> This FLIP proposes to add support for different types of
>> >>>>>>>> aggregations on top of streaming tables. In particular, we seek
>> >>>>>>>> to support:
>> >>>>>>>>
>> >>>>>>>> - Group-window aggregates, i.e., aggregates which are computed
>> >>>>>>>> for a group of elements. A (time or row-count) window is required
>> >>>>>>>> to bound the infinite input stream into a finite group.
>> >>>>>>>>
>> >>>>>>>> - Row-window aggregates, i.e., aggregates which are computed for
>> >>>>>>>> each row, based on a window (range) of preceding and succeeding
>> >>>>>>>> rows.
>> >>>>>>>>
>> >>>>>>>> Each type of aggregate shall be supported on keyed/grouped or
>> >>>>>>>> non-keyed/grouped data streams for streaming tables as well as
>> >>>>>>>> batch tables.
>> >>>>>>>>
>> >>>>>>>> We are looking forward to your feedback.
>> >>>>>>>>
>> >>>>>>>> Timo
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>> --
>> >>>>>> Freundliche Grüße / Kind Regards
>> >>>>>>
>> >>>>>> Timo Walther
>> >>>>>>
>> >>>>>> Follow me: @twalthr
>> >>>>>> https://www.linkedin.com/in/twalthr
>> >>>>>>
>> >>>>>
>> >>>>>
>> >>
>> >
>> > --
>> > Freundliche Grüße / Kind Regards
>> >
>> > Timo Walther
>> >
>> > Follow me: @twalthr
>> > https://www.linkedin.com/in/twalthr
>> >
>> >
>>
>>
>