flink-dev mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: [DISCUSS] FLIP-11: Table API Stream Aggregations
Date Wed, 02 Nov 2016 08:47:01 GMT
Thanks everybody for the input.

I updated the FLIP-11 document [1] and made the following changes:

- made over() mandatory also for single row windows
- removed allowedLateness() from windows, since supporting it would require
adding some kind of retraction output mode. This is out of scope for FLIP-11
and is addressed in the Stream SQL design doc [2].
- removed the 'systemtime keyword to distinguish event-time and
processing-time on a syntactical level (proposed by Timo in PR #2562 [3]).
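
To illustrate the last point, here is a minimal sketch in plain Scala. It is a toy model, not Flink code: GroupWindow, Tumble.over, on, as and isEventTime are hypothetical stand-ins, and it assumes that a window without an on() clause would be read as a processing-time window once 'systemtime is gone.

```scala
// Toy model only: these types are hypothetical and are not Flink classes.
final case class GroupWindow(
    sizeMillis: Long,
    timeField: Option[String] = None, // set by on(); absent means processing time (assumption)
    alias: Option[String] = None) {

  // Naming a time attribute marks the window as an event-time window.
  def on(field: String): GroupWindow = copy(timeField = Some(field))

  // Assigns the alias under which later clauses refer to the window.
  def as(name: String): GroupWindow = copy(alias = Some(name))

  def isEventTime: Boolean = timeField.isDefined
}

object Tumble {
  // Starts a tumbling window definition of the given size.
  def over(sizeMillis: Long): GroupWindow = GroupWindow(sizeMillis)
}

object WindowSyntaxSketch {
  def main(args: Array[String]): Unit = {
    val eventTimeWindow = Tumble.over(600000L).on("rowtime").as("w")
    val processingTimeWindow = Tumble.over(600000L).as("w")
    println(s"with on():    event time = ${eventTimeWindow.isEventTime}")
    println(s"without on(): event time = ${processingTimeWindow.isEventTime}")
  }
}
```

In this reading the two time modes differ in the shape of the expression itself, so no reserved 'systemtime attribute is needed.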

I did *not* make the following changes that were discussed in this thread:

- change groupBy() to partitionBy() for row windows
- allow window specification in groupBy()
- allow row window specification in over()
- allow groupBy() without group window on streaming tables
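
The memory concern behind the last item was raised earlier in this thread: a groupBy() without a window keeps state for every key it has ever seen. A rough sketch of that argument in plain Scala follows. Ordinary collections stand in for operator state, and the names runningStateSize and maxWindowedStateSize are made up for this illustration; events are assumed to arrive in timestamp order.

```scala
import scala.collection.mutable

// Toy model only: plain Scala collections stand in for operator state.
object StateGrowthSketch {

  // Running (non-windowed) count per key: one entry per key, never removed.
  def runningStateSize(events: Seq[(String, Long)]): Int = {
    val state = mutable.Map.empty[String, Long]
    events.foreach { case (key, _) =>
      state(key) = state.getOrElse(key, 0L) + 1L
    }
    state.size
  }

  // Tumbling-window count per key: the entries of a window are dropped as
  // soon as an event of another window arrives (in-order events assumed).
  def maxWindowedStateSize(events: Seq[(String, Long)], windowSize: Long): Int = {
    val state = mutable.Map.empty[String, Long]
    var currentWindow = Long.MinValue
    var maxSize = 0
    events.foreach { case (key, timestamp) =>
      val window = timestamp / windowSize
      if (window != currentWindow) {
        state.clear()
        currentWindow = window
      }
      state(key) = state.getOrElse(key, 0L) + 1L
      maxSize = math.max(maxSize, state.size)
    }
    maxSize
  }

  def main(args: Array[String]): Unit = {
    // An evolving key space: every event carries a key that was not seen before.
    val events = (0L until 1000L).map(i => (s"user-$i", i))
    println(s"entries kept by running aggregate: ${runningStateSize(events)}")
    println(s"max entries kept per window:       ${maxWindowedStateSize(events, 10L)}")
  }
}
```

With an evolving key space the running aggregate retains every key, while the windowed variant never holds more than one window's worth of keys.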

Best,
Fabian

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
[2]
https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU
[3] https://github.com/apache/flink/pull/2562

2016-11-02 4:49 GMT+01:00 Sean Wang <wshaoxuan@gmail.com>:

> Hi Stephan and Fabian,
> Thanks for the replies. I am very happy to see that my questions have started
> a good discussion here. This is about more than just FLIP-11: it concerns the
> future of Flink SQL (glad all of us agree to use the standard-compliant SQL
> interface) and the role of the Table API.
> The syntax of the Table API is a matter of taste, but IMO the tradeoff between
> SQL syntax and Table API flexibility should be well considered during the
> design. I hope to see a powerful, concise, and compatible Table API.
> I agree with your other comments on FLIP-11.
>
> Regards,
> Shaoxuan
>
>
> On Fri, Oct 28, 2016 at 11:09 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
> > Thanks for bringing up this point Stephan.
> >
> > I think it is a good decision to have a standard-compliant SQL interface
> > that SQL developers and tools can use. The SQL proposal [1] I posted a few
> > days ago follows this approach.
> >
> > However, SQL was not designed with stream processing in mind and many
> > important streaming concepts are either cumbersome to express or not at all
> > expressible with plain SQL (see Tyler Akidau's proposal [2] for instance).
> > I think the Table API has the chance to improve a lot of these issues by
> > not following the SQL standard too closely.
> >
> > +1 for defining window aggregates with the explicit window() and
> > rowWindow() clauses.
> >
> > +1 for sticking to groupBy() instead of partitionBy().
> >
> > There are a few more ideas that can make the Table API better suited for
> > streaming such as:
> >
> > - have built-in emission and refinement strategies (see StreamSQL doc [1])
> > - easier joins against time-variant tables (Temporal table proposal of
> > Julian Hyde [3])
> > - support for DataStream-like UDFs
> >
> > These issues should be discussed separately though.
> >
> > Best,
> > Fabian
> >
> > [1]
> > https://docs.google.com/document/d/1qVVt_16kdaZQ8RTfA_f4konQPW4tnl8THw6rzGUdaqU
> > [2]
> > https://docs.google.com/document/d/1tSey4CeTrbb4VjWvtSA78OcU6BERXXDZ3t0HzSLij9Q
> > [3]
> > https://docs.google.com/document/d/1RvnLEEQK92axdAaZ9XIU5szpkbGqFMBtzYiIY4dHe0Q
> >
> > 2016-10-28 10:05 GMT+02:00 Jark Wu <wuchong.wc@alibaba-inc.com>:
> >
> > > Hi,
> > >
> > > I agree that the Table API should be a SQL-like API that does not need to be
> > > strictly consistent with SQL.
> > >
> > > > - Not put the window definition into the groupBy clause.
> > >
> > > +1. Keeping the window clause out of groupBy will make it easier to learn
> > > and easier to discover for users.
> > >
> > > > I like the idea of having separate ".window()" and ".rowWindow()"
> > > > clauses.
> > >
> > > +1
> > >
> > > >  I would prefer to not have a "partitionBy" statement.
> > >
> > > As far as I know, partitionBy plays the same role as groupBy in the
> > > Table API. So maybe groupBy is enough and there is no need to introduce a
> > > new clause.
> > >
> > >
> > > - Jark Wu
> > >
> > > > On Oct 27, 2016, at 2:22 AM, Stephan Ewen <sewen@apache.org> wrote:
> > > >
> > > > Hi all!
> > > >
> > > > I think that in order to get a better hold on how we want to build the
> > > > Table API, we need to *decide what the role of the Table API should be*.
> > > > We touched on that a few times, but I think we still have different ideas
> > > > about that.
> > > >
> > > > To get there, let me take a step back and look at the design of Stream SQL
> > > > again. There were basically two competing approaches:
> > > >
> > > > (1) Keep SQL as it is and make it run on infinite streams by introducing
> > > > dynamic tables
> > > > (2) Create a new language that is similar to SQL, but designed with
> > > > streaming concepts in mind (first-class support for time and windows)
> > > >
> > > > Both approaches had good points. The Stream SQL design doc that was posted
> > > > followed approach (1) - keep SQL as it is.
> > > >
> > > >
> > > >
> > > > Now, for the Table API, we seem to be having a similar discussion again.
> > > >
> > > > (1) Let the Table API be as similar to SQL as possible, simply make it feel
> > > > "fluently embedded" in Scala.
> > > > (2) Define the Table API as one would define a new and clean DSL for
> > > > streaming. SQL inspired, of course. Where SQL syntax feels natural, use the
> > > > SQL syntax, but make it very accessible to Java/Scala (non-SQL) programmers.
> > > >
> > > >
> > > > I am personally more in favor of variant (2) for the following reasons:
> > > >
> > > >  - We already have SQL compliant with the standards and tools.
> > > >
> > > >  - Mirroring SQL too closely into the Table API has the marginal benefit
> > > > that someone close to SQL will find it a bit more familiar. Not sure if
> > > > that is even the case, as they have to re-learn the fluent DSL and Scala
> > > > concepts.
> > > >
> > > >  - We are making it more difficult for all those that come from a more
> > > > Scala/Java DataStream background and simply want to move "a layer up",
> > > > getting schema and more optimizations into the equation.
> > > >
> > > >
> > > >
> > > >
> > > > What would that mean for the specific issues that are discussed in FLIP-11?
> > > > Based on interpreting the Table API as a re-imagined streaming DSL, I would
> > > > suggest to
> > > >
> > > >  - Not put the window definition into the groupBy clause. It just is
> > > > unexpected for all that are not super familiar with SQL and hard to
> > > > discover in the IDE. A separate window clause is simpler for users coming
> > > > from the DataStream background (or other streaming APIs) and it is more
> > > > discoverable in the IDE.
> > > >
> > > >  - I like the idea of having separate ".window()" and ".rowWindow()"
> > > > clauses. Makes it more explicit that very different things will happen.
> > > >
> > > >  - I would prefer to not have a "partitionBy" statement. When we restrict
> > > > the Table API at least initially to having one partitioning for the
> > > > windows, we should be able to express the partitioning by simply adding it
> > > > to the fields in the "groupBy" clause. That would make the API more easily
> > > > accessible to users that are not SQL power users.
> > > >
> > > >
> > > > What do others think?
> > > >
> > > > Greetings,
> > > > Stephan
> > > >
> > > >
> > > > On Sat, Oct 15, 2016 at 1:02 AM, Fabian Hueske <fhueske@gmail.com> wrote:
> > > >
> > > >> Thanks for your reply Shaoxuan!
> > > >>
> > > >> Please see my replies below.
> > > >>
> > > >> Best, Fabian
> > > >>
> > > >> 2016-10-14 11:34 GMT+02:00 Sean Wang <wshaoxuan@gmail.com>:
> > > >>
> > > >>> Thanks for your quick reply, Fabian.
> > > >>>
> > > > >>> I have a few minor comments and suggestions:
> > > >>>
> > > >>> <GroupBy with window>
> > > > >>> - Agree that we should consider GroupBy without window after the new SQL
> > > > >>> proposal is settled.
> > > >>>
> > > >>>
> > > > >> OK, so we keep this as it is for now. GroupBy without windows will be
> > > > >> supported later when we are able to "guard" the memory requirements of
> > > > >> that operation.
> > > >>
> > > >>
> > > >>> <GroupBy with window>
> > > > >>> - For the Java API, we can keep the window() call and put the window alias
> > > > >>> into the groupBy clause. This can also be applied to the rowWindow case.
> > > >>>
> > > >>>
> > > > >> Referring to the window alias in the groupBy clause would require inverting
> > > > >> the methods, i.e., groupBy().window() -> window().groupBy(). I am not sure
> > > > >> if that is more intuitive. Also, Scala and Java are using the same class
> > > > >> (Table) but different methods (Java uses String parameters, Scala
> > > > >> Expression parameters). In my opinion it makes sense to have both APIs
> > > > >> closely synced.
> > > > >> So I would either keep window() (after groupBy) for Scala and Java or
> > > > >> remove it for both.
> > > >>
> > > >>
> > > >>> <RowWindows> & <partitionby() for rowwindow>
> > > > >>> - +1 to support replacing groupby() by partitionby(). BTW, in the case of
> > > > >>> over, instead of partitionby, are we going to support orderby? If yes, I
> > > > >>> would suggest to define rowwindow as rowwindow(PartitionByParaType,
> > > > >>> OrderByParaType, WindowParaType).
> > > >>>
> > > >>>
> > > > >> The current FLIP-11 proposal supports defining both partitionBy and orderBy
> > > > >> (with a few restrictions).
> > > > >> PartitionBy is defined for all windows alike by calling
> > > > >>
> > > > >> table.partitionBy(...).rowWindow(Window1 as w1, Window2 as w2).select(count() over w1).
> > > >>
> > > > >> Allowing windows with different partitioning would mean that data is
> > > > >> shuffled to different nodes and that we need a distributed join to assemble
> > > > >> the result rows. In principle this could be done but would be very
> > > > >> expensive to execute (applies to batch and streaming). In my opinion, we
> > > > >> should not support this case.
> > > >>
> > > > >> OrderBy is implicitly supported by the on() clause of RowWindows, e.g.,
> > > > >>
> > > > >> rowWindow(TumbleRows over 10.minutes on 'rowtime as 'w)
> > > > >>
> > > > >> says that the window is ordered on the rowtime, i.e., event-time,
> > > > >> attribute. For streaming we can only allow event-time order (or
> > > > >> processing-time order which is always given). Orders on other attributes
> > > > >> would not be possible (for infinite dynamic input tables) or very expensive
> > > > >> (memory and computation wise) to maintain (for finite dynamic input
> > > > >> tables). For queries on batch tables, in principle all orders are possible.
> > > > >> With the current proposal only count windows are supported for arbitrary
> > > > >> attribute types and time windows for timestamp attributes.
> > > >>
> > > >> So
> > > >>> - moving windows into the groupBy() call :   +1
> > > >>>
> > > >>
> > > >> +1
> > > >>
> > > >>
> > > > >>> - making over() mandatory for rowWindow() with a single window definition.
> > > >>>
> > > >>
> > > >> +1
> > > >>
> > > >>
> > > > >>> - additionally allowing window definitions in over():  +1 yes for Scala,
> > > > >>> but use alias for the Java API.
> > > >>>
> > > >>
> > > > >> If we have the parser code for Java group windows in groupBy() it should be
> > > > >> easy to adapt this for over(). But we should also keep the rowWindow method
> > > > >> to define aliases.
> > > >>
> > > >>
> > > > >>> - using partitionBy() instead of groupBy() for row windows?: +1, but
> > > > >>> better to consider orderby, it may be even better to move partitionBy()
> > > > >>> into rowwindow.
> > > >>>
> > > >>>
> > > >> +1 to change groupBy() to partitionBy().
> > > > >> I would not move partitionBy() into the RowWindow definition but keep it
> > > > >> outside to ensure only one partitioning is defined. The orderBy definition
> > > > >> is already fluently included in the RowWindow via the on() method.
> > > >>
> > > >>
> > > >>
> > > >>> Regards,
> > > >>> Shaoxuan
> > > >>>
> > > >>>
> > > > >>> On Thu, Oct 13, 2016 at 6:05 PM, Fabian Hueske <fhueske@gmail.com> wrote:
> > > >>>
> > > >>>> Hi everybody,
> > > >>>>
> > > >>>> happy to see a good discussion here :-)
> > > > >>>> I'll reply to Shaoxuan's mail first and comment on Zhangrucong's question
> > > > >>>> in a separate mail.
> > > > >>>>
> > > > >>>> Shaoxuan, thanks for the suggestions! I think we all agree that for SQL
> > > > >>>> we should definitely follow the standard (batch) SQL syntax.
> > > > >>>> In my opinion, the Table API does not necessarily have to be as close as
> > > > >>>> possible to SQL but should try to make a few things easier and also safer
> > > > >>>> (easier is of course subjective).
> > > >>>>
> > > > >>>> - GroupBy without windows: These are currently intentionally not
> > > > >>>> supported and also not part of FLIP-11. Our motivation for not supporting
> > > > >>>> this is to guard the user from defining a query that fails when being
> > > > >>>> executed due to a very memory-consuming operation. FLIP-11 provides a way
> > > > >>>> to define such a query as a sliding row window with unbounded preceding
> > > > >>>> rows. With the upcoming SQL proposal, queries that consume unbounded
> > > > >>>> memory should be identified and rejected. I would be in favor of allowing
> > > > >>>> groupBy without windows once the guarding mechanisms are in place.
> > > >>>>
> > > > >>>> - GroupBy with window: I think this is a question of taste. Having a
> > > > >>>> window() call makes the feature more explicit in my opinion. However, I'm
> > > > >>>> not opposed to moving the windows into the groupBy clause.
> > > > >>>> Implementation-wise it should be easy to move the window definition into
> > > > >>>> the groupBy clause for the Scala Table API. For the Java Table API we
> > > > >>>> would need to extend the parser quite a bit because windows would need to
> > > > >>>> be defined as Strings and not via objects.
> > > >>>>
> > > > >>>> - RowWindows: The rowWindow() call mimics the standard SQL WINDOW clause
> > > > >>>> (implemented by PostgreSQL and Calcite) which allows to have "reusable"
> > > > >>>> window definitions. I think this is a desirable feature. In the FLIP-11
> > > > >>>> proposal the over() clause in select() refers to the predefined windows
> > > > >>>> with aliases. In case only one window is defined, the over() clause is
> > > > >>>> optional and the same (and only) window is applied to all aggregates. I
> > > > >>>> think we can make the over() call mandatory to have the windowing more
> > > > >>>> explicit. It should also be possible to extend the over clause to directly
> > > > >>>> accept RowWindows instead of window aliases. I would not make this a
> > > > >>>> priority at the moment, but a feature that could be added later, because
> > > > >>>> rowWindow() and over() cover all cases. Similar to GroupBy with
> > > > >>>> windows, we would need to extend the parser for the Java Table API though.
> > > >>>>
> > > > >>>> Finally, I have a suggestion of my own:
> > > > >>>> In FLIP-11, groupBy() is used to define the partitioning of RowWindows.
> > > > >>>> I think this should be changed to partitionBy() because groupBy() groups
> > > > >>>> data and applies an aggregation to all rows of a group which is not
> > > > >>>> happening here. In original SQL, the OVER clause features a PARTITION BY
> > > > >>>> clause. We are moving this out of the window definition, i.e., OVER
> > > > >>>> clause, to enforce the same partitioning for all windows (different
> > > > >>>> partitionings would be a challenge to execute in a parallel system).
> > > >>>>
> > > >>>> @Timo and all: What do you think about:
> > > >>>>
> > > >>>> - moving windows into the groupBy() call
> > > > >>>> - making over() mandatory for rowWindow() with a single window definition
> > > >>>> - additionally allowing window definitions in over()
> > > >>>> - using partitionBy() instead of groupBy() for row windows?
> > > >>>>
> > > >>>> Best, Fabian
> > > >>>>
> > > >>>> 2016-10-13 11:10 GMT+02:00 Zhangrucong <zhangrucong@huawei.com>:
> > > >>>>
> > > >>>>> Hi shaoxuan:
> > > >>>>>
> > > > >>>>> I think the StreamSQL must be executed in a table environment. So I call
> > > > >>>>> this the Table API's StreamSQL. What do you call this, stream Table API
> > > > >>>>> or StreamSQL? It is fu
> > > >>>>>
> > > >>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
> > > >>>>> val tblEnv = TableEnvironment.getTableEnvironment(env)
> > > > >>>>> val ds: DataStream[(String, Long, Long)] = env.readTextFile("/home/demo")
> > > > >>>>>    .map(f => (f, 1L, 1L))
> > > > >>>>> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
> > > > >>>>> val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE userID='A'")
> > > >>>>>
> > > > >>>>> So in my opinion, the grammar which is marked red should be compatible
> > > > >>>>> with Calcite's StreamSQL grammar.
> > > > >>>>>
> > > > >>>>> By the way, thanks very much for telling me about the modified content in
> > > > >>>>> Flink StreamSQL. I will look at the new proposal.
> > > >>>>>
> > > >>>>> Thanks!
> > > > >>>>> From: Sean Wang [mailto:wshaoxuan@gmail.com]
> > > > >>>>> Sent: October 13, 2016 16:29
> > > > >>>>> To: dev@flink.apache.org; Zhangrucong
> > > > >>>>> Subject: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
> > > >>>>>
> > > >>>>> Hi  zhangrucong,
> > > > >>>>> I am not sure what you mean by "table API'S StreamSQL", I guess you mean
> > > > >>>>> "stream TableAPI"?
> > > > >>>>> TableAPI should be compatible with Calcite SQL. (By compatible, my
> > > > >>>>> understanding is that both TableAPI and SQL will be translated to the
> > > > >>>>> same logical plan - the same set of REL and REX).
> > > > >>>>> BTW, please note that we recently merged a change to remove the STREAM
> > > > >>>>> keyword for Flink stream SQL (FLINK-4546). In our opinion, batch and
> > > > >>>>> stream do not necessarily need to be differentiated at the SQL level. The
> > > > >>>>> major difference between batch and stream is "WHEN and HOW to emit the
> > > > >>>>> result".
> > > > >>>>> We have been working on a new proposal with Fabian on this change. I
> > > > >>>>> guess it will be sent out for review very soon.
> > > >>>>>
> > > >>>>> Regards,
> > > >>>>> Shaoxuan
> > > >>>>>
> > > >>>>>
> > > > >>>>> On Thu, Oct 13, 2016 at 2:29 PM, Zhangrucong <zhangrucong@huawei.com
> > > > >>>>> <mailto:zhangrucong@huawei.com>> wrote:
> > > > >>>>> Hi shaoxuan:
> > > > >>>>> Is the Table API's StreamSQL grammar compatible with Calcite's
> > > > >>>>> StreamSQL grammar?
> > > >>>>>
> > > >>>>>
> > > > >>>>> 1. In Calcite, the tumble window is realized by using the function TUMBLE
> > > > >>>>> or HOP, and the function must be used with GROUP BY, like this:
> > > >>>>>
> > > >>>>> SELECT
> > > >>>>>  TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS
> rowtime,
> > > >>>>>  productId,
> > > >>>>>  COUNT(*) AS c,
> > > >>>>>  SUM(units) AS units
> > > >>>>> FROM Orders
> > > >>>>> GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
> > > >>>>>  productId;
> > > >>>>>
> > > > >>>>> 2. The sliding window uses the keywords "window" and "over", like this:
> > > >>>>>
> > > >>>>> SELECT  *
> > > >>>>> FROM (
> > > >>>>>  SELECT STREAM rowtime,
> > > >>>>>    productId,
> > > >>>>>    units,
> > > >>>>>    AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING)
> > AS
> > > >>>>> m10,
> > > >>>>>    AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS
> d7
> > > >>>>>  FROM Orders
> > > >>>>>  WINDOW product AS (
> > > >>>>>    ORDER BY rowtime
> > > >>>>>    PARTITION BY productId))
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> Thanks!
> > > >>>>>
> > > >>>>> -----邮件原件-----
> > > >>>>> 发件人: 王绍翾(大沙) [mailto:shaoxuan.wsx@alibaba-inc.com<mailto:
> > > >>>>> shaoxuan.wsx@alibaba-inc.com>]
> > > >>>>> 发送时间: 2016年10月13日 2:03
> > > >>>>> 收件人: dev@flink.apache.org<mailto:dev@flink.apache.org>
> > > >>>>> 主题: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
> > > >>>>>
> > > > >>>>> Hi Fabian, Timo, and Jark.
> > > > >>>>>
> > > > >>>>> Thanks for kicking off this FLIP. This is a really great and promising
> > > > >>>>> proposal. I have a few comments on the "window" operator proposed in this
> > > > >>>>> FLIP (I am hoping it is not too late to bring this up).
> > > > >>>>>
> > > > >>>>> First, a window is not always needed for stream aggregation. There are
> > > > >>>>> cases where we want to do an aggregation on a stream, while the query/emit
> > > > >>>>> strategy decides when to emit a streaming output.
> > > > >>>>>
> > > > >>>>> Second, a window is needed when we want to do an aggregation over a
> > > > >>>>> certain range, but a window is not an operator. We basically use a window
> > > > >>>>> to define the range for aggregation. In the Table API, a window should be
> > > > >>>>> defined together with the "groupby" and "select" operators, either inside
> > > > >>>>> a "groupby" operator or after an "over" clause in the "select" operator.
> > > > >>>>> This will make the Table API similar in manner to SQL.
> > > > >>>>>
> > > > >>>>> For instance,
> > > > >>>>>
> > > > >>>>> [A groupby without window]
> > > > >>>>> <Table API>
> > > > >>>>> val res = tab
> > > > >>>>> .groupBy('a)
> > > > >>>>> .select('a, 'b.sum)
> > > > >>>>> <SQL>
> > > > >>>>> SELECT a, SUM(b)
> > > > >>>>> FROM tab
> > > > >>>>> GROUP BY a
> > > > >>>>>
> > > > >>>>> [A tumble window inside groupby]
> > > > >>>>> <Table API>
> > > > >>>>> val res = tab
> > > > >>>>> .groupBy('a, tumble(10.minutes, 'rowtime))
> > > > >>>>> .select('a, 'b.sum)
> > > > >>>>> <SQL>
> > > > >>>>> SELECT a, SUM(b)
> > > > >>>>> FROM tab
> > > > >>>>> GROUP BY a, TUMBLE(10.minutes, 'rowtime)
> > > > >>>>>
> > > > >>>>> [A row tumble window after OVER]
> > > > >>>>> <Table API>
> > > > >>>>> .groupby('a) //optional
> > > > >>>>> .select('a, 'b.count over rowTumble(10.minutes, 'rowtime))
> > > > >>>>> <SQL>
> > > > >>>>> SELECT a, COUNT(b) OVER ROWTUMBLE(10.minutes, 'rowtime)
> > > > >>>>> FROM tab
> > > > >>>>> GROUP BY a
> > > > >>>>>
> > > > >>>>> Please let me know what you think.
> > > > >>>>>
> > > > >>>>> Regards,
> > > > >>>>> Shaoxuan
> > > >>>>> ------------------------------------------------------------
> > > >> ------发件人:Fabian
> > > >>>>> Hueske <fhueske@gmail.com<mailto:fhueske@gmail.com
> > >>发送时间:2016年9月26日
> > > >> (星期一)
> > > >>>>> 21:13收件人:dev@flink.apache.org<mailto:dev@flink.apache.org> <
> > > >>>>> dev@flink.apache.org<mailto:dev@flink.apache.org>>主 题:Re:
> > [DISCUSS]
> > > >>>>> FLIP-11: Table API Stream Aggregations Hi everybody,
> > > >>>>>
> > > >>>>> Timo proposed our FLIP-11 a bit more than three weeks ago.
> > > >>>>> I will update the status of the FLIP to accepted.
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>> Fabian
> > > >>>>>
> > > >>>>> 2016-09-19 9:16 GMT+02:00 Timo Walther <twalthr@apache.org
> <mailto:
> > > twa
> > > >>>>> lthr@apache.org>>:
> > > >>>>>
> > > >>>>>> Hi Jark,
> > > >>>>>>
> > > > >>>>>> yes I think enough time has passed. We can start implementing the changes.
> > > > >>>>>> What do you think Fabian?
> > > > >>>>>>
> > > > >>>>>> If there are no objections, I will create the subtasks in Jira today.
> > > > >>>>>> For FLIP-11/1 I already have implemented a prototype, I just have to do
> > > > >>>>>> some refactoring/documentation before opening a PR.
> > > >>>>>>
> > > >>>>>> Timo
> > > >>>>>>
> > > >>>>>>
> > > > >>>>>> On 18/09/16 at 04:46, Jark Wu wrote:
> > > >>>>>>
> > > >>>>>> Hi all,
> > > >>>>>>>
> > > > >>>>>>> It seems that there are no objections to the window design. So could we
> > > > >>>>>>> open subtasks to start working on it now?
> > > >>>>>>>
> > > >>>>>>> - Jark Wu
> > > >>>>>>>
> > > > >>>>>>> On Sep 7, 2016, at 4:29 PM, Jark Wu <wuchong.wc@alibaba-inc.com> wrote:
> > > >>>>>>>>
> > > >>>>>>>> Hi Fabian,
> > > >>>>>>>>
> > > >>>>>>>> Thanks for sharing your ideas.
> > > >>>>>>>>
> > > > >>>>>>>> They all make sense to me. Regarding reassigning timestamps, I do not
> > > > >>>>>>>> have a use case. I came up with this because DataStream has a
> > > > >>>>>>>> TimestampAssigner :)
> > > >>>>>>>>
> > > >>>>>>>> +1 for this FLIP.
> > > >>>>>>>>
> > > >>>>>>>> - Jark Wu
> > > >>>>>>>>
> > > > >>>>>>>> On Sep 7, 2016, at 2:59 PM, Fabian Hueske <fhueske@gmail.com> wrote:
> > > >>>>>>>>>
> > > >>>>>>>>> Hi,
> > > >>>>>>>>>
> > > >>>>>>>>> thanks for your comments and questions!
> > > > >>>>>>>>> Actually, you are bringing up the points that Timo and I discussed the
> > > > >>>>>>>>> most when designing the FLIP ;-)
> > > >>>>>>>>>
> > > > >>>>>>>>> - We also thought about the syntactic shortcut for running aggregates
> > > > >>>>>>>>> like you proposed (table.groupBy('a).select(…)). Our motivation to not
> > > > >>>>>>>>> allow this shortcut is to prevent users from accidentally performing a
> > > > >>>>>>>>> "dangerous" operation. The problem with unbounded sliding row-windows is
> > > > >>>>>>>>> that their state never expires. If you have an evolving key space, you
> > > > >>>>>>>>> will likely run into problems at some point because the operator state
> > > > >>>>>>>>> grows too large. IMO, a row-window session is a better approach, because
> > > > >>>>>>>>> it defines a timeout after which state can be discarded. groupBy.select
> > > > >>>>>>>>> is a very common operation in batch but its semantics in streaming are
> > > > >>>>>>>>> very different. In my opinion it makes sense to make users aware of
> > > > >>>>>>>>> these differences through the API.
> > > >>>>>>>>>
> > > > >>>>>>>>> - Reassigning timestamps and watermarks is a very delicate issue. You
> > > > >>>>>>>>> are right that Calcite exposes this field, which is necessary due to the
> > > > >>>>>>>>> semantics of SQL. However, also in Calcite you cannot freely choose the
> > > > >>>>>>>>> timestamp attribute for streaming queries (it must be a monotone or
> > > > >>>>>>>>> quasi-monotone attribute) which is hard to reason about (and guarantee)
> > > > >>>>>>>>> after a few operators have been applied. Streaming tables in Flink will
> > > > >>>>>>>>> likely have a time attribute which is identical to the initial rowtime.
> > > > >>>>>>>>> However, Flink does modify timestamps internally, e.g., for records that
> > > > >>>>>>>>> are emitted from time windows, in order to ensure that consecutive
> > > > >>>>>>>>> windows perform as expected. Modifying or reassigning timestamps in the
> > > > >>>>>>>>> middle of a job can lead to unexpected results which are very hard to
> > > > >>>>>>>>> reason about. Do you have a concrete use case in mind for reassigning
> > > > >>>>>>>>> timestamps?
> > > >>>>>>>>>
> > > > >>>>>>>>> - The idea to represent rowtime and systime as objects is good. Our
> > > > >>>>>>>>> motivation to go for reserved Scala symbols was to have a uniform syntax
> > > > >>>>>>>>> with windows over streaming and batch tables. On batch tables you can
> > > > >>>>>>>>> compute time windows basically over every time attribute (they are
> > > > >>>>>>>>> treated similar to grouping attributes with a bit of extra logic to
> > > > >>>>>>>>> extract the grouping key for sliding and session windows). If you write
> > > > >>>>>>>>> window(Tumble over 10.minutes on 'rowtime) on a streaming table,
> > > > >>>>>>>>> 'rowtime would indicate event-time. On a batch table with a 'rowtime
> > > > >>>>>>>>> attribute, the same operator would be internally converted into a group
> > > > >>>>>>>>> by. By going for the object approach we would lose this compatibility
> > > > >>>>>>>>> (or would need to introduce an additional column attribute to specify
> > > > >>>>>>>>> the window attribute for batch tables).
> > > >>>>>>>>>
> > > > >>>>>>>>> As usual some of the design decisions are based on preferences.
> > > >>>>>>>>> Do they make sense to you? Let me know what you think.
> > > >>>>>>>>>
> > > >>>>>>>>> Best, Fabian
> > > >>>>>>>>>
> > > >>>>>>>>>
> > > >>>>>>>>> 2016-09-07 5:12 GMT+02:00 Jark Wu <
> wuchong.wc@alibaba-inc.com
> > <ma
> > > >>>>> ilto:wuchong.wc@alibaba-inc.com> <mailto:
> > > >>>>>
> > > >>>>>>>>> wuchong.wc@alibaba-inc.com<mailto:wuchong.wc@alibaba-inc.com
> > >>>:
> > > >>>>>>>>>
> > > >>>>>>>>> Hi all,
> > > >>>>>>>>>>
> > > > >>>>>>>>>> I'm on vacation for about five days, sorry to have missed this great
> > > > >>>>>>>>>> FLIP.
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> Yes, the non-windowed aggregate is a special case of a row-window, and
> > > > >>>>>>>>>> the proposal looks really good. Can we have a simplified form for the
> > > > >>>>>>>>>> special case? Such as:
> > > > >>>>>>>>>> table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(…)
> > > > >>>>>>>>>> can be simplified to table.groupBy('a).select(…). The latter will
> > > > >>>>>>>>>> actually call the former.
> > > >>>>>>>>>>
> > > > >>>>>>>>>> Another question is about the rowtime. As the FLIP says, DataStream and
> > > > >>>>>>>>>> StreamTableSource are responsible for assigning timestamps and
> > > > >>>>>>>>>> watermarks; furthermore "rowtime" and "systemtime" are not real
> > > > >>>>>>>>>> columns. IMO, this is different from Calcite's rowtime, which is a real
> > > > >>>>>>>>>> column in the table. In the FLIP's way, we will lose some flexibility,
> > > > >>>>>>>>>> because the timestamp column may be created after some transformations
> > > > >>>>>>>>>> or a join operation, not at the beginning. So why do we have to define
> > > > >>>>>>>>>> rowtime at the beginning? (because of watermark?) Can we have a way to
> > > > >>>>>>>>>> define rowtime after the source table, like TimestampAssigner?
> > > >>>>>>>>>>
> > > > >>>>>>>>>> Regarding the "allowLateness" method: I came up with a trick where we
> > > > >>>>>>>>>> can make 'rowtime and 'system Scala objects, not symbol expressions.
> > > > >>>>>>>>>> The API will look like this:
> > > > >>>>>>>>>>
> > > > >>>>>>>>>> window(Tumble over 10.minutes on rowtime allowLateness as 'w)
> > > >>>>>>>>>>
> > > >>>>>>>>>> The implementation will look like this:
> > > >>>>>>>>>>
> > > > >>>>>>>>>> class TumblingWindow(size: Expression) extends Window {
> > > > >>>>>>>>>>   // has allowLateness() method
> > > > >>>>>>>>>>   def on(time: rowtime.type): TumblingEventTimeWindow =
> > > > >>>>>>>>>>     new TumblingEventTimeWindow(alias, 'rowtime, size)
> > > > >>>>>>>>>>
> > > > >>>>>>>>>>   // has no allowLateness() method
> > > > >>>>>>>>>>   def on(time: systemtime.type): TumblingProcessingTimeWindow =
> > > > >>>>>>>>>>     new TumblingProcessingTimeWindow(alias, 'systemtime, size)
> > > > >>>>>>>>>> }
> > > > >>>>>>>>>> object rowtime
> > > > >>>>>>>>>> object systemtime
> > > >>>>>>>>>>
> > > >>>>>>>>>> What do you think about this?
> > > >>>>>>>>>>
> > > >>>>>>>>>> - Jark Wu
> > > >>>>>>>>>>
> > > >>>>>>>>>> On Sep 6, 2016, at 11:00 PM, Timo Walther <twalthr@apache.org> wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Hi all,
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> I thought about the API of the FLIP again. If we allow the
> > > >>>>>>>>>>> "systemtime" attribute, we cannot implement a nice method
> > > >>>>>>>>>>> chaining where the user can define an "allowLateness" only on
> > > >>>>>>>>>>> event time. So even if the user expressed that "systemtime" is
> > > >>>>>>>>>>> used, we have to offer an "allowLateness" method because we
> > > >>>>>>>>>>> have to assume that this attribute can also be the batch event
> > > >>>>>>>>>>> time column, which is not very nice.
> > > >>>>>>>>>>
> > > >>>>>>>>>>> class TumblingWindow(size: Expression) extends Window {
> > > >>>>>>>>>>>   def on(timeField: Expression): TumblingEventTimeWindow =
> > > >>>>>>>>>>>     new TumblingEventTimeWindow(alias, timeField, size) // has allowLateness() method
> > > >>>>>>>>>>> }
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> What do you think?
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Timo
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> On 05/09/16 at 10:41, Fabian Hueske wrote:
> > > >>>>>>>>>>>
> > > >>>>>>>>>>>> Hi Jark,
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> you had asked for non-windowed aggregates in the Table API a
> > > >>>>>>>>>>>> few times. FLIP-11 proposes row-window aggregates which are a
> > > >>>>>>>>>>>> generalization of running aggregates (SlideRow
> > > >>>>>>>>>>>> unboundedPreceding).
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Can you have a look at the FLIP and give feedback whether this
> > > >>>>>>>>>>>> is what you are looking for?
> > > >>>>>>>>>>>> Improvement suggestions are very welcome as well.
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Thank you,
> > > >>>>>>>>>>>> Fabian
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> 2016-09-01 16:12 GMT+02:00 Timo Walther <twalthr@apache.org>:
> > > >>>>>>>>>>>>
> > > >>>>>>>>>>>> Hi all!
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in the
> > > >>>>>>>>>>>>> Table API. You can find the FLIP-11 here:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Motivation for the FLIP:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> The Table API is a declarative API to define queries on
> > > >>>>>>>>>>>>> static and streaming tables. So far, only projection,
> > > >>>>>>>>>>>>> selection, and union are supported operations on streaming
> > > >>>>>>>>>>>>> tables.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> This FLIP proposes to add support for different types of
> > > >>>>>>>>>>>>> aggregations on top of streaming tables. In particular, we
> > > >>>>>>>>>>>>> seek to support:
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> - Group-window aggregates, i.e., aggregates which are
> > > >>>>>>>>>>>>> computed for a group of elements. A (time or row-count)
> > > >>>>>>>>>>>>> window is required to bound the infinite input stream into a
> > > >>>>>>>>>>>>> finite group.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> - Row-window aggregates, i.e., aggregates which are computed
> > > >>>>>>>>>>>>> for each row, based on a window (range) of preceding and
> > > >>>>>>>>>>>>> succeeding rows.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Each type of aggregate shall be supported on keyed/grouped or
> > > >>>>>>>>>>>>> non-keyed/grouped data streams for streaming tables as well
> > > >>>>>>>>>>>>> as batch tables.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> We are looking forward to your feedback.
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>> Timo
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>>>>
> > > >>>>>>>>>>> --
> > > >>>>>>>>>>> Freundliche Grüße / Kind Regards
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Timo Walther
> > > >>>>>>>>>>>
> > > >>>>>>>>>>> Follow me: @twalthr
> > > >>>>>>>>>>> https://www.linkedin.com/in/twalthr
> > > >>>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>> --
> > > >>>>>> Freundliche Grüße / Kind Regards
> > > >>>>>>
> > > >>>>>> Timo Walther
> > > >>>>>>
> > > >>>>>> Follow me: @twalthr
> > > >>>>>> https://www.linkedin.com/in/twalthr
> > > >>>>>>
> > > >>>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>
> > > >>>
> > > >>
> > >
> > >
> >
>
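
For reference, the singleton-object trick Jark describes in the quoted thread can be sketched in plain Scala. This is only an illustration under stated assumptions: the classes below are hypothetical stand-ins (sizes are plain milliseconds, there is no alias or Expression), not the actual Table API classes. It shows how overloading on rowtime.type and systemtime.type lets the compiler offer allowLateness() only for event time.

```scala
// Hypothetical stand-ins for illustration; not the actual Flink Table API.
object rowtime
object systemtime

sealed trait Window

// Event-time window: lateness is meaningful, so allowLateness exists here.
final case class TumblingEventTimeWindow(
    sizeMillis: Long,
    allowedLatenessMillis: Long = 0L) extends Window {
  def allowLateness(millis: Long): TumblingEventTimeWindow =
    copy(allowedLatenessMillis = millis)
}

// Processing-time window: deliberately has no allowLateness method.
final case class TumblingProcessingTimeWindow(sizeMillis: Long) extends Window

final class TumblingWindow(sizeMillis: Long) {
  // The singleton type of the argument selects the overload,
  // and with it the static type of the returned window.
  def on(time: rowtime.type): TumblingEventTimeWindow =
    TumblingEventTimeWindow(sizeMillis)

  def on(time: systemtime.type): TumblingProcessingTimeWindow =
    TumblingProcessingTimeWindow(sizeMillis)
}

object Tumble {
  def over(sizeMillis: Long): TumblingWindow = new TumblingWindow(sizeMillis)
}

object SingletonOverloadSketch {
  def main(args: Array[String]): Unit = {
    val eventWindow = Tumble.over(600000L).on(rowtime).allowLateness(5000L)
    val procWindow = Tumble.over(600000L).on(systemtime)
    // procWindow.allowLateness(5000L) would be rejected by the compiler.
    println(eventWindow)
    println(procWindow)
  }
}
```

The restriction is enforced at compile time because the two overloads return different types. Passing a general Expression, as in Timo's variant, cannot make that distinction.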

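
Similarly, the difference between the two aggregate types named in the FLIP motivation quoted above can be illustrated on an ordinary Scala collection. This is a minimal sketch with assumed input data and no streaming semantics (no time, keys, or watermarks): a group-window aggregate emits one result per window, while a row-window aggregate emits one result per row.

```scala
object WindowAggregateSketch {
  // Assumed input: one value per row, in arrival order.
  val rows: Vector[Int] = Vector(3, 1, 4, 1, 5, 9)

  // Group-window aggregate: a tumbling row-count window of size 2 bounds
  // the input into finite groups and yields one sum per group.
  val groupWindowSums: Vector[Int] = rows.grouped(2).map(_.sum).toVector

  // Row-window aggregate: one result per input row, computed over the
  // current row and all preceding rows (a running sum).
  val rowWindowSums: Vector[Int] = rows.scanLeft(0)(_ + _).tail

  def main(args: Array[String]): Unit = {
    println(groupWindowSums) // Vector(4, 5, 14)
    println(rowWindowSums)   // Vector(3, 4, 8, 9, 14, 23)
  }
}
```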