flink-dev mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Reply: RE: [DISCUSS] FLIP-11: Table API Stream Aggregations
Date Fri, 14 Oct 2016 23:02:06 GMT
Thanks for your reply, Shaoxuan!

Please see my replies below.

Best, Fabian

2016-10-14 11:34 GMT+02:00 Sean Wang <wshaoxuan@gmail.com>:

> Thanks for your quick reply, Fabian.
>
> I have a few minor comments and suggestions:
>
> <GroupBy without window>
> - Agree that we should consider GroupBy without window after the new SQL
> proposal has settled.
>
>
OK, so we keep this as it is for now. GroupBy without windows will be
supported later when we are able to "guard" the memory requirements of that
operation.
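To make the memory concern concrete, here is a minimal plain-Scala simulation (not Flink code; `UnboundedGroupBy` and `run` are hypothetical names). A non-windowed `groupBy('a).select('a, 'b.sum)` on a stream has to keep one running sum per distinct key indefinitely and emit an updated result for every input row, so with an evolving key space the state only ever grows.

```scala
import scala.collection.mutable

object UnboundedGroupBy {
  // Running SUM(b) per key, as a non-windowed groupBy('a).select('a, 'b.sum) would
  // have to maintain it on a stream. Returns the (key, newSum) update emitted for
  // every input row and the number of keys kept in state at the end.
  def run(input: Seq[(String, Long)]): (Seq[(String, Long)], Int) = {
    val state = mutable.Map.empty[String, Long] // never pruned: no window, no timeout
    val updates = input.toVector.map { case (k, b) =>
      val newSum = state.getOrElse(k, 0L) + b
      state.update(k, newSum)
      (k, newSum) // every row produces an updated result for its key
    }
    (updates, state.size)
  }
}
```

Every new key adds an entry that is never removed, which is exactly what a "guard" for this operation would have to detect or bound.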


> <GroupBy with window>
> - For the Java API, we can keep the window() call and put the window alias
> into the groupBy clause. This can also be applied to the rowWindow case.
>
>
Referring to the window alias in the groupBy clause would require inverting
the order of the method calls, i.e., groupBy().window() -> window().groupBy().
I am not sure if that is more intuitive. Also, Scala and Java use the same
class (Table) but different methods (Java uses String parameters, Scala uses
Expression parameters). In my opinion, it makes sense to keep both APIs
closely in sync. So I would either keep window() (after groupBy) for Scala and
Java or remove it for both.


> <RowWindows> & <partitionBy() for rowWindow>
> - +1 to replacing groupBy() with partitionBy(). BTW, in the case of
> over, apart from partitionBy, are we going to support orderBy? If yes, I
> would suggest defining rowWindow as rowWindow(PartitionByParaType,
> OrderByParaType, WindowParaType).
>
>
The current FLIP-11 proposal supports defining both partitionBy and orderBy
(with a few restrictions).
PartitionBy is defined for all windows alike by calling

table.partitionBy(...).rowWindow(Window1 as w1, Window2 as w2).select(count() over w1).

Allowing windows with different partitioning would mean that data is
shuffled to different nodes and that we need a distributed join to assemble
the result rows. In principle this could be done but would be very
expensive to execute (applies to batch and streaming). In my opinion, we
should not support this case.
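The benefit of a single partitioning can be sketched without Flink (plain Scala collections; all names are hypothetical). Rows are partitioned once by key and ordered on time, and two count-based row windows (w1: current row plus 1 preceding row, w2: current row plus 3 preceding rows) are evaluated per row in the same pass, so no join is needed to assemble the result row.

```scala
object SinglePartitionRowWindows {
  // Hypothetical input row: partition key, event time and a value to aggregate
  final case class Row(key: String, time: Long, value: Int)

  // One result row per input row, carrying the aggregates of two row windows
  final case class Out(key: String, time: Long, sumW1: Int, sumW2: Int)

  // SUM over the current row and up to `preceding` earlier rows of its partition
  private def sumPreceding(rows: Vector[Row], idx: Int, preceding: Int): Int =
    rows.slice(math.max(0, idx - preceding), idx + 1).map(_.value).sum

  // w1 = current row + 1 preceding row, w2 = current row + 3 preceding rows
  def evaluate(input: Seq[Row]): Seq[Out] =
    input
      .groupBy(_.key) // the single partitioning shared by all windows
      .toSeq
      .sortBy(_._1)   // deterministic output order for the example
      .flatMap { case (_, part) =>
        val ordered = part.sortBy(_.time).toVector // order each partition on time
        ordered.indices.map { i =>
          val r = ordered(i)
          // both aggregates come from the same local partition, so no join is needed
          Out(r.key, r.time, sumPreceding(ordered, i, 1), sumPreceding(ordered, i, 3))
        }
      }
}
```

With a different partitioning per window, each window would be computed on differently shuffled data and the two sums would have to be joined back per row.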

OrderBy is implicitly supported by the on() clause of RowWindows, e.g.,

rowWindow(TumbleRows over 10.minutes on 'rowtime as 'w)

says that the window is ordered on the rowtime, i.e., event-time,
attribute. For streaming we can only allow event-time order (or
processing-time order which is always given). Orders on other attributes
would not be possible (for infinite dynamic input tables) or very expensive
(memory and computation wise) to maintain (for finite dynamic input
tables). For queries on batch tables, in principle all orders are possible.
With the current proposal, count windows are supported on attributes of
arbitrary type, while time windows are supported only on timestamp attributes.

> So:
> - moving windows into the groupBy() call: +1
>

+1


> - making over() mandatory for rowWindow() with a single window definition.
>

+1


> - additionally allowing window definitions in over(): +1 for Scala,
> but use aliases for the Java API.
>

If we have the parser code for Java group windows in groupBy(), it should be
easy to adapt it for over(). But we should also keep the rowWindow() method
to define aliases.


> - using partitionBy() instead of groupBy() for row windows?: +1, but we
> should also consider orderBy; it may be even better to move partitionBy()
> into rowWindow.
>
>
+1 to change groupBy() to partitionBy().
I would not move partitionBy() into the RowWindow definition but keep it
outside to ensure only one partitioning is defined. The orderBy definition
is already fluently included in the RowWindow via the on() method.



> Regards,
> Shaoxuan
>
>
> On Thu, Oct 13, 2016 at 6:05 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Hi everybody,
>>
>> happy to see a good discussion here :-)
>> I'll reply to Shaoxuan's mail first and comment on Zhangrucong's question
>> in a separate mail.
>>
>> Shaoxuan, thanks for the suggestions! I think we all agree that for SQL
>> we should definitely follow the standard (batch) SQL syntax.
>> In my opinion, the Table API does not necessarily have to be as close as
>> possible to SQL but should try to make a few things easier and also safer
>> (easier is of course subjective).
>>
>> - GroupBy without windows: These are currently intentionally not
>> supported and also not part of FLIP-11. Our motivation for not supporting
>> this is to guard the user from defining a query that fails when being
>> executed due to a very memory-consuming operation. FLIP-11 provides a way
>> to define such a query as a sliding row window with unbounded preceding
>> rows. With the upcoming SQL proposal, queries that consume unbounded memory
>> should be identified and rejected. I would be in favor of allowing groupBy
>> without windows once the guarding mechanisms are in place.
>>
>> - GroupBy with window: I think this is a question of taste. Having a
>> window() call makes the feature more explicit in my opinion. However, I'm
>> not opposed to moving the windows into the groupBy clause.
>> Implementation-wise it should be easy to move the window definition into the
>> groupBy clause for the Scala Table API. For the Java Table API we would
>> need to extend the parser quite a bit because windows would need to be
>> defined as Strings and not via objects.
>>
>> - RowWindows: The rowWindow() call mimics the standard SQL WINDOW clause
>> (implemented by PostgreSQL and Calcite), which allows "reusable" window
>> definitions. I think this is a desirable feature. In the FLIP-11
>> proposal, the over() clause in select() refers to the predefined windows
>> by their aliases. In case only one window is defined, the over() clause is
>> optional and the same (and only) window is applied to all aggregates. I
>> think we can make the over() call mandatory to make the windowing more
>> explicit. It should also be possible to extend the over clause to directly
>> accept RowWindows instead of window aliases. I would not make this a
>> priority at the moment but rather a feature that could be added later,
>> because rowWindow() and over() cover all cases. As for GroupBy with
>> windows, we would need to extend the parser for the Java Table API, though.
>>
>> Finally, I have a suggestion of my own:
>> In FLIP-11, groupBy() is used to define the partitioning of RowWindows.
>> I think this should be changed to partitionBy() because groupBy() groups
>> data and applies an aggregation to all rows of a group, which is not
>> happening here. In standard SQL, the OVER clause features a PARTITION BY
>> clause. We are moving this out of the window definition, i.e., the OVER
>> clause, to enforce the same partitioning for all windows (different
>> partitionings would be a challenge to execute in a parallel system).
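The semantic difference behind the renaming can be shown with a small plain-Scala sketch (hypothetical names, not Table API code). groupBy collapses each group into one result row, while a partitioned window keeps every input row and annotates it with an aggregate of its partition. To keep the contrast minimal, the sketch uses a whole-partition SUM; FLIP-11 row windows would additionally restrict each aggregate to a range of neighboring rows.

```scala
object GroupByVsPartitionBy {
  type Row = (String, Int) // (key, value)

  // groupBy('k).select('k, 'v.sum): rows are collapsed, one result per group
  def groupBySum(rows: Seq[Row]): Map[String, Int] =
    rows.groupBy(_._1).map { case (k, group) => k -> group.map(_._2).sum }

  // SUM(v) OVER (PARTITION BY k): every input row is kept and annotated with the
  // aggregate of its partition; the number of output rows equals the input rows
  def partitionBySum(rows: Seq[Row]): Seq[(String, Int, Int)] = {
    val totals = groupBySum(rows)
    rows.map { case (k, v) => (k, v, totals(k)) }
  }
}
```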
>>
>> @Timo and all: What do you think about:
>>
>> - moving windows into the groupBy() call
>> - making over() mandatory for rowWindow() with a single window definition
>> - additionally allowing window definitions in over()
>> - using partitionBy() instead of groupBy() for row windows?
>>
>> Best, Fabian
>>
>> 2016-10-13 11:10 GMT+02:00 Zhangrucong <zhangrucong@huawei.com>:
>>
>>> Hi Shaoxuan,
>>>
>>> I think StreamSQL must be executed in a table environment, so I call
>>> this the Table API's StreamSQL. What do you call it, stream Table API or
>>> StreamSQL? It is fu
>>>
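>>> import org.apache.flink.streaming.api.scala._ // plus the Table API imports of the Flink version in use
>>>
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>> val tblEnv = TableEnvironment.getTableEnvironment(env)
>>> // map each text line to a (userID, count, num) tuple before registering the stream
>>> val ds: DataStream[(String, Long, Long)] =
>>>   env.readTextFile("/home/demo").map(f => (f, 1L, 1L))
>>> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
>>> val sql = tblEnv.sql("SELECT STREAM * FROM Order WHERE userID = 'A'")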
>>>
>>> So in my opinion, the grammar which is marked red should be compatible
>>> with Calcite's StreamSQL grammar.
>>>
>>> By the way, thanks very much for telling me about the changes to
>>> Flink StreamSQL. I will look at the new proposal.
>>>
>>> Thanks!
>>> From: Sean Wang [mailto:wshaoxuan@gmail.com]
>>> Sent: October 13, 2016 16:29
>>> To: dev@flink.apache.org; Zhangrucong
>>> Subject: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>>>
>>> Hi Zhangrucong,
>>> I am not sure what you mean by "table API's StreamSQL"; I guess you mean
>>> "stream Table API"?
>>> The Table API should be compatible with Calcite SQL. (By compatible, my
>>> understanding is that both the Table API and SQL will be translated to the
>>> same logical plan, i.e., the same set of REL and REX nodes.)
>>> BTW, please note that we have recently merged a change that removes the
>>> STREAM keyword from Flink stream SQL (FLINK-4546). In our opinion, batch and
>>> stream do not necessarily need to be differentiated at the SQL level. The
>>> major difference between batch and stream is "WHEN and HOW to emit the
>>> result". We have been working on a new proposal with Fabian on this change.
>>> I guess it will be sent out for review very soon.
>>>
>>> Regards,
>>> Shaoxuan
>>>
>>>
>>> On Thu, Oct 13, 2016 at 2:29 PM, Zhangrucong <zhangrucong@huawei.com> wrote:
>>> Hi Shaoxuan,
>>> Is the Table API's StreamSQL grammar compatible with Calcite's
>>> StreamSQL grammar?
>>>
>>>
>>> 1. In Calcite, tumbling windows are realized with the TUMBLE function (or
>>> HOP for hopping windows), which must be used in the GROUP BY clause, like this:
>>>
>>> SELECT
>>>   TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,
>>>   productId,
>>>   COUNT(*) AS c,
>>>   SUM(units) AS units
>>> FROM Orders
>>> GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
>>>   productId;
>>>
>>> 2. Sliding windows use the keywords WINDOW and OVER, like this:
>>>
>>> SELECT  *
>>> FROM (
>>>   SELECT STREAM rowtime,
>>>     productId,
>>>     units,
>>>     AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,
>>>     AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7
>>>   FROM Orders
>>>   WINDOW product AS (
>>>     ORDER BY rowtime
>>>     PARTITION BY productId))
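What the m10 column computes can be written out as a brute-force plain-Scala sketch (hypothetical names; not Calcite or Flink code): for every order, AVG(units) over all orders of the same product whose rowtime lies in the preceding 10 minutes, current row included. It follows RANGE semantics, so rows with the same rowtime as the current row count as well. It is quadratic and meant only to pin down the semantics, not as an execution strategy.

```scala
object RangePrecedingAvg {
  // Hypothetical order record; rowtime in milliseconds
  final case class Order(rowtime: Long, productId: String, units: Int)

  // AVG(units) OVER (PARTITION BY productId ORDER BY rowtime
  //                  RANGE `range` PRECEDING), evaluated for every order
  def avgPreceding(orders: Seq[Order], range: Long): Seq[(Order, Double)] = {
    val byProduct = orders.groupBy(_.productId) // PARTITION BY productId
    orders.sortBy(_.rowtime).map { o =>         // ORDER BY rowtime
      val inRange = byProduct(o.productId).filter { p =>
        p.rowtime >= o.rowtime - range && p.rowtime <= o.rowtime // RANGE frame incl. peers
      }
      (o, inRange.map(_.units).sum.toDouble / inRange.size) // never empty: o itself is in range
    }
  }
}
```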
>>>
>>>
>>>
>>> Thanks!
>>>
>>> -----Original Message-----
>>> From: 王绍翾(大沙) [mailto:shaoxuan.wsx@alibaba-inc.com]
>>> Sent: October 13, 2016 2:03
>>> To: dev@flink.apache.org
>>> Subject: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>>>
>>> Hi Fabian, Timo, and Jark. Thanks for kicking off this FLIP. This is a
>>> really great and promising proposal. I have a few comments on the "window"
>>> operator proposed in this FLIP (I am hoping it is not too late to bring this
>>> up). First, a window is not always needed for stream aggregation. There
>>> are cases where we want to do an aggregation on a stream, while the
>>> query/emit strategy decides when to emit a streaming output. Second, a
>>> window is needed when we want to do an aggregation over a certain range,
>>> but a window is not an operator. We basically use a window to define the
>>> range for aggregation. In the Table API, a window should be defined together
>>> with the "groupBy" and "select" operators, either inside a "groupBy"
>>> operator or after an "over" clause in a "select" operator. This will make
>>> the Table API work in a similar manner as SQL. For instance:
>>>
>>> [A groupBy without window]
>>> <Table API>
>>> val res = tab
>>>   .groupBy('a)
>>>   .select('a, 'b.sum)
>>> <SQL>
>>> SELECT a, SUM(b)
>>> FROM tab
>>> GROUP BY a
>>>
>>> [A tumble window inside groupBy]
>>> <Table API>
>>> val res = tab
>>>   .groupBy('a, tumble(10.minutes, 'rowtime))
>>>   .select('a, 'b.sum)
>>> <SQL>
>>> SELECT a, SUM(b)
>>> FROM tab
>>> GROUP BY a, TUMBLE(10.minutes, 'rowtime)
>>>
>>> [A row tumble window after OVER]
>>> <Table API>
>>>   .groupBy('a) // optional
>>>   .select('a, 'b.count over rowTumble(10.minutes, 'rowtime))
>>> <SQL>
>>> SELECT a, COUNT(b) OVER ROWTUMBLE(10.minutes, 'rowtime)
>>> FROM tab
>>> GROUP BY a
>>>
>>> Please let me know what you think.
>>>
>>> Regards,
>>> Shaoxuan
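Placing a tumbling window inside groupBy works because, semantically, a tumbling window only adds one more grouping key: the start of the window each row falls into. A minimal plain-Scala sketch of `groupBy('a, tumble(...)).select('a, 'b.sum)` (hypothetical names, not the Table API). The optional `offset` parameter is an assumption meant to mirror an alignment argument such as the TIME '0:12' in the Calcite TUMBLE example in this thread, under the assumption that it simply shifts the window boundaries.

```scala
object TumbleAsGroupingKey {
  // Start of the tumbling window of length `size` (ms) that contains `ts`; `offset`
  // shifts all window boundaries (hypothetical stand-in for an alignment argument)
  def windowStart(ts: Long, size: Long, offset: Long = 0L): Long =
    ts - Math.floorMod(ts - offset, size)

  // Rows are (a, rowtime in ms, b); the result maps (a, windowStart) to SUM(b)
  def sumPerWindow(rows: Seq[(String, Long, Long)], size: Long): Map[(String, Long), Long] =
    rows
      .groupBy { case (a, ts, _) => (a, windowStart(ts, size)) } // the window is just another key
      .map { case (key, group) => key -> group.map(_._3).sum }
}
```

For example, with a 30-minute size and a 12-minute offset, a row at 0:20 falls into the window starting at 0:12, and a row at 0:50 into the one starting at 0:42.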
>>> ------------------------------------------------------------------
>>> From: Fabian Hueske <fhueske@gmail.com>
>>> Sent: Monday, September 26, 2016 21:13
>>> To: dev@flink.apache.org
>>> Subject: Re: [DISCUSS] FLIP-11: Table API Stream Aggregations
>>>
>>> Hi everybody,
>>>
>>> Timo proposed our FLIP-11 a bit more than three weeks ago.
>>> I will update the status of the FLIP to accepted.
>>>
>>> Thanks,
>>> Fabian
>>>
>>> 2016-09-19 9:16 GMT+02:00 Timo Walther <twalthr@apache.org>:
>>>
>>> > Hi Jark,
>>> >
>>> > yes, I think enough time has passed. We can start implementing the changes.
>>> > What do you think, Fabian?
>>> >
>>> > If there are no objections, I will create the subtasks in Jira today. For
>>> > FLIP-11/1 I have already implemented a prototype; I just have to do some
>>> > refactoring/documentation before opening a PR.
>>> >
>>> > Timo
>>> >
>>> >
>>> > On 18/09/16 at 04:46, Jark Wu wrote:
>>> >
>>> >> Hi all,
>>> >>
>>> >> It seems that there are no objections to the window design. So could we
>>> >> open subtasks to start working on it now?
>>> >>
>>> >> - Jark Wu
>>> >>
>>> >> On Sep 7, 2016, at 4:29 PM, Jark Wu <wuchong.wc@alibaba-inc.com> wrote:
>>> >>>
>>> >>> Hi Fabian,
>>> >>>
>>> >>> Thanks for sharing your ideas.
>>> >>>
>>> >>> They all make sense to me. Regarding reassigning timestamps, I do not
>>> >>> have a use case. I came up with this because DataStream has a
>>> >>> TimestampAssigner :)
>>> >>>
>>> >>> +1 for this FLIP.
>>> >>>
>>> >>> - Jark Wu
>>> >>>
>>> >>> On Sep 7, 2016, at 2:59 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>> >>>>
>>> >>>> Hi,
>>> >>>>
>>> >>>> thanks for your comments and questions!
>>> >>>> Actually, you are bringing up the points that Timo and I discussed the
>>> >>>> most when designing the FLIP ;-)
>>> >>>>
>>> >>>> - We also thought about the syntactic shortcut for running aggregates
>>> >>>> like you proposed (table.groupBy('a).select(…)). Our motivation for not
>>> >>>> allowing this shortcut is to prevent users from accidentally performing
>>> >>>> a "dangerous" operation. The problem with unbounded sliding row-windows
>>> >>>> is that their state never expires. If you have an evolving key space,
>>> >>>> you will likely run into problems at some point because the operator
>>> >>>> state grows too large. IMO, a row-window session is a better approach,
>>> >>>> because it defines a timeout after which state can be discarded.
>>> >>>> groupBy.select is a very common operation in batch, but its semantics
>>> >>>> in streaming are very different. In my opinion, it makes sense to make
>>> >>>> users aware of these differences through the API.
>>> >>>>
>>> >>>> - Reassigning timestamps and watermarks is a very delicate issue. You
>>> >>>> are right that Calcite exposes this field, which is necessary due to
>>> >>>> the semantics of SQL. However, also in Calcite you cannot freely choose
>>> >>>> the timestamp attribute for streaming queries (it must be a monotone or
>>> >>>> quasi-monotone attribute), which is hard to reason about (and guarantee)
>>> >>>> after a few operators have been applied. Streaming tables in Flink will
>>> >>>> likely have a time attribute which is identical to the initial rowtime.
>>> >>>> However, Flink does modify timestamps internally, e.g., for records that
>>> >>>> are emitted from time windows, in order to ensure that consecutive
>>> >>>> windows perform as expected. Modifying or reassigning timestamps in the
>>> >>>> middle of a job can lead to unexpected results which are very hard to
>>> >>>> reason about. Do you have a concrete use case in mind for reassigning
>>> >>>> timestamps?
>>> >>>>
>>> >>>> - The idea to represent rowtime and systime as objects is good. Our
>>> >>>> motivation to go for reserved Scala symbols was to have a uniform syntax
>>> >>>> for windows over streaming and batch tables. On batch tables you can
>>> >>>> compute time windows over basically every time attribute (they are
>>> >>>> treated similarly to grouping attributes, with a bit of extra logic to
>>> >>>> extract the grouping key for sliding and session windows). If you write
>>> >>>> window(Tumble over 10.minutes on 'rowtime) on a streaming table,
>>> >>>> 'rowtime would indicate event-time. On a batch table with a 'rowtime
>>> >>>> attribute, the same operator would be internally converted into a group
>>> >>>> by. By going for the object approach we would lose this compatibility
>>> >>>> (or would need to introduce an additional column attribute to specify
>>> >>>> the window attribute for batch tables).
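Regarding the point above that Flink modifies the timestamps of records emitted from time windows so that consecutive windows behave as expected: a minimal plain-Scala sketch, assuming (as in Flink's DataStream API, where `TimeWindow.maxTimestamp()` returns `end - 1`) that a window result carries the last timestamp that still belongs to its window. The object and method names are hypothetical.

```scala
object WindowResultTimestamps {
  // Start of the tumbling window of length `size` that contains `ts`
  def windowStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  // Timestamp assigned to the result of window [start, start + size): the last
  // timestamp that still belongs to the window, i.e. end - 1
  // (assumed to mirror TimeWindow.maxTimestamp() in Flink's DataStream API)
  def resultTimestamp(start: Long, size: Long): Long = start + size - 1

  // Downstream tumbling window (length `nextSize`) that receives the upstream result
  def downstreamWindow(start: Long, size: Long, nextSize: Long): Long =
    windowStart(resultTimestamp(start, size), nextSize)
}
```

With `end - 1`, the result of [0, 10) lands in the downstream window [0, 10) of the same size, and the results of [0, 10) and [10, 20) both land in a downstream window [0, 20). Had the result been stamped with `end`, it would be shifted into the following window, which is the kind of surprise that arbitrary timestamp reassignment mid-job can cause.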
>>> >>>>
>>> >>>> As usual some of the design decisions are based on preferences.
>>> >>>> Do they make sense to you? Let me know what you think.
>>> >>>>
>>> >>>> Best, Fabian
>>> >>>>
>>> >>>>
>>> >>>> 2016-09-07 5:12 GMT+02:00 Jark Wu <wuchong.wc@alibaba-inc.com>:
>>> >>>>
>>> >>>>> Hi all,
>>> >>>>>
>>> >>>>> I was on vacation for about five days, sorry to have missed this great
>>> >>>>> FLIP.
>>> >>>>>
>>> >>>>> Yes, non-windowed aggregates are a special case of row-windows. And the
>>> >>>>> proposal looks really good. Can we have a simplified form for this
>>> >>>>> special case? For example,
>>> >>>>> table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(…)
>>> >>>>> could be simplified to table.groupBy('a).select(…). The latter would
>>> >>>>> actually call the former.
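The trade-off behind this shortcut is that an unbounded preceding row window never releases per-key state, whereas the row-window session that Fabian suggests in this thread defines a timeout after which state can be discarded. A minimal plain-Scala sketch of that eviction (hypothetical names; the inactivity `gap` is an assumed parameter, and the event time of the next input stands in for the timers and watermarks a real system would use):

```scala
import scala.collection.mutable

object SessionStateExpiry {
  // Hypothetical per-key state: running sum plus the time the key was last seen
  final case class KeyState(sum: Long, lastSeen: Long)

  // Processes (key, time, value) events in time order. A key's state is discarded
  // once the key has been inactive for longer than `gap`, so only recently active
  // keys are held. Returns the number of keys still held at the end.
  def run(events: Seq[(String, Long, Long)], gap: Long): Int = {
    val state = mutable.Map.empty[String, KeyState]
    for ((key, time, value) <- events.sortBy(_._2)) {
      // expire timed-out sessions before handling the current event
      val expired = state.iterator.collect { case (k, s) if time - s.lastSeen > gap => k }.toList
      state --= expired
      val prev = state.getOrElse(key, KeyState(0L, time))
      state.update(key, KeyState(prev.sum + value, time))
    }
    state.size
  }
}
```

With an evolving key space, the unbounded variant keeps every key ever seen, while the session variant keeps only the keys seen within the last `gap`.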
>>> >>>>>
>>> >>>>> Another question is about the rowtime. As the FLIP says, DataStream and
>>> >>>>> StreamTableSource are responsible for assigning timestamps and
>>> >>>>> watermarks; furthermore, "rowtime" and "systemtime" are not real
>>> >>>>> columns. IMO, this is different from Calcite's rowtime, which is a real
>>> >>>>> column in the table. With the FLIP's approach, we will lose some
>>> >>>>> flexibility, because the timestamp column may be created after some
>>> >>>>> transformations or a join operation, not at the beginning. So why do we
>>> >>>>> have to define rowtime at the beginning? (Because of watermarks?) Can
>>> >>>>> we have a way to define rowtime after the source table, like a
>>> >>>>> TimestampAssigner?
>>> >>>>>
>>> >>>>> Regarding the "allowLateness" method, I came up with a trick: we can
>>> >>>>> make 'rowtime and 'systemtime Scala objects instead of symbol
>>> >>>>> expressions. The API will look like this:
>>> >>>>>
>>> >>>>> window(Tumble over 10.minutes on rowtime allowLateness as 'w)
>>> >>>>>
>>> >>>>> The implementation will look like this:
>>> >>>>>
>>> >>>>> class TumblingWindow(size: Expression) extends Window {
>>> >>>>>   def on(time: rowtime.type): TumblingEventTimeWindow =
>>> >>>>>     new TumblingEventTimeWindow(alias, 'rowtime, size)        // has an allowLateness() method
>>> >>>>>
>>> >>>>>   def on(time: systemtime.type): TumblingProcessingTimeWindow =
>>> >>>>>     new TumblingProcessingTimeWindow(alias, 'systemtime, size) // has no allowLateness() method
>>> >>>>> }
>>> >>>>> object rowtime
>>> >>>>> object systemtime
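The trick relies on Scala overload resolution on singleton types: `on(rowtime)` and `on(systemtime)` pick different overloads and return different window types. A self-contained sketch with hypothetical stand-in classes (not Flink's `TumblingWindow` or `Expression`) showing that only the event-time variant exposes `allowLateness`:

```scala
object SingletonTimeMarkers {
  // Hypothetical marker objects standing in for the time attributes
  object rowtime
  object systemtime

  // Hypothetical window definitions (not Flink classes)
  final case class EventTimeTumble(size: Long, lateness: Long = 0L) {
    // only event-time windows offer allowLateness
    def allowLateness(ms: Long): EventTimeTumble = copy(lateness = ms)
  }
  final case class ProcTimeTumble(size: Long)

  final class Tumble(size: Long) {
    // overloads are selected by the singleton type of the argument
    def on(time: rowtime.type): EventTimeTumble = EventTimeTumble(size)
    def on(time: systemtime.type): ProcTimeTumble = ProcTimeTumble(size)
  }

  def tumble(size: Long): Tumble = new Tumble(size)
}
```

Here `tumble(10L).on(rowtime).allowLateness(5L)` compiles, while `tumble(10L).on(systemtime).allowLateness(5L)` is rejected at compile time. Timo's counterpoint in this thread still applies: on batch tables the time attribute may be an ordinary column, which a fixed marker object cannot express uniformly.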
>>> >>>>>
>>> >>>>> What do you think about this?
>>> >>>>>
>>> >>>>> - Jark Wu
>>> >>>>>
>>> >>>>> On Sep 6, 2016, at 11:00 PM, Timo Walther <twalthr@apache.org> wrote:
>>> >>>>>>
>>> >>>>>> Hi all,
>>> >>>>>>
>>> >>>>>> I thought about the API of the FLIP again. If we allow the "systemtime"
>>> >>>>>> attribute, we cannot implement a nice method chaining where the user
>>> >>>>>> can define an "allowLateness" only on event time. So even if the user
>>> >>>>>> expressed that "systemtime" is used, we have to offer an
>>> >>>>>> "allowLateness" method, because we have to assume that this attribute
>>> >>>>>> can also be the batch event-time column, which is not very nice.
>>> >>>>>>
>>> >>>>>> class TumblingWindow(size: Expression) extends Window {
>>> >>>>>>   def on(timeField: Expression): TumblingEventTimeWindow =
>>> >>>>>>     new TumblingEventTimeWindow(alias, timeField, size) // has an allowLateness() method
>>> >>>>>> }
>>> >>>>>>
>>> >>>>>> What do you think?
>>> >>>>>>
>>> >>>>>> Timo
>>> >>>>>>
>>> >>>>>>
>>> >>>>>> On 05/09/16 at 10:41, Fabian Hueske wrote:
>>> >>>>>>
>>> >>>>>>> Hi Jark,
>>> >>>>>>>
>>> >>>>>>> you had asked for non-windowed aggregates in the Table API a few
>>> >>>>>>> times. FLIP-11 proposes row-window aggregates, which are a
>>> >>>>>>> generalization of running aggregates (SlideRow unboundedPreceding).
>>> >>>>>>>
>>> >>>>>>> Can you have a look at the FLIP and give feedback on whether this is
>>> >>>>>>> what you are looking for? Improvement suggestions are very welcome as
>>> >>>>>>> well.
>>> >>>>>>>
>>> >>>>>>> Thank you,
>>> >>>>>>> Fabian
>>> >>>>>>>
>>> >>>>>>> 2016-09-01 16:12 GMT+02:00 Timo Walther <twalthr@apache.org>:
>>> >>>>>>>
>>> >>>>>>>> Hi all!
>>> >>>>>>>>
>>> >>>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in the Table API.
>>> >>>>>>>> You can find FLIP-11 here:
>>> >>>>>>>>
>>> >>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
>>> >>>>>>>>
>>> >>>>>>>> Motivation for the FLIP:
>>> >>>>>>>>
>>> >>>>>>>> The Table API is a declarative API to define queries on static and
>>> >>>>>>>> streaming tables. So far, only projection, selection, and union are
>>> >>>>>>>> supported operations on streaming tables.
>>> >>>>>>>>
>>> >>>>>>>> This FLIP proposes to add support for different types of
>>> >>>>>>>> aggregations on top of streaming tables. In particular, we seek to
>>> >>>>>>>> support:
>>> >>>>>>>>
>>> >>>>>>>> - Group-window aggregates, i.e., aggregates which are computed for a
>>> >>>>>>>> group of elements. A (time or row-count) window is required to bound
>>> >>>>>>>> the infinite input stream into a finite group.
>>> >>>>>>>>
>>> >>>>>>>> - Row-window aggregates, i.e., aggregates which are computed for each
>>> >>>>>>>> row, based on a window (range) of preceding and succeeding rows.
>>> >>>>>>>>
>>> >>>>>>>> Each type of aggregate shall be supported on keyed/grouped or
>>> >>>>>>>> non-keyed/grouped data streams for streaming tables as well as batch
>>> >>>>>>>> tables.
>>> >>>>>>>>
>>> >>>>>>>> We are looking forward to your feedback.
>>> >>>>>>>>
>>> >>>>>>>> Timo
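Since group-window aggregates are also meant to run on batch tables, and Fabian notes in this thread that time windows on batch tables are treated like grouping attributes with extra logic to extract the grouping key for sliding windows, here is a minimal plain-Scala sketch of that key extraction (hypothetical names; a sketch of the general technique, not Flink's implementation). Each row is replicated once per sliding window that contains it, and the window start then acts as an ordinary grouping key.

```scala
object SlidingWindowKeys {
  // Start times of all sliding windows of length `size` and step `slide` that contain
  // `ts`; a window [s, s + size) contains ts iff s <= ts < s + size
  def windowStarts(ts: Long, size: Long, slide: Long): List[Long] = {
    val lastStart = ts - Math.floorMod(ts, slide) // latest window start <= ts
    Iterator.iterate(lastStart)(_ - slide).takeWhile(_ > ts - size).toList
  }

  // Batch evaluation: replicate each (key, ts, value) row once per window it belongs
  // to, then aggregate with the window start as an additional grouping key
  def sumPerWindow(rows: Seq[(String, Long, Long)], size: Long, slide: Long): Map[(String, Long), Long] =
    rows
      .flatMap { case (k, ts, v) => windowStarts(ts, size, slide).map(s => ((k, s), v)) }
      .groupBy(_._1)
      .map { case (key, group) => key -> group.map(_._2).sum }
}
```

With size 10 and slide 5, a row at time 7 belongs to the windows starting at 5 and 0, so it contributes to two groups.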
>>> >>>>>>>>
>>> >>>>>>>>
>>> >>>>>> --
>>> >>>>>> Freundliche Grüße / Kind Regards
>>> >>>>>>
>>> >>>>>> Timo Walther
>>> >>>>>>
>>> >>>>>> Follow me: @twalthr
>>> >>>>>> https://www.linkedin.com/in/twalthr
>>> >>>>>>
>>> >>>>>
>>> >>>>>
>>> >>
>>> >
>>> >
>>> >
>>>
>>>
>>
>
