flink-dev mailing list archives

From Timo Walther <twal...@apache.org>
Subject Re: [DISCUSS] FLIP-11: Table API Stream Aggregations
Date Mon, 19 Sep 2016 07:16:41 GMT
Hi Jark,

Yes, I think enough time has passed. We can start implementing the
changes. What do you think, Fabian?

If there are no objections, I will create the subtasks in Jira today.
For FLIP-11/1, I have already implemented a prototype; I just have to do
some refactoring/documentation before opening a PR.


On 18/09/16 at 04:46, Jark Wu wrote:
> Hi all,
> It seems that there are no objections to the window design. So could we open subtasks
> to start working on it now?
> - Jark Wu
>> On Sep 7, 2016, at 4:29 PM, Jark Wu <wuchong.wc@alibaba-inc.com> wrote:
>> Hi Fabian,
>> Thanks for sharing your ideas.
>> They all make sense to me. Regarding reassigning timestamps, I do not have a use
>> case. I came up with this because DataStream has a TimestampAssigner :)
>> +1 for this FLIP.
>> - Jark Wu
>>> On Sep 7, 2016, at 2:59 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>>> Hi,
>>> thanks for your comments and questions!
>>> Actually, you are bringing up the points that Timo and I discussed the most
>>> when designing the FLIP ;-)
>>> - We also thought about the syntactic shortcut for running aggregates like
>>> you proposed (table.groupBy('a).select(…)). Our motivation for not allowing
>>> this shortcut is to prevent users from accidentally performing a
>>> "dangerous" operation. The problem with unbounded sliding row-windows is
>>> that their state never expires. If you have an evolving key space, you
>>> will likely run into problems at some point because the operator state
>>> grows too large. IMO, a row-window session is a better approach, because it
>>> defines a timeout after which state can be discarded. groupBy.select is a
>>> very common operation in batch, but its semantics in streaming are very
>>> different. In my opinion, it makes sense to make users aware of these
>>> differences through the API.
>>> - Reassigning timestamps and watermarks is a very delicate issue. You are
>>> right that Calcite exposes this field, which is necessary due to the
>>> semantics of SQL. However, also in Calcite you cannot freely choose the
>>> timestamp attribute for streaming queries (it must be a monotone or
>>> quasi-monotone attribute), which is hard to reason about (and guarantee)
>>> after a few operators have been applied. Streaming tables in Flink will
>>> likely have a time attribute which is identical to the initial rowtime.
>>> However, Flink does modify timestamps internally, e.g., for records that
>>> are emitted from time windows, in order to ensure that consecutive windows
>>> perform as expected. Modifying or reassigning timestamps in the middle of a
>>> job can lead to unexpected results which are very hard to reason about. Do
>>> you have a concrete use case in mind for reassigning timestamps?
>>> - The idea to represent rowtime and systime as objects is good. Our
>>> motivation to go for reserved Scala symbols was to have a uniform syntax
>>> for windows over streaming and batch tables. On batch tables you can
>>> compute time windows over basically any time attribute (they are treated
>>> similarly to grouping attributes, with a bit of extra logic to extract the
>>> grouping key for sliding and session windows). If you write window(Tumble
>>> over 10.minutes on 'rowtime) on a streaming table, 'rowtime would indicate
>>> event-time. On a batch table with a 'rowtime attribute, the same operator
>>> would be internally converted into a group by. By going for the object
>>> approach we would lose this compatibility (or would need to introduce an
>>> additional column attribute to specify the window attribute for batch
>>> tables).
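Fabian's point that a time window on a batch table can be "internally converted into a group by" can be illustrated with a small, self-contained Scala sketch. It does not use the Flink Table API: the `Row` case class and the `windowStart` and `tumblingSum` helpers are hypothetical names, and timestamps are assumed to be epoch milliseconds. Each row is assigned to the tumbling window that contains its timestamp, and the aggregate then becomes an ordinary group-by on (key, window start).

```scala
// Hypothetical row type: a grouping key, an event-time timestamp in ms, and a value.
case class Row(key: String, rowtime: Long, amount: Int)

// Start of the tumbling window of `size` ms that contains timestamp `ts`.
// Math.floorDiv keeps the result correct for negative timestamps as well.
def windowStart(ts: Long, size: Long): Long = Math.floorDiv(ts, size) * size

// On a finite (batch) table, a tumbling-window sum is a plain group-by
// on (key, window start); no watermarks or state cleanup are involved.
def tumblingSum(rows: Seq[Row], size: Long): Map[(String, Long), Int] =
  rows
    .groupBy(r => (r.key, windowStart(r.rowtime, size)))
    .map { case (group, rs) => group -> rs.map(_.amount).sum }

val tenMinutes = 10L * 60 * 1000
val rows = Seq(
  Row("a", 0L, 1),
  Row("a", 5L * 60 * 1000, 2),  // same 10-minute window as the first row
  Row("a", 12L * 60 * 1000, 3), // falls into the second window
  Row("b", 1L * 60 * 1000, 4)
)
val result = tumblingSum(rows, tenMinutes)
println(result)
```

Here the first two rows of key `a` share the window starting at 0, so their amounts are summed together, while the third row of key `a` lands in the window starting at 600000 ms.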
>>> As usual, some of the design decisions are based on preferences.
>>> Do they make sense to you? Let me know what you think.
>>> Best, Fabian
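The state-growth argument against unbounded running aggregates can be made concrete with a toy simulation. The sketch below is not Flink code: `UnboundedRunningCount` and `SessionRunningCount` are hypothetical classes, the session timeout is checked eagerly on every record rather than with timers, and every record is assumed to carry a new key in order to model an evolving key space.

```scala
import scala.collection.mutable

// Unbounded running count (like an unbounded-preceding row window):
// one state entry per key that is never removed.
final class UnboundedRunningCount {
  val state = mutable.Map.empty[String, Long]
  def process(key: String): Long = {
    val c = state.getOrElse(key, 0L) + 1
    state(key) = c
    c
  }
}

// Session-style running count: a key's state is discarded once no record
// for that key has arrived for more than `gap` time units.
final class SessionRunningCount(gap: Long) {
  val counts = mutable.Map.empty[String, Long]
  val lastSeen = mutable.Map.empty[String, Long]
  def process(key: String, time: Long): Long = {
    // Collect expired keys first so the maps are not modified while iterating.
    val expired = lastSeen.filter { case (_, t) => time - t > gap }.keys.toList
    expired.foreach { k => counts.remove(k); lastSeen.remove(k) }
    val c = counts.getOrElse(key, 0L) + 1
    counts(key) = c
    lastSeen(key) = time
    c
  }
}

// Evolving key space: every record has a key that was never seen before.
val unbounded = new UnboundedRunningCount
val session = new SessionRunningCount(gap = 10L)
for (t <- 0L until 1000L) {
  unbounded.process(s"user-$t")
  session.process(s"user-$t", t)
}
println(s"unbounded state: ${unbounded.state.size}, session state: ${session.counts.size}")
```

With 1,000 distinct keys and a gap of 10 time units, the unbounded operator ends up holding 1,000 entries, while the session-style operator only keeps the 11 keys seen within the last 10 time units.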
>>> 2016-09-07 5:12 GMT+02:00 Jark Wu <wuchong.wc@alibaba-inc.com>:
>>>> Hi all,
>>>> I was on vacation for about five days, sorry to have missed this great FLIP.
>>>> Yes, non-windowed aggregates are a special case of row-windows, and the
>>>> proposal looks really good. Can we have a simplified form for this special
>>>> case? For example, table.groupBy('a).rowWindow(SlideRows.unboundedPreceding).select(…)
>>>> could be simplified to table.groupBy('a).select(…), where the latter would
>>>> simply call the former.
>>>> Another question is about the rowtime. As the FLIP says, DataStream and
>>>> StreamTableSource are responsible for assigning timestamps and watermarks;
>>>> furthermore, "rowtime" and "systemtime" are not real columns. IMO, this is
>>>> different from Calcite's rowtime, which is a real column in the table.
>>>> With the FLIP's approach, we lose some flexibility, because the timestamp
>>>> column may be created after some transformations or join operations rather
>>>> than at the beginning. So why do we have to define rowtime at the beginning?
>>>> Because of watermarks? Can we have a way to define rowtime after the source
>>>> table, like a TimestampAssigner?
>>>> Regarding the "allowLateness" method, I came up with a trick: we can make
>>>> 'rowtime and 'systemtime Scala objects instead of symbol expressions. It
>>>> would look like this:
>>>> window(Tumble over 10.minutes on rowtime allowLateness as 'w)
>>>> The implementation would look like this:
>>>> class TumblingWindow(size: Expression) extends Window {
>>>>   def on(time: rowtime.type): TumblingEventTimeWindow =
>>>>     new TumblingEventTimeWindow(alias, 'rowtime, size)         // has an allowLateness() method
>>>>   def on(time: systemtime.type): TumblingProcessingTimeWindow =
>>>>     new TumblingProcessingTimeWindow(alias, 'systemtime, size) // has no allowLateness() method
>>>> }
>>>> object rowtime
>>>> object systemtime
>>>> What do you think about this?
>>>> - Jark Wu
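Jark's trick relies on Scala overload resolution on singleton object types: `on(rowtime)` and `on(systemtime)` return different window types, so `allowLateness` simply does not exist on a processing-time window. The following self-contained sketch reproduces the idea with simplified, hypothetical stand-ins (`Tumble`, `TumblingWindow`, window sizes as plain `Long` milliseconds instead of `10.minutes`, and no `as 'w` alias); it is not the Flink Table API.

```scala
// Hypothetical stand-ins for the reserved time attributes.
object rowtime
object systemtime

// Event-time window: the only window type that offers allowLateness.
final case class TumblingEventTimeWindow(sizeMillis: Long, latenessMillis: Long = 0L) {
  def allowLateness(millis: Long): TumblingEventTimeWindow = copy(latenessMillis = millis)
}

// Processing-time window: deliberately has no allowLateness method.
final case class TumblingProcessingTimeWindow(sizeMillis: Long)

final class TumblingWindow(sizeMillis: Long) {
  // Overload resolution on the singleton types picks the window kind at compile time.
  def on(time: rowtime.type): TumblingEventTimeWindow = TumblingEventTimeWindow(sizeMillis)
  def on(time: systemtime.type): TumblingProcessingTimeWindow = TumblingProcessingTimeWindow(sizeMillis)
}

object Tumble {
  def over(sizeMillis: Long): TumblingWindow = new TumblingWindow(sizeMillis)
}

val tenMinutes = 10L * 60 * 1000
// Infix chaining, read left to right: Tumble.over(...).on(rowtime).allowLateness(...)
val eventTimeWindow = Tumble over tenMinutes on rowtime allowLateness 60000L
val procTimeWindow = Tumble over tenMinutes on systemtime
// procTimeWindow allowLateness 60000L   // would not compile: no such method
println(eventTimeWindow)
println(procTimeWindow)
```

Timo's objection in the message quoted below is visible here too: if `on` accepted a generic expression (for example a `'rowtime` symbol that may also name a batch column), the compiler could no longer tell the two cases apart, and `allowLateness` would have to be offered on every window.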
>>>>> On Sep 6, 2016, at 11:00 PM, Timo Walther <twalthr@apache.org> wrote:
>>>>> Hi all,
>>>>> I thought about the API of the FLIP again. If we allow the "systemtime"
>>>>> attribute, we cannot implement nice method chaining where the user can
>>>>> define "allowLateness" only on event time. Even if the user expressed
>>>>> that "systemtime" is used, we have to offer an "allowLateness" method,
>>>>> because we have to assume that this attribute could also be the event-time
>>>>> column of a batch table, which is not very nice.
>>>>> class TumblingWindow(size: Expression) extends Window {
>>>>>   def on(timeField: Expression): TumblingEventTimeWindow =
>>>>>     new TumblingEventTimeWindow(alias, timeField, size) // has allowLateness() method
>>>>> }
>>>>> What do you think?
>>>>> Timo
>>>>> On 05/09/16 at 10:41, Fabian Hueske wrote:
>>>>>> Hi Jark,
>>>>>> you had asked for non-windowed aggregates in the Table API a few times.
>>>>>> FLIP-11 proposes row-window aggregates, which are a generalization of
>>>>>> running aggregates (SlideRow unboundedPreceding).
>>>>>> Can you have a look at the FLIP and give feedback on whether this is what
>>>>>> you are looking for?
>>>>>> Improvement suggestions are very welcome as well.
>>>>>> Thank you,
>>>>>> Fabian
>>>>>> 2016-09-01 16:12 GMT+02:00 Timo Walther <twalthr@apache.org>:
>>>>>>> Hi all!
>>>>>>> Fabian and I worked on a FLIP for Stream Aggregations in the Table API.
>>>>>>> You can find the FLIP-11 here:
>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations
>>>>>>> Motivation for the FLIP:
>>>>>>> The Table API is a declarative API to define queries on static and
>>>>>>> streaming tables. So far, only projection, selection, and union are
>>>>>>> supported operations on streaming tables.
>>>>>>> This FLIP proposes to add support for different types of aggregations on
>>>>>>> top of streaming tables. In particular, we seek to support:
>>>>>>> - Group-window aggregates, i.e., aggregates which are computed for a group
>>>>>>> of elements. A (time or row-count) window is required to bound the infinite
>>>>>>> input stream into finite groups.
>>>>>>> - Row-window aggregates, i.e., aggregates which are computed for each row,
>>>>>>> based on a window (range) of preceding and succeeding rows.
>>>>>>> Each type of aggregate shall be supported on keyed/grouped or
>>>>>>> non-keyed/grouped data streams for streaming tables as well as batch
>>>>>>> tables.
>>>>>>> We are looking forward to your feedback.
>>>>>>> Timo
>>>>> --
>>>>> Freundliche Grüße / Kind Regards
>>>>> Timo Walther
>>>>> Follow me: @twalthr
>>>>> https://www.linkedin.com/in/twalthr

