flink-dev mailing list archives

From "Jark Wu" <wuchong...@alibaba-inc.com>
Subject Re: [DISCUSS] FLIP-11: Table API Stream Aggregations
Date Fri, 28 Oct 2016 08:05:29 GMT
Hi,

I agree that the Table API should be a SQL-like API that does not need to be strictly
consistent with SQL.

> - Not put the window definition into the groupBy clause. 

+1. Keeping the window clause out of groupBy makes the API easier for users to learn and
to discover.

> I like the idea of having separate ".window()" and ".rowWindow()"
> clauses.

+1

>  I would prefer to not have a "partitionBy" statement.

As far as I know, partitionBy plays the same role as groupBy in the Table API. So groupBy
may be enough, and there is no need to introduce a new clause.
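
To illustrate the point, here is a minimal sketch in plain Scala. It is a toy model and not the Flink Table API: Row, groupSum and runningSum are hypothetical names. It shows that a group aggregate (one result per key) and a row-window aggregate with unbounded preceding rows (one result per input row) can both be driven by the same keying step, which is why a single groupBy clause might cover both cases.

```scala
// Toy model, not the Flink Table API: rows are (key, value) pairs in arrival order.
final case class Row(key: String, value: Long)

// Group aggregate: one result per key.
def groupSum(rows: Seq[Row]): Map[String, Long] =
  rows.groupBy(_.key).map { case (k, rs) => k -> rs.map(_.value).sum }

// Row-window aggregate with unbounded preceding rows: one result per input row,
// computed within the row's key. The keying is the same as in groupSum.
def runningSum(rows: Seq[Row]): Seq[(Row, Long)] = {
  val state = scala.collection.mutable.Map.empty[String, Long]
  rows.map { r =>
    val total = state.getOrElse(r.key, 0L) + r.value
    state(r.key) = total
    r -> total
  }
}

val input = Seq(Row("a", 1L), Row("b", 10L), Row("a", 2L), Row("b", 20L))
val perKey = groupSum(input)
val perRow = runningSum(input).map(_._2)
```

Here perKey holds one sum per key, while perRow holds one running sum per input row.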


- Jark Wu 

> On Oct 27, 2016, at 2:22 AM, Stephan Ewen <sewen@apache.org> wrote:
> 
> Hi all!
> 
> I think that in order to get a better hold on how we want to build the
> Table API, we need to *decide what the role of the Table API should be*. We
> touched on that a few times, but I think we still have different ideas
> about that.
> 
> To get there, let me take a step back and look at the design of Stream SQL
> again. There were basically two competing approaches:
> 
> (1) Keep SQL as it is and make it run on infinite streams via introducing
> dynamic tables
> (2) Do a new language that is similar to SQL, but designed with streaming
> concepts in mind (first class support for time and windows)
> 
> Both approaches had good points. The Stream SQL design doc posted followed
> approach (1) - keep SQL as it is.
> 
> 
> 
> Now, for the Table API, we seem to be having a similar discussion again.
> 
> (1) Let the Table API be as similar to SQL as possible, simply make it feel
> "fluently embedded" in Scala.
> (2) Define the Table API as one would define a new and clean DSL for
> streaming. SQL inspired, of course. Where SQL syntax feels natural, use the
> SQL syntax, but make it very accessible to Java/Scala (non-SQL) programmers.
> 
> 
> I am personally more in favor of variant (2) for the following reasons:
> 
>  - We already have SQL compliant with the standards and tools.
> 
>  - Mirroring SQL too closely into the Table API has the marginal benefit
> that someone close to SQL will find it a bit more familiar. Not sure if
> that is even the case, as they have to re-learn the fluent DSL and Scala
> concepts
> 
>  - We are making it more difficult for all those that come from a more
> Scala/Java DataStream background and simply want to move "a layer up",
> getting schema and more optimizations into the equation.
> 
> 
> 
> 
> What would that mean for the specific issues that are discussed in FLIP-11?
> Based on interpreting the Table API as a re-imagined streaming DSL, I would
> suggest to
> 
>  - Not put the window definition into the groupBy clause. It just is
> unexpected for all that are not super familiar with SQL and hard to
> discover in the IDE. A separate window clause is simpler for users coming
> from the DataStream background (or other streaming APIs) and it is more
> discoverable in the IDE.
> 
>  - I like the idea of having separate ".window()" and ".rowWindow()"
> clauses. Makes it more explicit that very different things will happen.
> 
>  - I would prefer to not have a "partitionBy" statement. When we restrain
> the Table API at least initially to having one partitioning for the
> windows, we should be able to express the partitioning by simply adding it
> to the fields in the "groupBy" clause. That would make the API more
> accessible to users who are not SQL power users.
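
As a rough sketch of what a separate window clause could look like, here is a toy fluent DSL in plain Scala. All names (Event, Tumble, ToyTable, Windowed, groupBySum) are hypothetical and are not the FLIP-11 or Flink API; timestamps are assumed to be non-negative epoch milliseconds. The window is its own step in the chain, and the grouping step only names the key.

```scala
// Hypothetical toy DSL, not the Flink Table API.
final case class Event(key: String, ts: Long, value: Long)

// A tumbling window of fixed size; windowStart aligns a timestamp to its window.
final case class Tumble(sizeMs: Long) {
  def windowStart(ts: Long): Long = ts - (ts % sizeMs)
}

final class Windowed(events: Seq[Event], w: Tumble) {
  // Sum of `value` per (key, window start).
  def groupBySum(key: Event => String): Map[(String, Long), Long] =
    events
      .groupBy(e => (key(e), w.windowStart(e.ts)))
      .map { case (k, es) => k -> es.map(_.value).sum }
}

// window(...) is a separate clause; the grouping key is given afterwards.
final class ToyTable(events: Seq[Event]) {
  def window(w: Tumble): Windowed = new Windowed(events, w)
}

val events = Seq(
  Event("a", 1000L, 1L), Event("a", 9000L, 2L),
  Event("a", 11000L, 4L), Event("b", 12000L, 8L))

val result = new ToyTable(events).window(Tumble(10000L)).groupBySum(_.key)
```

With a 10-second window, the first two "a" events share a window and the third falls into the next one.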
> 
> 
> What do others think?
> 
> Greetings,
> Stephan
> 
> 
> On Sat, Oct 15, 2016 at 1:02 AM, Fabian Hueske <fhueske@gmail.com> wrote:
> 
>> Thanks for your reply Shaoxuan!
>> 
>> Please see my replies below.
>> 
>> Best, Fabian
>> 
>> 2016-10-14 11:34 GMT+02:00 Sean Wang <wshaoxuan@gmail.com>:
>> 
>>> Thanks for your quick reply, Fabian.
>>> 
>>> I have a few minor comments and suggestions:
>>> 
>>> <GroupBy with window>
>>> - Agree that we should consider GroupBy without window after the new SQL
>>> proposal is settled down.
>>> 
>>> 
>> OK, so we keep this as it is for now. GroupBy without windows will be
>> supported later when we are able to "guard" the memory requirements of that
>> operation.
>> 
>> 
>>> <GroupBy with window>
>>> - For Java API, we can keep window() call, and put window alias into
>>> Groupby clause. This can be also applied to rowwindow case.
>>> 
>>> 
>> Referring to the window alias in the groupBy clause would require to invert
>> the methods, i.e., groupBy().window() -> window().groupBy(). I am not sure
>> if that is more intuitive. Also, Scala and Java are using the same class
>> (Table) but different methods (Java uses String parameter, Scala Expression
>> parameters). In my opinion it makes sense to have both APIs closely synced.
>> So I would either keep window() (after groupBy) for Scala and Java or
>> remove it for both.
>> 
>> 
>>> <RowWindows> & <partitionby() for rowwindow>
>>> - +1 to replacing groupby() with partitionby(). BTW, in the case of
>>> over, instead of partitionby, are we going to support orderby? If yes, I
>>> would suggest defining rowwindow as rowwindow(PartitionByParaType,
>>> OrderByParaType, WindowParaType).
>>> 
>>> 
>> The current FLIP-11 proposal supports defining both partitionBy and orderBy
>> (with a few restrictions).
>> PartitionBy is defined for all windows alike by calling
>> 
>> table.partitionBy(...).rowWindow(Window1 as w1, Window2 as w2)
>>   .select(count() over w1).
>> 
>> Allowing windows with different partitioning would mean that data is
>> shuffled to different nodes and that we need a distributed join to assemble
>> the result rows. In principle this could be done but would be very
>> expensive to execute (applies to batch and streaming). In my opinion, we
>> should not support this case.
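
A minimal sketch of the single-partitioning case, written as a toy model in plain Scala (Row, RowWindow and sumsOver are hypothetical names, not the FLIP-11 API): the rows are keyed once, and every row window is then evaluated on the same per-key history, so no second shuffle or join is needed to assemble the output row.

```scala
// Toy model, not the Flink Table API.
final case class Row(key: String, value: Long)

// A row window covering the current row and `preceding` earlier rows of the same key.
final case class RowWindow(preceding: Int)

// Partition once, then evaluate every window on the same per-key history.
// Returns, for each input row in arrival order, one sum per window.
def sumsOver(rows: Seq[Row], windows: Seq[RowWindow]): Seq[Seq[Long]] = {
  val history = scala.collection.mutable.Map.empty[String, Vector[Long]]
  rows.map { r =>
    val values = history.getOrElse(r.key, Vector.empty[Long]) :+ r.value
    history(r.key) = values
    windows.map(w => values.takeRight(w.preceding + 1).sum)
  }
}

val rows = Seq(Row("a", 1L), Row("a", 2L), Row("b", 10L), Row("a", 4L))
val out = sumsOver(rows, Seq(RowWindow(1), RowWindow(2)))
```

Each element of out is one result row carrying the aggregates of both windows.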
>> 
>> OrderBy is implicitly supported by the on() clause of RowWindows, e.g.,
>> 
>> rowWindow(TumbleRows over 10.minutes on 'rowtime as 'w)
>> 
>> says that the window is ordered on the rowtime, i.e., event-time,
>> attribute. For streaming we can only allow event-time order (or
>> processing-time order which is always given). Orders on other attributes
>> would not be possible (for infinite dynamic input tables) or very expensive
>> (memory and computation wise) to maintain (for finite dynamic input
>> tables). For queries on batch tables, in principle all orders are possible.
>> With the current proposal only count windows are supported for arbitrary
>> attribute types and time windows for timestamp attributes.
>> 
>> So
>>> - moving windows into the groupBy() call :   +1
>>> 
>> 
>> +1
>> 
>> 
>>> - making over() for rowWindow() with a single window definition.
>>> 
>> 
>> +1
>> 
>> 
>>> - additionally allowing window definitions in over():  +1 yes for scala,
>>> but use alias for java API.
>>> 
>> 
>> If we have the parser code for Java group windows in groupBy() it should be
>> easy to adapt this for over(). But we should also keep the rowWindow method
>> to define aliases.
>> 
>> 
>>> - using partitionBy() instead of groupBy() for row windows?: +1, but
>>> better to consider orderby, it may be even better to move partitionBy()
>>> into rowwindow.
>>> 
>>> 
>> +1 to change groupBy() to partitionBy().
>> I would not move partitionBy() into the RowWindow definition but keep it
>> outside to ensure only one partitioning is defined. The orderBy definition
>> is already fluently included in the RowWindow via the on() method.
>> 
>> 
>> 
>>> Regards,
>>> Shaoxuan
>>> 
>>> 
>>> On Thu, Oct 13, 2016 at 6:05 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>> 
>>>> Hi everybody,
>>>> 
>>>> happy to see a good discussion here :-)
>>>> I'll reply to Shaoxuan's mail first and comment on Zhangrucong question
>>>> in a separate mail.
>>>> 
>>>> Shaoxuan, thanks for the suggestions! I think we all agree that for SQL
>>>> we should definitely follow the standard (batch) SQL syntax.
>>>> In my opinion, the Table API does not necessarily have to be as close as
>>>> possible to SQL but should try to make a few things easier and also safer
>>>> (easier is of course subjective).
>>>> 
>>>> - GroupBy without windows: These are currently intentionally not
>>>> supported and also not part of FLIP-11. Our motivation for not supporting
>>>> this is to guard the user from defining a query that fails when being
>>>> executed due to a very memory-consuming operation. FLIP-11 provides a way
>>>> to define such a query as a sliding row window with unbounded preceding
>>>> rows. With the upcoming SQL proposal, queries that consume unbounded memory
>>>> should be identified and rejected. I would be in favor of allowing groupBy
>>>> without windows once the guarding mechanisms are in place.
>>>> 
>>>> - GroupBy with window: I think this is a question of taste. Having a
>>>> window() call makes the feature more explicit in my opinion. However, I'm
>>>> not opposed to moving the windows into the groupBy clause.
>>>> Implementation-wise it should be easy to move the window definition into
>>>> the groupBy clause for the Scala Table API. For the Java Table API we would
>>>> need to extend the parser quite a bit because windows would need to be
>>>> defined as Strings and not via objects.
>>>> 
>>>> - RowWindows: The rowWindow() call mimics the standard SQL WINDOW clause
>>>> (implemented by PostgreSQL and Calcite) which allows "reusable"
>>>> window definitions. I think this is a desirable feature. In the FLIP-11
>>>> proposal the over() clause in select() refers to the predefined windows
>>>> with aliases. In case only one window is defined, the over() clause is
>>>> optional and the same (and only) window is applied to all aggregates. I
>>>> think we can make the over() call mandatory to make the windowing more
>>>> explicit. It should also be possible to extend the over clause to directly
>>>> accept RowWindows instead of window aliases. I would not make this a
>>>> priority at the moment, but a feature that could be added later, because
>>>> rowWindow() and over() cover all cases. As for GroupBy with
>>>> windows, we would need to extend the parser for the Java Table API though.
>>>> 
>>>> Finally, I have a suggestion of my own:
>>>> In FLIP-11, groupBy() is used to define the partitioning of RowWindows.
>>>> I think this should be changed to partitionBy() because groupBy() groups
>>>> data and applies an aggregation to all rows of a group, which is not
>>>> happening here. In original SQL, the OVER clause features a PARTITION BY
>>>> clause. We are moving this out of the window definition, i.e., the OVER
>>>> clause, to enforce the same partitioning for all windows (different
>>>> partitionings would be a challenge to execute in a parallel system).
>>>> 
>>>> @Timo and all: What do you think about:
>>>> 
>>>> - moving windows into the groupBy() call
>>>> - making over() for rowWindow() with a single window definition
>>>> - additionally allowing window definitions in over()
>>>> - using partitionBy() instead of groupBy() for row windows?
>>>> 
>>>> Best, Fabian
>>>> 
>>>> 2016-10-13 11:10 GMT+02:00 Zhangrucong <zhangrucong@huawei.com>:
>>>> 
>>>>> Hi shaoxuan:
>>>>> 
>>>>> I think the StreamSQL must be executed in a table environment. So I call
>>>>> this the Table API's StreamSQL. What do you call it, stream Table API or
>>>>> StreamSQL? It is fu
>>>>> 
>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>> val tblEnv = TableEnvironment.getTableEnvironment(env)
>>>>> val ds: DataStream[(String, Long, Long)] = env.readTextFile("/home/demo")
>>>>>    .map(f => (f, 1L, 1L))
>>>>> tblEnv.registerDataStream("Order", ds, 'userID, 'count, 'num)
>>>>> val sql = tblEnv.sql("SELECT Stream * FROM Order WHERE userID='A'")
>>>>> 
>>>>> So in my opinion, the grammar which is marked red should be compatible
>>>>> with calcite's StreamSQL grammar.
>>>>> 
>>>>> By the way, thanks very much for telling me about the changes in
>>>>> Flink StreamSQL. I will look at the new proposal.
>>>>> 
>>>>> Thanks!
>>>>> From: Sean Wang [mailto:wshaoxuan@gmail.com]
>>>>> Sent: October 13, 2016 16:29
>>>>> To: dev@flink.apache.org; Zhangrucong
>>>>> Subject: Re: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>>>>> 
>>>>> Hi zhangrucong,
>>>>> I am not sure what you mean by "table API'S StreamSQL", I guess you mean
>>>>> "stream TableAPI"?
>>>>> TableAPI should be compatible with Calcite SQL. (By compatible, my
>>>>> understanding is that both TableAPI and SQL will be translated to the same
>>>>> logical plan - the same set of REL and REX).
>>>>> BTW, please note that we recently merged a change to remove the STREAM
>>>>> keyword from Flink stream SQL (FLINK-4546). In our opinion, batch and stream
>>>>> do not necessarily need to be differentiated at the SQL level. The major
>>>>> difference between batch and stream is "WHEN and HOW to emit the result".
>>>>> We have been working on a new proposal with Fabian on this change. I
>>>>> guess it will be sent out for review very soon.
>>>>> 
>>>>> Regards,
>>>>> Shaoxuan
>>>>> 
>>>>> 
>>>>> On Thu, Oct 13, 2016 at 2:29 PM, Zhangrucong <zhangrucong@huawei.com> wrote:
>>>>> Hi shaoxuan:
>>>>> Is the Table API's StreamSQL grammar compatible with Calcite's
>>>>> StreamSQL grammar?
>>>>> 
>>>>> 
>>>>> 1. In Calcite, the tumble window is realized by using the function TUMBLE or
>>>>> HOP. The function must be used with GROUP BY, like this:
>>>>> 
>>>>> SELECT
>>>>>  TUMBLE_END(rowtime, INTERVAL '30' MINUTE, TIME '0:12') AS rowtime,
>>>>>  productId,
>>>>>  COUNT(*) AS c,
>>>>>  SUM(units) AS units
>>>>> FROM Orders
>>>>> GROUP BY TUMBLE(rowtime, INTERVAL '30' MINUTE, TIME '0:12'),
>>>>>  productId;
>>>>> 
>>>>> 2. The sliding window uses the keywords "window" and "over", like this:
>>>>> 
>>>>> SELECT  *
>>>>> FROM (
>>>>>  SELECT STREAM rowtime,
>>>>>    productId,
>>>>>    units,
>>>>>    AVG(units) OVER product (RANGE INTERVAL '10' MINUTE PRECEDING) AS m10,
>>>>>    AVG(units) OVER product (RANGE INTERVAL '7' DAY PRECEDING) AS d7
>>>>>  FROM Orders
>>>>>  WINDOW product AS (
>>>>>    ORDER BY rowtime
>>>>>    PARTITION BY productId))
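
The tumbling query in item 1 passes TIME '0:12' as a third argument. Assuming that argument is an alignment offset (so 30-minute windows end at 12 and 42 minutes past the hour), the window boundaries can be sketched with plain arithmetic. The helper names tumbleStart and tumbleEnd are hypothetical, and times are given in minutes since midnight to keep the numbers readable.

```scala
// Assumption: the third TUMBLE argument shifts window boundaries by a fixed offset.
// All values are minutes since midnight.
def tumbleStart(ts: Long, size: Long, offset: Long): Long =
  ts - java.lang.Math.floorMod(ts - offset, size)

def tumbleEnd(ts: Long, size: Long, offset: Long): Long =
  tumbleStart(ts, size, offset) + size

// 10:20 falls into the window [10:12, 10:42).
val at1020 = 10 * 60 + 20L
val start = tumbleStart(at1020, 30L, 12L)
val end = tumbleEnd(at1020, 30L, 12L)
```

floorMod is used instead of % so that timestamps earlier than the offset still map to the window that contains them.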
>>>>> 
>>>>> 
>>>>> 
>>>>> Thanks!
>>>>> 
>>>>> -----Original Message-----
>>>>> From: 王绍翾(大沙) [mailto:shaoxuan.wsx@alibaba-inc.com]
>>>>> Sent: October 13, 2016 2:03
>>>>> To: dev@flink.apache.org
>>>>> Subject: RE:[DISCUSS] FLIP-11: Table API Stream Aggregations
>>>>> 
>>>>> Hi Fabian, Timo, and Jark.
>>>>> 
>>>>> Thanks for kicking off this FLIP. This is a really great and promising
>>>>> proposal. I have a few comments on the "window" operator proposed in this
>>>>> FLIP (I hope it is not too late to bring this up).
>>>>> 
>>>>> First, a window is not always needed for stream aggregation. There are
>>>>> cases where we want to do an aggregation on a stream, while the query/emit
>>>>> strategy decides when to emit a streaming output.
>>>>> 
>>>>> Second, a window is needed when we want to do an aggregation over a certain
>>>>> range, but a window is not an operator. We basically use a window to define
>>>>> the range for aggregation. In the Table API, a window should be defined
>>>>> together with the "groupby" and "select" operators, either inside a
>>>>> "groupby" operator or after an "over" clause in the "select" operator. This
>>>>> will make the Table API work in a similar manner to SQL.
>>>>> 
>>>>> For instance,
>>>>> 
>>>>> [A groupby without window]
>>>>> <Table API>
>>>>> val res = tab
>>>>> .groupBy('a)
>>>>> .select('a, 'b.sum)
>>>>> <SQL>
>>>>> SELECT a, SUM(b)
>>>>> FROM tab
>>>>> GROUP BY a
>>>>> 
>>>>> [A tumble window inside groupby]
>>>>> <Table API>
>>>>> val res = tab
>>>>> .groupBy('a, tumble(10.minutes, 'rowtime))
>>>>> .select('a, 'b.sum)
>>>>> <SQL>
>>>>> SELECT a, SUM(b)
>>>>> FROM tab
>>>>> GROUP BY a, TUMBLE(10.minutes, 'rowtime)
>>>>> 
>>>>> [A row tumble window after OVER]
>>>>> <Table API>
>>>>> .groupby('a) //optional
>>>>> .select('a, 'b.count over rowTumble(10.minutes, 'rowtime))
>>>>> <SQL>
>>>>> SELECT a, COUNT(b) OVER ROWTUMBLE(10.minutes, 'rowtime)
>>>>> FROM tab
>>>>> GROUP BY a
>>>>> 
>>>>> Please let me know what you think.
>>>>> 
>>>>> Regards,
>>>>> Shaoxuan
>>>>> ------------------------------------------------------------------
>>>>> From: Fabian Hueske <fhueske@gmail.com>
>>>>> Sent: Monday, September 26, 2016 21:13
>>>>> To: dev@flink.apache.org <dev@flink.apache.org>
>>>>> Subject: Re: [DISCUSS] FLIP-11: Table API Stream Aggregations
>>>>> 
>>>>> Hi everybody,
>>>>> 
>>>>> Timo proposed our FLIP-11 a bit more than three weeks ago.
>>>>> I will update the status of the FLIP to accepted.
>>>>> 
>>>>> Thanks,
>>>>> Fabian
>>>>> 
>>>>> 2016-09-19 9:16 GMT+02:00 Timo Walther <twalthr@apache.org>:
>>>>> 
>>>>>> Hi Jark,
>>>>>> 
>>>>>> yes I think enough time has passed. We can start implementing the changes.
>>>>>> What do you think Fabian?
>>>>>> 
>>>>>> If there are no objections, I will create the subtasks in Jira today. For
>>>>>> FLIP-11/1 I have already implemented a prototype, I just have to do some
>>>>>> refactoring/documentation before opening a PR.
>>>>>> 
>>>>>> Timo
>>>>>> 
>>>>>> 
>>>>>> On 18/09/16 at 04:46, Jark Wu wrote:
>>>>>> 
>>>>>>> Hi all,
>>>>>>> 
>>>>>>> It seems that there are no objections to the window design. So could we
>>>>>>> open subtasks to start working on it now?
>>>>>>> 
>>>>>>> - Jark Wu
>>>>>>> 
>>>>>>> On Sep 7, 2016, at 4:29 PM, Jark Wu <wuchong.wc@alibaba-inc.com> wrote:
>>>>>>>> 
>>>>>>>> Hi Fabian,
>>>>>>>> 
>>>>>>>> Thanks for sharing your ideas.
>>>>>>>> 
>>>>>>>> They all make sense to me. Regarding reassigning timestamps, I do not
>>>>>>>> have a use case. I came up with this because DataStream has a
>>>>>>>> TimestampAssigner :)
>>>>>>>> 
>>>>>>>> +1 for this FLIP.
>>>>>>>> 
>>>>>>>> - Jark Wu
>>>>>>>> 
>>>>>>>> On Sep 7, 2016, at 2:59 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>>>>>> 
>>>>>>>>> Hi,
>>>>>>>>> 
>>>>>>>>> thanks for your comments and questions!
>>>>>>>>> Actually, you are bringing up the points that Timo and I discussed the
>>>>>>>>> most when designing the FLIP ;-)
>>>>>>>>> 
>>>>>>>>> - We also thought about the syntactic shortcut for running aggregates
>>>>>>>>> like you proposed (table.groupBy('a).select(…)). Our motivation for not
>>>>>>>>> allowing this shortcut is to prevent users from accidentally performing a
>>>>>>>>> "dangerous" operation. The problem with unbounded sliding row-windows is
>>>>>>>>> that their state never expires. If you have an evolving key space, you
>>>>>>>>> will likely run into problems at some point because the operator state
>>>>>>>>> grows too large. IMO, a row-window session is a better approach, because
>>>>>>>>> it defines a timeout after which state can be discarded. groupBy.select
>>>>>>>>> is a very common operation in batch but its semantics in streaming are
>>>>>>>>> very different. In my opinion it makes sense to make users aware of these
>>>>>>>>> differences through the API.
>>>>>>>>> 
>>>>>>>>> - Reassigning timestamps and watermarks is a very delicate issue. You
>>>>>>>>> are right that Calcite exposes this field, which is necessary due to the
>>>>>>>>> semantics of SQL. However, also in Calcite you cannot freely choose the
>>>>>>>>> timestamp attribute for streaming queries (it must be a monotone or
>>>>>>>>> quasi-monotone attribute), which is hard to reason about (and guarantee)
>>>>>>>>> after a few operators have been applied. Streaming tables in Flink will
>>>>>>>>> likely have a time attribute which is identical to the initial rowtime.
>>>>>>>>> However, Flink does modify timestamps internally, e.g., for records that
>>>>>>>>> are emitted from time windows, in order to ensure that consecutive
>>>>>>>>> windows perform as expected. Modifying or reassigning timestamps in the
>>>>>>>>> middle of a job can lead to unexpected results which are very hard to
>>>>>>>>> reason about. Do you have a concrete use case in mind for reassigning
>>>>>>>>> timestamps?
>>>>>>>>> 
>>>>>>>>> - The idea to represent rowtime and systime as objects is good. Our
>>>>>>>>> motivation to go for reserved Scala symbols was to have a uniform syntax
>>>>>>>>> with windows over streaming and batch tables. On batch tables you can
>>>>>>>>> compute time windows basically over every time attribute (they are
>>>>>>>>> treated similarly to grouping attributes with a bit of extra logic to
>>>>>>>>> extract the grouping key for sliding and session windows). If you write
>>>>>>>>> window(Tumble over 10.minutes on 'rowtime) on a streaming table, 'rowtime
>>>>>>>>> would indicate event-time. On a batch table with a 'rowtime attribute,
>>>>>>>>> the same operator would be internally converted into a group by. By going
>>>>>>>>> for the object approach we would lose this compatibility (or would need
>>>>>>>>> to introduce an additional column attribute to specify the window
>>>>>>>>> attribute for batch tables).
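
The state-growth concern in the first point can be sketched with a toy keyed counter in plain Scala. This is only an illustration under assumed names (Event, KeyedCounter, gapMs); it is not how Flink manages operator state. Without a timeout, every key that was ever seen stays in state; with a session-style gap, idle keys are dropped.

```scala
// Toy model of per-key operator state, not Flink's state backend.
final case class Event(key: String, ts: Long)

// Counts events per key. If gapMs is defined, a key's state is dropped once
// no event for that key has arrived within gapMs of the current event time.
final class KeyedCounter(gapMs: Option[Long]) {
  // key -> (count, timestamp of the last event for that key)
  private val state = scala.collection.mutable.Map.empty[String, (Long, Long)]

  def process(e: Event): Long = {
    gapMs.foreach { gap =>
      // Collect idle keys first, then evict them.
      val expired = state.collect { case (k, (_, last)) if e.ts - last > gap => k }.toList
      expired.foreach(k => state.remove(k))
    }
    val (count, _) = state.getOrElse(e.key, (0L, e.ts))
    state(e.key) = (count + 1, e.ts)
    count + 1
  }

  def stateSize: Int = state.size
}

// An evolving key space: every event carries a new key.
val events = (1 to 5).map(i => Event(s"user-$i", i * 1000L))
val unbounded = new KeyedCounter(None)
val session = new KeyedCounter(Some(1500L))
events.foreach { e => unbounded.process(e); session.process(e) }
```

After the five events, the unbounded counter holds state for all five keys, while the counter with a 1.5-second gap only retains the two most recent ones.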
>>>>>>>>> 
>>>>>>>>> As usual some of the design decisions are based on preferences.
>>>>>>>>> Do they make sense to you? Let me know what you think.
>>>>>>>>> 
>>>>>>>>> Best, Fabian
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 2016-09-07 5:12 GMT+02:00 Jark Wu <wuchong.wc@alibaba-inc.com>:
>>>>>>>>> 
>>>>>>>>>> Hi all,
>>>>>>>>>> 
>>>>>>>>>> I'm on vacation for about five days, sorry to have missed this great
>>>>>>>>>> FLIP.
>>>>>>>>>> 
>>>>>>>>>> Yes, the non-windowed aggregate is a special case of row-window. And
>>>>>>>>>> the proposal looks really good. Can we have a simplified form for the
>>>>>>>>>> special case? Such as:
>>>>>>>>>> table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(…)
>>>>>>>>>> can be simplified to table.groupBy('a).select(…). The latter will
>>>>>>>>>> actually call the former.
>>>>>>>>>> 
>>>>>>>>>> Another question is about the rowtime. As the FLIP said, DataStream and
>>>>>>>>>> StreamTableSource are responsible for assigning timestamps and
>>>>>>>>>> watermarks; furthermore, "rowtime" and "systemtime" are not real
>>>>>>>>>> columns. IMO, this is different from Calcite's rowtime, which is a real
>>>>>>>>>> column in the table. With the FLIP's approach, we will lose some
>>>>>>>>>> flexibility, because the timestamp column may be created after some
>>>>>>>>>> transformations or a join operation, not at the beginning. So why do we
>>>>>>>>>> have to define rowtime at the beginning? (Because of watermarks?) Can we
>>>>>>>>>> have a way to define rowtime after the source table, like
>>>>>>>>>> TimestampAssigner?
>>>>>>>>>> 
>>>>>>>>>> Regarding the "allowLateness" method: I came up with a trick where we
>>>>>>>>>> make 'rowtime and 'system Scala objects, not symbol expressions.
>>>>>>>>>> The API will look like this:
>>>>>>>>>> 
>>>>>>>>>> window(Tumble over 10.minutes on rowtime allowLateness as 'w)
>>>>>>>>>> 
>>>>>>>>>> The implementation will look like this:
>>>>>>>>>> 
>>>>>>>>>> class TumblingWindow(size: Expression) extends Window {
>>>>>>>>>>   def on(time: rowtime.type): TumblingEventTimeWindow =
>>>>>>>>>>     // has allowLateness() method
>>>>>>>>>>     new TumblingEventTimeWindow(alias, 'rowtime, size)
>>>>>>>>>> 
>>>>>>>>>>   def on(time: systemtime.type): TumblingProcessingTimeWindow =
>>>>>>>>>>     // has no allowLateness() method
>>>>>>>>>>     new TumblingProcessingTimeWindow(alias, 'systemtime, size)
>>>>>>>>>> }
>>>>>>>>>> object rowtime
>>>>>>>>>> object systemtime
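
The trick above relies on overloading by singleton type. A self-contained sketch of the idea follows; the class names mirror the proposal, but the fields (sizeMs, latenessMs) and method bodies are illustrative assumptions, not Flink code. Because on(rowtime) and on(systemtime) return different types, allowLateness is only available on the event-time variant.

```scala
// Marker objects; their singleton types select the overload.
object rowtime
object systemtime

final case class TumblingEventTimeWindow(sizeMs: Long, latenessMs: Long = 0L) {
  // Only event-time windows can tolerate late records.
  def allowLateness(ms: Long): TumblingEventTimeWindow = copy(latenessMs = ms)
}

// Processing-time windows deliberately have no allowLateness method.
final case class TumblingProcessingTimeWindow(sizeMs: Long)

final case class Tumble(sizeMs: Long) {
  // Overload resolution on the singleton types picks the result type.
  def on(time: rowtime.type): TumblingEventTimeWindow =
    TumblingEventTimeWindow(sizeMs)
  def on(time: systemtime.type): TumblingProcessingTimeWindow =
    TumblingProcessingTimeWindow(sizeMs)
}

val eventTime = Tumble(600000L).on(rowtime).allowLateness(60000L)
val procTime = Tumble(600000L).on(systemtime)
// Tumble(600000L).on(systemtime).allowLateness(1L) // rejected by the compiler
```

The misuse in the last comment is caught at compile time rather than at runtime, which is the benefit the proposal is after.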
>>>>>>>>>> 
>>>>>>>>>> What do you think about this?
>>>>>>>>>> 
>>>>>>>>>> - Jark Wu
>>>>>>>>>> 
>>>>>>>>>> On Sep 6, 2016, at 11:00 PM, Timo Walther <twalthr@apache.org> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Hi all,
>>>>>>>>>>> 
>>>>>>>>>>> I thought about the API of the FLIP again. If we allow the
>>>>>>>>>>> "systemtime" attribute, we cannot implement a nice method chaining
>>>>>>>>>>> where the user can define an "allowLateness" only on event time. So
>>>>>>>>>>> even if the user expressed that "systemtime" is used, we have to offer
>>>>>>>>>>> an "allowLateness" method because we have to assume that this attribute
>>>>>>>>>>> can also be the batch event time column, which is not very nice.
>>>>>>>>>>> 
>>>>>>>>>>> class TumblingWindow(size: Expression) extends Window {
>>>>>>>>>>>   def on(timeField: Expression): TumblingEventTimeWindow =
>>>>>>>>>>>     // has allowLateness() method
>>>>>>>>>>>     new TumblingEventTimeWindow(alias, timeField, size)
>>>>>>>>>>> }
>>>>>>>>>>> 
>>>>>>>>>>> What do you think?
>>>>>>>>>>> 
>>>>>>>>>>> Timo
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On 05/09/16 at 10:41, Fabian Hueske wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi Jark,
>>>>>>>>>>>> 
>>>>>>>>>>>> you had asked for non-windowed aggregates in the Table API a few
>>>>>>>>>>>> times. FLIP-11 proposes row-window aggregates which are a
>>>>>>>>>>>> generalization of running aggregates (SlideRow unboundedPreceding).
>>>>>>>>>>>> 
>>>>>>>>>>>> Can you have a look at the FLIP and give feedback whether this is
>>>>>>>>>>>> what you are looking for?
>>>>>>>>>>>> Improvement suggestions are very welcome as well.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thank you,
>>>>>>>>>>>> Fabian
>>>>>>>>>>>> 
>>>>>>>>>>>> 2016-09-01 16:12 GMT+02:00 Timo Walther <twalthr@apache.org>:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi all!
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in the Table
>>>>>>>>>>>>> API. You can find the FLIP-11 here:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Motivation for the FLIP:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> The Table API is a declarative API to define queries on static and
>>>>>>>>>>>>> streaming tables. So far, only projection, selection, and union are
>>>>>>>>>>>>> supported operations on streaming tables.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> This FLIP proposes to add support for different types of
>>>>>>>>>>>>> aggregations on top of streaming tables. In particular, we seek to
>>>>>>>>>>>>> support:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> - Group-window aggregates, i.e., aggregates which are computed for a
>>>>>>>>>>>>> group of elements. A (time or row-count) window is required to bound
>>>>>>>>>>>>> the infinite input stream into a finite group.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> - Row-window aggregates, i.e., aggregates which are computed for
>>>>>>>>>>>>> each row, based on a window (range) of preceding and succeeding rows.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Each type of aggregate shall be supported on keyed/grouped or
>>>>>>>>>>>>> non-keyed/grouped data streams for streaming tables as well as batch
>>>>>>>>>>>>> tables.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> We are looking forward to your feedback.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Timo
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>> --
>>>>>>>>>>> Freundliche Grüße / Kind Regards
>>>>>>>>>>> 
>>>>>>>>>>> Timo Walther
>>>>>>>>>>> 
>>>>>>>>>>> Follow me: @twalthr
>>>>>>>>>>> https://www.linkedin.com/in/twalthr
>>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> --
>>>>>> Freundliche Grüße / Kind Regards
>>>>>> 
>>>>>> Timo Walther
>>>>>> 
>>>>>> Follow me: @twalthr
>>>>>> https://www.linkedin.com/in/twalthr
>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 

