flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Timo Walther <twal...@apache.org>
Subject Re: [DISCUSS] Table API / SQL indicators for event and processing time
Date Wed, 15 Feb 2017 08:32:52 GMT
Hi all,

at first I also thought that built-in functions (rowtime() and 
proctime()) are the easiest solution. However, I think to be 
future-proof we should make them system attributes; esp. to relate them 
to a corresponding table in case of multiple tables. Logically they are 
attributes of each row, which is already done in Table API.

I will ask on the Calcite ML if there is a good way for integrating 
system attributes. Right now, I would propose the following implementation:

- we introduce a custom row type (extending RelDataType)
- in a streaming environment every row has two attributes by default 
(rowtime and proctime)
- we do not allow creating a row type with those attributes (this should 
already prevent `SELECT field AS rowtime FROM ...`)
- we need to ensure that these attributes are not part of expansion like 
`SELECT * FROM ...`
- implement some rule/logic that translates the attributes to special 
RexNodes internally, such that the opimizer does not modify these attributes

What do you think?

Regards,
Timo




Am 15/02/17 um 03:36 schrieb Xingcan Cui:
> Hi all,
>
> thanks for this thread.
>
> @Fabian If I didn't miss the point, the main difference between the two
> approaches is whether or not taking these time attributes as common table
> fields that are directly available to users. Whatever, these time
> attributes should be attached to records (right?), and the discussion lies
> in whether give them public qualifiers like other common fields or private
> qualifiers and related get/set methods.
>
> The former (system attributes) approach will be more compatible with
> existing SQL read-only operations (e.g., select, join), but we need to add
> restrictions on SQL modification operation (like what?). I think there are
> no needs to forbid users modifying these attributes via table APIs (like
> map function). Just inform them about these special attribute names like
> system built in aggregator names in iteration.
>
> As for the built in function approach, I don't know if, for now, there are
> functions applied on a single row (maybe the value access functions like
> COMPOSITE.get(STRING)?). It seems that most of the built in functions work
> for a single field or on columns and thus it will be mountains of work if
> we want to add a new kind of function to SQL. Maybe all existing operations
> should be modified to support it.
>
> All in all, if there are existing supports for single row function, I
> prefer the built in function approach. Otherwise the system attributes
> approach should be better. After all there are not so much modification
> operations in SQL and maybe we can use alias to support time attributes
> setting (just hypothesis, not sure if it's feasible).
>
> @Haohui I think the given query is valid if we add a aggregate
> function to (PROCTIME()
> - ROWTIME()) / 1000 and it should be executed efficiently.
>
> Best,
> Xingcan
>
> On Wed, Feb 15, 2017 at 6:17 AM, Haohui Mai <ricetons@gmail.com> wrote:
>
>> Hi,
>>
>> Thanks for starting the discussion. I can see there are multiple trade-offs
>> in these two approaches. One question I have is that to which extent Flink
>> wants to open its APIs to allow users to access both processing and event
>> time.
>>
>> Before we talk about joins, my understanding for the two approaches that
>> you mentioned are essentially (1) treating the value of event / processing
>> time as first-class fields for each row, (2) limiting the scope of time
>> indicators to only specifying windows. Take the following query as an
>> example:
>>
>> SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency FROM table GROUP BY
>> FLOOR(PROCTIME() TO MINUTES)
>>
>> There are several questions we can ask:
>>
>> (1) Is it a valid query?
>> (2) How efficient the query will be?
>>
>> For this query I can see arguments from both sides. I think at the end of
>> the day it really comes down to what Flink wants to support. After working
>> on FLINK-5624 I'm more inclined to support the second approach (i.e.,
>> built-in functions). The main reason why is that the APIs of Flink are
>> designed to separate times from the real payloads. It probably makes sense
>> for the Table / SQL APIs to have the same designs.
>>
>> For joins I don't have a clear answer on top of my head. Flink requires two
>> streams to be put in the same window before doing the joins. This is
>> essentially a subset of what SQL can express. I don't know what would be
>> the best approach here.
>>
>> Regards,
>> Haohui
>>
>>
>> On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> It would as in the query I gave as an example before:
>>>
>>> SELECT
>>>    a,
>>>    SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2
>>> PRECEDING AND CURRENT ROW) AS sumB,
>>> FROM myStream
>>>
>>> Here "proctime" would be a system attribute of the table "myStream".
>>> The table would also have another system attribute called "rowtime" which
>>> would be used to indicate event time semantics.
>>> These attributes would always be present in tables which are derived from
>>> streams.
>>> Because we still require that streams have timestamps and watermarks
>>> assigned (either by the StreamTableSource or the somewhere downstream the
>>> DataStream program) when they are converted into a table, there is no
>> need
>>> to register anything.
>>>
>>> Does that answer your questions?
>>>
>>> Best, Fabian
>>>
>>>
>>>
>>> 2017-02-14 2:04 GMT+01:00 Radu Tudoran <radu.tudoran@huawei.com>:
>>>
>>>> Hi Fabian,
>>>>
>>>> Thanks for starting the discussion. Before I give my thoughts on this
>> can
>>>> you please give some examples of how would you see option of using
>>> "system
>>>> attributes"?
>>>> Do you use this when you register the stream as a table, do you use if
>>>> when you call an SQL query, do you use it when you translate back a
>> table
>>>> to a stream / write it to a dynamic table?
>>>>
>>>> Dr. Radu Tudoran
>>>> Senior Research Engineer - Big Data Expert
>>>> IT R&D Division
>>>>
>>>>
>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>>> European Research Center
>>>> Riesstrasse 25, 80992 München
>>>>
>>>> E-mail: radu.tudoran@huawei.com
>>>> Mobile: +49 15209084330 <+49%201520%209084330>
>>>> Telephone: +49 891588344173 <+49%2089%201588344173>
>>>>
>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>>>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
>>>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
>>>> This e-mail and its attachments contain confidential information from
>>>> HUAWEI, which is intended only for the person or entity whose address
>> is
>>>> listed above. Any use of the information contained herein in any way
>>>> (including, but not limited to, total or partial disclosure,
>>> reproduction,
>>>> or dissemination) by persons other than the intended recipient(s) is
>>>> prohibited. If you receive this e-mail in error, please notify the
>> sender
>>>> by phone or email immediately and delete it!
>>>>
>>>> -----Original Message-----
>>>> From: Fabian Hueske [mailto:fhueske@gmail.com]
>>>> Sent: Tuesday, February 14, 2017 1:01 AM
>>>> To: dev@flink.apache.org
>>>> Subject: [DISCUSS] Table API / SQL indicators for event and processing
>>> time
>>>> Hi,
>>>>
>>>> I'd like to start an discussion about how Table API / SQL queries
>>> indicate
>>>> whether an operation is done in event or processing time.
>>>>
>>>> 1) Why do we need to indicate the time mode?
>>>>
>>>> We need to distinguish event time and processing time mode for
>> operations
>>>> in queries in order to have the semantics of a query fully defined.
>>>> This cannot be globally done in the TableEnvironment because some
>> queries
>>>> explicitly request an expression such as the ORDER BY clause of an OVER
>>>> window with PRECEDING / FOLLOWING clauses.
>>>> So we need a way to specify something like the following query:
>>>>
>>>> SELECT
>>>>    a,
>>>>    SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2
>> PRECEDING
>>>> AND CURRENT ROW) AS sumB, FROM myStream
>>>>
>>>> where "proctime" indicates processing time. Equivalently "rowtime"
>> would
>>>> indicate event time.
>>>>
>>>> 2) Current state
>>>>
>>>> The current master branch implements time support only for grouping
>>>> windows in the Table API.
>>>> Internally, the Table API converts a 'rowtime symbol (which looks like
>> a
>>>> regular attribute) into a special expression which indicates
>> event-time.
>>>> For example:
>>>>
>>>> table
>>>>    .window(Tumble over 5.milli on 'rowtime as 'w)
>>>>    .groupBy('a, 'w)
>>>>    .select(...)
>>>>
>>>> defines a tumbling event-time window.
>>>>
>>>> Processing-time is indicated by omitting a time attribute
>>>> (table.window(Tumble over 5.milli as 'w) ).
>>>>
>>>> 3) How can we do that in SQL?
>>>>
>>>> In SQL we cannot add special expressions without touching the parser
>>> which
>>>> we don't want to do because we want to stick to the SQL standard.
>>>> Therefore, I see only two options: adding system attributes or
>>>> (parameterless) built-in functions. I list some pros and cons of the
>>>> approaches below:
>>>>
>>>> 1. System Attributes:
>>>> + most natural way to access a property of a record.
>>>> + works with joins, because time attributes can be related to tables
>>>> - We need to ensure the attributes are not writable and always present
>> in
>>>> streaming tables (i.e., they should be system defined attributes).
>>>> - Need to adapt existing Table API expressions (will not change the API
>>>> but some parts of the internal translation)
>>>> - Event time value must be set when the stream is converted, processing
>>>> time is evaluated on the fly
>>>>
>>>> 2. Built-in Functions
>>>> + Users could try to modify time attributes which is not possible with
>>>> functions
>>>> - do not work with joins, because we need to address different
>> relations
>>>> - not a natural way to access a property of a record
>>>>
>>>> I think the only viable choice are system attributes, because built-in
>>>> functions cannot be used for joins.
>>>> However, system attributes are the more complex solution because they
>>> need
>>>> a better integration with Calcite's SQL validator (preventing user
>>>> attributes which are named rowtime for instance).
>>>>
>>>> Since there are currently a several contributions on the way (such as
>> SQL
>>>> OVER windows FLINK-5653 to FLINK-5658) that need time indicators, we
>>> need a
>>>> solution soon to be able to make progress.
>>>> There are two PRs, #3252 and #3271, which implement the built-in marker
>>>> functions proctime() and rowtime() and which could serve as a temporary
>>>> solution (since we do not work on joins yet).
>>>> I would like to suggest to use these functions as a starting point
>> (once
>>>> the PRs are merged) and later change to the system attribute solution
>>> which
>>>> needs a bit more time to be implemented.
>>>>
>>>> I talked with Timo today about this issue and he said he would like to
>>>> investigate how we can implement this as system functions properly
>>>> integrated with Calcite and the SQL Validator.
>>>>
>>>> What do others think?
>>>>
>>>> Best, Fabian
>>>>


Mime
View raw message