flink-dev mailing list archives

From Timo Walther <twal...@apache.org>
Subject Re: [DISCUSS] Table API / SQL indicators for event and processing time
Date Wed, 22 Feb 2017 15:44:23 GMT
Hi everyone,

I have created an issue [1] to track the progress of this topic. I have 
written a little design document [2] describing how we could implement the 
indicators and which parts have to be touched. I would suggest implementing 
a prototype, also to see what is possible and what can be integrated into 
both Flink and Calcite. Feedback is welcome.

Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-5884
[2] 
https://docs.google.com/document/d/1JRXm09x_wKst6z6UXdCGF9tgF1ueOAsFiQwahR72vbc/edit?usp=sharing



On 21/02/17 at 15:06, Fabian Hueske wrote:
> Hi Xingcan,
>
> thanks for your thoughts.
> In principle you are right that the monotone attribute property would be
> sufficient, however there are more aspects to consider than that.
>
> Flink is a parallel stream processing engine, which means that data is
> processed in separate processes and shuffled across them.
> Maintaining a strict order when merging parallel streams would be
> prohibitively expensive.
> Flink's watermark mechanism helps operators to deal with out-of-order data
> (due to out-of-order input or shuffles).
> I don't think we can separate the discussion about time attributes from
> watermarks if we want to use Flink as a processing engine and not
> reimplement large parts from scratch.
>
> When transforming a time attribute, we have to either align it with
> existing watermarks or generate new watermarks.
> If we want to allow all kinds of monotone transformations, we have to adapt
> the watermarks which is not trivial.
> Instead, I think we should initially only allow very few monotone
> transformations which are aligned with the existing watermarks. We might
> later relax this condition if we see that users request this feature.
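
For intuition, a hedged sketch of the distinction (assuming a registered streaming table "myStream" with a rowtime attribute, the proposed indicator support which did not exist at the time, tEnv.sql() as in the 1.2-era API, and package names as in later 1.x releases):

  import org.apache.flink.table.api.scala.StreamTableEnvironment

  val tEnv: StreamTableEnvironment = ???  // assumed: "myStream" is registered

  // rowtime used directly to order an OVER window: this stays aligned with
  // the watermarks that already exist for the stream.
  val aligned = tEnv.sql(
    "SELECT a, SUM(b) OVER (PARTITION BY c ORDER BY rowtime " +
    "ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) AS sumB FROM myStream")

  // A monotone transformation of rowtime: the shifted values can run ahead of
  // the current watermarks, so new watermarks would have to be generated
  // before the result could serve as a time attribute again.
  val shifted = tEnv.sql(
    "SELECT rowtime + INTERVAL '1' HOUR AS shiftedTime, a, b FROM myStream")
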
>
> You are right that we need to track which attribute can be used as a time
> attribute (i.e., is increasing and guarded by watermarks).
> For that we need to expose the time attribute when a Table is created
> (either when a DataStream is converted like: stream.toTable(tEnv, 'a, 'b,
> 't.rowtime) or in a StreamTableSource) and track how it is used in queries.
> I am not sure if the monotone property would be the right choice here,
> since data is only quasi-monotone and a monotone annotation might trigger
> some invalid optimizations which change the semantics of a query.
> Right now, Calcite does not offer a quasi-monotone property (at least I
> haven't found it).
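
For illustration, a minimal sketch of the conversion Fabian mentions (the 't.rowtime marker is the proposed syntax and was not implemented at the time; tEnv, the streams, and the field names are placeholders):

  // Proposed: mark the event-time attribute per table when a DataStream is
  // converted (timestamps and watermarks must already be assigned upstream).
  val payments = paymentStream.toTable(tEnv, 'amount, 'currency, 't.rowtime)
  val rates    = rateStream.toTable(tEnv, 'rate, 'currency, 't.rowtime)

  // With a per-table time attribute, a query can refer to payments.rowtime and
  // rates.rowtime independently; a StreamTableSource would declare the
  // attribute in an analogous way.
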
>
> Best, Fabian
>
>
> 2017-02-21 4:41 GMT+01:00 Xingcan Cui <xingcanc@gmail.com>:
>
>> Hi all,
>>
>> As I said in another thread, the main difference between stream and table
>> is that a stream is an ordered list while a table is an unordered set.
>>
>> Setting aside the out-of-order problem in practice, either event time or
>> processing time can simply be treated as a monotonically increasing field,
>> and that is why the given query [1] would work. In other words, we must
>> guarantee that the "SELECT MAX(t22.rowtime) ..." subquery returns a single
>> value that can be retrieved from the cached dynamic table, since it is
>> dangerous to join two un-windowed streams.
>>
>> Under this circumstance, I would consider adding a "monotonic hint" (INC or
>> DEC) to a field of a (generalized) table (maybe using an annotation on the
>> registerDataXX method) that indicates whether the field is monotonically
>> increasing or decreasing. Then, by taking rowtime as a common (monotonically
>> increasing) field, there are several benefits:
>>
>> 1) This unifies tables and streams by imposing a total ordering relation on
>> an otherwise unordered set.
>>
>> 2) These fields can be modified arbitrarily as long as they keep the
>> declared monotonic property, and the watermark problem no longer exists.
>>
>> 3) The monotonic hint will be useful in the query optimization process.
>>
>> What do you think?
>>
>> Best,
>> Xingcan
>>
>> [1]
>> SELECT t1.amount, t2.rate
>> FROM
>>   table1 AS t1,
>>   table2 AS t2
>> WHERE
>>   t1.currency = t2.currency AND
>>   t2.rowtime = (
>>     SELECT MAX(t22.rowtime)
>>     FROM table2 AS t22
>>     WHERE t22.rowtime <= t1.rowtime)
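
A purely hypothetical sketch of the "monotonic hint" idea (neither the Monotonicity type nor the hint call exist in Flink; the names are invented for illustration):

  // Hypothetical only: declare at registration time that 'rowtime is
  // monotonically increasing so the optimizer may rely on the ordering.
  object Monotonicity extends Enumeration { val INC, DEC = Value }

  // tEnv.registerDataStream("table2", rateStream, 'currency, 'rate, 'rowtime)
  // tEnv.addMonotonicHint("table2", "rowtime", Monotonicity.INC)  // invented
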
>>
>> On Tue, Feb 21, 2017 at 2:52 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>>
>>> Hi everybody,
>>>
>>> When Timo wrote to the Calcite mailing list, Julian Hyde replied and gave
>>> good advice and explained why a system attribute for event-time would be a
>>> problem [1].
>>> I thought about this and agree with Julian.
>>>
>>> Here is a document describing the problem, constraints in Flink, and a
>>> proposal for how to handle processing time and event time in Table API and
>>> SQL:
>>> ->
>>> https://docs.google.com/document/d/1MDGViWA_TCqpaVoWub7u_GY4PMFSbT8TuaNl-EpbTHQ
>>>
>>> Please have a look, comment and ask questions.
>>>
>>> Thank you,
>>> Fabian
>>>
>>> [1]
>>> https://lists.apache.org/thread.html/6397caf0ca37f97f2cd27d96f7a12c6fa845d6fd0870214fdce18d96@%3Cdev.calcite.apache.org%3E
>>>
>>> 2017-02-16 1:18 GMT+01:00 Fabian Hueske <fhueske@gmail.com>:
>>>
>>>> Thanks everybody for the comments.
>>>>
>>>> Actually, I think we do not have much choice when deciding whether to use
>>>> attributes or functions.
>>>> Consider the following join query:
>>>>
>>>> SELECT t1.amount, t2.rate
>>>> FROM
>>>>   table1 AS t1,
>>>>   table2 AS t2
>>>> WHERE
>>>>   t1.currency = t2.currency AND
>>>>   t2.rowtime = (
>>>>     SELECT MAX(t22.rowtime)
>>>>     FROM table2 AS t22
>>>>     WHERE t22.rowtime <= t1.rowtime)
>>>>
>>>> The query joins two streaming tables. Table 1 is a streaming table with
>>>> amounts in a certain currency. Table 2 is a (slowly changing) streaming
>>>> table of currency exchange rates.
>>>> We want to join the amounts stream with the exchange rate of the
>>>> corresponding currency that is valid (i.e., last received value ->
>>>> MAX(rowtime)) at the rowtime of the amounts row.
>>>> In order to specify the query, we need to refer to the rowtime of the
>>>> different tables. Hence, we need a way to relate the rowtime expression
>>>> (or marker) to a table.
>>>> This is not possible with a parameterless scalar function.
>>>>
>>>> I'd like to comment on the concerns regarding the performance:
>>>> In fact, the columns could be completely virtual and only exist during
>>>> query parsing and validation.
>>>> During execution, we can directly access the rowtime metadata of a Flink
>>>> streaming record (which is present anyway) or look up the current
>>>> processing time from the machine clock. So the processing overhead would
>>>> actually be the same as with a marker function.
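
As a side illustration of why the columns can stay virtual (a sketch against the DataStream API's ProcessFunction available since Flink 1.2, not the Table API translation itself):

  import org.apache.flink.streaming.api.functions.ProcessFunction
  import org.apache.flink.util.Collector

  // The event timestamp already travels with every stream record and
  // processing time is just the machine clock, so neither needs to be stored
  // as an extra payload field.
  class TimeAccess[T] extends ProcessFunction[T, (T, Long, Long)] {
    override def processElement(value: T,
                                ctx: ProcessFunction[T, (T, Long, Long)]#Context,
                                out: Collector[(T, Long, Long)]): Unit = {
      val rowtime: Long = ctx.timestamp()  // event-time metadata of the record
      val proctime: Long = ctx.timerService().currentProcessingTime()
      out.collect((value, rowtime, proctime))
    }
  }
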
>>>>
>>>> Regarding the question on what should be allowed with a system attribute:
>>>> IMO, it could be used like any other attribute. We need it at least in
>>>> GROUP BY, ORDER BY, and WHERE to define windows and joins. We could also
>>>> allow access to it in SELECT if we want to give users access to rowtime
>>>> and processing time. So @Haohui, your query could be supported.
>>>> However, what would not be allowed is to modify the value, e.g., by naming
>>>> another column rowtime: "SELECT sometimestamp AS rowtime" would not be
>>>> allowed, because Flink does not support modifying the event time of a row
>>>> (for good reasons) and processing time should not be modifiable anyway.
>>>>
>>>> @Timo:
>>>> I think the approach to only use the system columns during parsing and
>>>> validation and converting them to expressions afterwards makes a lot of
>>>> sense.
>>>> The question is how this approach could be nicely integrated with Calcite.
>>>> Best, Fabian
>>>>
>>>>
>>>>
>>>> 2017-02-15 16:50 GMT+01:00 Radu Tudoran <radu.tudoran@huawei.com>:
>>>>
>>>>> Hi,
>>>>>
>>>>> My initial thought would be that it makes more sense to have procTime()
>>>>> and rowTime() only as functions which in fact are to be used as markers.
>>>>> Having the value (even from special system attributes) does not make
>>>>> sense in some scenarios such as the ones for creating windows, e.g.,
>>>>> if you have SELECT Count(*) OVER (ORDER BY procTime()...).
>>>>> If you get the value of procTime you cannot do anything, as you need the
>>>>> marker to know how to construct the window logic.
>>>>>
>>>>> However, your final idea of having "implement some rule/logic that
>>>>> translates the attributes to special RexNodes internally" I believe is
>>>>> good and gives a solution to both problems. On the one hand, for those
>>>>> scenarios where you need the value you can access the value, while for
>>>>> others you can see the special type of the RexNode and use it as a
>>>>> marker.
>>>>> Regarding keeping this data in a table... I am not sure, as you say we
>>>>> would need to augment the data with two fields whether needed or not...
>>>>> this is not necessarily very efficient.
>>>>>
>>>>>
>>>>> Dr. Radu Tudoran
>>>>> Senior Research Engineer - Big Data Expert
>>>>> IT R&D Division
>>>>>
>>>>>
>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>>>> European Research Center
>>>>> Riesstrasse 25, 80992 München
>>>>>
>>>>> E-mail: radu.tudoran@huawei.com
>>>>> Mobile: +49 15209084330
>>>>> Telephone: +49 891588344173
>>>>>
>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>>>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>>>>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
>>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
>>>>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
>>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
>>>>>
>>>>> -----Original Message-----
>>>>> From: Timo Walther [mailto:twalthr@apache.org]
>>>>> Sent: Wednesday, February 15, 2017 9:33 AM
>>>>> To: dev@flink.apache.org
>>>>> Subject: Re: [DISCUSS] Table API / SQL indicators for event and
>>>>> processing time
>>>>>
>>>>> Hi all,
>>>>>
>>>>> at first I also thought that built-in functions (rowtime() and
>>>>> proctime()) are the easiest solution. However, I think to be future-proof
>>>>> we should make them system attributes; esp. to relate them to a
>>>>> corresponding table in case of multiple tables. Logically they are
>>>>> attributes of each row, which is already done in Table API.
>>>>>
>>>>> I will ask on the Calcite ML if there is a good way for integrating
>>>>> system attributes. Right now, I would propose the following
>>>>> implementation:
>>>>> - we introduce a custom row type (extending RelDataType)
>>>>> - in a streaming environment every row has two attributes by default
>>>>> (rowtime and proctime)
>>>>> - we do not allow creating a row type with those attributes (this should
>>>>> already prevent `SELECT field AS rowtime FROM ...`)
>>>>> - we need to ensure that these attributes are not part of expansions like
>>>>> `SELECT * FROM ...`
>>>>> - implement some rule/logic that translates the attributes to special
>>>>> RexNodes internally, such that the optimizer does not modify these
>>>>> attributes
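
To make the first two bullets a bit more concrete, a rough sketch against Calcite's RelDataTypeFactory (the helper is an invented name; the actual integration via a custom RelDataType, the SELECT * exclusion, and the RexNode translation rule are exactly what still has to be worked out):

  import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
  import org.apache.calcite.sql.`type`.SqlTypeName

  // Hypothetical helper: append the two default time attributes to the
  // physical row type of a streaming table.
  def withTimeIndicators(typeFactory: RelDataTypeFactory,
                         physicalType: RelDataType): RelDataType = {
    val builder = typeFactory.builder()
    builder.addAll(physicalType.getFieldList)       // the user's fields
    builder.add("rowtime", SqlTypeName.TIMESTAMP)   // event time
    builder.add("proctime", SqlTypeName.TIMESTAMP)  // processing time
    builder.build()
  }
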
>>>>> What do you think?
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On 15/02/17 at 03:36, Xingcan Cui wrote:
>>>>>> Hi all,
>>>>>>
>>>>>> thanks for this thread.
>>>>>>
>>>>>> @Fabian If I didn't miss the point, the main difference between the
>>>>>> two approaches is whether or not these time attributes are treated as
>>>>>> common table fields that are directly available to users. Either way,
>>>>>> these time attributes should be attached to records (right?), and the
>>>>>> discussion lies in whether to give them public qualifiers like other
>>>>>> common fields, or private qualifiers and related get/set methods.
>>>>>>
>>>>>> The former (system attributes) approach will be more compatible with
>>>>>> existing SQL read-only operations (e.g., select, join), but we need to
>>>>>> add restrictions on SQL modification operations (like what?). I think
>>>>>> there is no need to forbid users from modifying these attributes via the
>>>>>> Table API (like a map function). Just inform them about these special
>>>>>> attribute names, like the system built-in aggregator names in iterations.
>>>>>>
>>>>>> As for the built-in function approach, I don't know if, for now, there
>>>>>> are functions applied on a single row (maybe the value access functions
>>>>>> like COMPOSITE.get(STRING)?). It seems that most of the built-in
>>>>>> functions work on a single field or on columns, and thus it will be
>>>>>> mountains of work if we want to add a new kind of function to SQL.
>>>>>> Maybe all existing operations would have to be modified to support it.
>>>>>>
>>>>>> All in all, if there is existing support for single-row functions, I
>>>>>> prefer the built-in function approach. Otherwise the system attributes
>>>>>> approach should be better. After all, there are not so many modification
>>>>>> operations in SQL, and maybe we can use aliases to support setting time
>>>>>> attributes (just a hypothesis, not sure if it's feasible).
>>>>>> @Haohui I think the given query is valid if we add an aggregate function
>>>>>> to (PROCTIME() - ROWTIME()) / 1000, and it should be executed
>>>>>> efficiently.
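
A hedged sketch of that fix to Haohui's query (assuming the proposed PROCTIME()/ROWTIME() markers, which were not merged at this point, AVG as the aggregate, and a placeholder table name):

  val latencyQuery =
    """SELECT AVG((PROCTIME() - ROWTIME()) / 1000) AS avgLatency
      |FROM myTable
      |GROUP BY FLOOR(PROCTIME() TO MINUTE)""".stripMargin
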
>>>>>>
>>>>>> Best,
>>>>>> Xingcan
>>>>>>
>>>>>> On Wed, Feb 15, 2017 at 6:17 AM, Haohui Mai <ricetons@gmail.com> wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thanks for starting the discussion. I can see there are multiple
>>>>>>> trade-offs in these two approaches. One question I have is to which
>>>>>>> extent Flink wants to open its APIs to allow users to access both
>>>>>>> processing and event time.
>>>>>>>
>>>>>>> Before we talk about joins, my understanding of the two approaches
>>>>>>> that you mentioned is essentially (1) treating the value of event /
>>>>>>> processing time as first-class fields for each row, (2) limiting the
>>>>>>> scope of time indicators to only specifying windows. Take the
>>>>>>> following query as an example:
>>>>>>>
>>>>>>> SELECT (PROCTIME() - ROWTIME()) / 1000 AS latency FROM table
>>>>>>> GROUP BY FLOOR(PROCTIME() TO MINUTES)
>>>>>>>
>>>>>>> There are several questions we can ask:
>>>>>>>
>>>>>>> (1) Is it a valid query?
>>>>>>> (2) How efficient the query will be?
>>>>>>>
>>>>>>> For this query I can see arguments from both sides. I think at the
>>>>>>> end of the day it really comes down to what Flink wants to support.
>>>>>>> After working on FLINK-5624 I'm more inclined to support the second
>>>>>>> approach (i.e., built-in functions). The main reason why is that the
>>>>>>> APIs of Flink are designed to separate times from the real payloads.
>>>>>>> It probably makes sense for the Table / SQL APIs to have the same
>>>>>>> designs.
>>>>>>> For joins I don't have a clear answer on top of my head. Flink
>>>>>>> requires two streams to be put in the same window before doing the
>>>>>>> joins. This is essentially a subset of what SQL can express. I don't
>>>>>>> know what would be the best approach here.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Haohui
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Feb 14, 2017 at 12:26 AM Fabian Hueske <fhueske@gmail.com> wrote:
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> It would be as in the query I gave as an example before:
>>>>>>>>
>>>>>>>> SELECT
>>>>>>>>     a,
>>>>>>>>     SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2
>>>>>>>>       PRECEDING AND CURRENT ROW) AS sumB
>>>>>>>> FROM myStream
>>>>>>>>
>>>>>>>> Here "proctime" would be a system attribute of the table
>> "myStream".
>>>>>>>> The table would also have another system attribute called
>> "rowtime"
>>>>>>>> which would be used to indicate event time semantics.
>>>>>>>> These attributes would always be present in tables which
are
>> derived
>>>>>>>> from streams.
>>>>>>>> Because we still require that streams have timestamps and
>> watermarks
>>>>>>>> assigned (either by the StreamTableSource or the somewhere
>>>>>>>> downstream the DataStream program) when they are converted
into a
>>>>>>>> table, there is no
>>>>>>> need
>>>>>>>> to register anything.
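
A minimal sketch of that prerequisite (assuming the 1.x DataStream and Table APIs; package names shifted slightly between releases, and the stream, field names, and extractor are placeholders):

  import org.apache.flink.streaming.api.TimeCharacteristic
  import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  import org.apache.flink.streaming.api.scala._
  import org.apache.flink.streaming.api.windowing.time.Time
  import org.apache.flink.table.api.TableEnvironment
  import org.apache.flink.table.api.scala._

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  val tEnv = TableEnvironment.getTableEnvironment(env)

  // (a, b, eventTimeMillis) records; the third field carries the event time
  val events: DataStream[(String, Int, Long)] = ???

  // Timestamps and watermarks are assigned on the DataStream itself ...
  val withWatermarks = events.assignTimestampsAndWatermarks(
    new BoundedOutOfOrdernessTimestampExtractor[(String, Int, Long)](Time.seconds(5)) {
      override def extractTimestamp(e: (String, Int, Long)): Long = e._3
    })

  // ... so the resulting table needs no separate time-attribute registration.
  val myStream = withWatermarks.toTable(tEnv, 'a, 'b, 'ts)
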
>>>>>>>>
>>>>>>>> Does that answer your questions?
>>>>>>>>
>>>>>>>> Best, Fabian
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 2017-02-14 2:04 GMT+01:00 Radu Tudoran <radu.tudoran@huawei.com>:
>>>>>>>>
>>>>>>>>> Hi Fabian,
>>>>>>>>>
>>>>>>>>> Thanks for starting the discussion. Before I give my thoughts on
>>>>>>>>> this, can you please give some examples of how you would see the
>>>>>>>>> option of using "system attributes"?
>>>>>>>>> Do you use this when you register the stream as a table, do you use
>>>>>>>>> it when you call an SQL query, do you use it when you translate a
>>>>>>>>> table back to a stream / write it to a dynamic table?
>>>>>>>>>
>>>>>>>>> Dr. Radu Tudoran
>>>>>>>>> Senior Research Engineer - Big Data Expert IT R&D
Division
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>>>>>>>> European Research Center
>>>>>>>>> Riesstrasse 25, 80992 München
>>>>>>>>>
>>>>>>>>> E-mail: radu.tudoran@huawei.com
>>>>>>>>> Mobile: +49 15209084330
>>>>>>>>> Telephone: +49 891588344173
>>>>>>>>>
>>>>>>>>> HUAWEI TECHNOLOGIES Duesseldorf GmbH
>>>>>>>>> Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>>>>>>>>> Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
>>>>>>>>> Managing Director: Bo PENG, Wanzhou MENG, Lifang CHEN
>>>>>>>>> Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
>>>>>>>>> Geschäftsführer: Bo PENG, Wanzhou MENG, Lifang CHEN
>>>>>>>>>
>>>>>>>>> -----Original Message-----
>>>>>>>>> From: Fabian Hueske [mailto:fhueske@gmail.com]
>>>>>>>>> Sent: Tuesday, February 14, 2017 1:01 AM
>>>>>>>>> To: dev@flink.apache.org
>>>>>>>>> Subject: [DISCUSS] Table API / SQL indicators for event and
>>>>>>>>> processing time
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> I'd like to start a discussion about how Table API / SQL queries
>>>>>>>>> indicate whether an operation is done in event or processing time.
>>>>>>>>>
>>>>>>>>> 1) Why do we need to indicate the time mode?
>>>>>>>>>
>>>>>>>>> We need to distinguish event time and processing time mode for
>>>>>>>>> operations in queries in order to have the semantics of a query
>>>>>>>>> fully defined.
>>>>>>>>> This cannot be globally done in the TableEnvironment because some
>>>>>>>>> queries explicitly request an expression such as the ORDER BY clause
>>>>>>>>> of an OVER window with PRECEDING / FOLLOWING clauses.
>>>>>>>>> So we need a way to specify something like the following query:
>>>>>>>>>
>>>>>>>>> SELECT
>>>>>>>>>     a,
>>>>>>>>>     SUM(b) OVER (PARTITION BY c ORDER BY proctime ROWS BETWEEN 2
>>>>>>>>>       PRECEDING AND CURRENT ROW) AS sumB
>>>>>>>>> FROM myStream
>>>>>>>>>
>>>>>>>>> where "proctime" indicates processing time. Equivalently
>> "rowtime"
>>>>>>> would
>>>>>>>>> indicate event time.
>>>>>>>>>
>>>>>>>>> 2) Current state
>>>>>>>>>
>>>>>>>>> The current master branch implements time support only for grouping
>>>>>>>>> windows in the Table API.
>>>>>>>>> Internally, the Table API converts a 'rowtime symbol (which looks
>>>>>>>>> like a regular attribute) into a special expression which indicates
>>>>>>>>> event time.
>>>>>>>>> For example:
>>>>>>>>>
>>>>>>>>> table
>>>>>>>>>     .window(Tumble over 5.milli on 'rowtime as 'w)
>>>>>>>>>     .groupBy('a, 'w)
>>>>>>>>>     .select(...)
>>>>>>>>>
>>>>>>>>> defines a tumbling event-time window.
>>>>>>>>>
>>>>>>>>> Processing-time is indicated by omitting a time attribute
>>>>>>>>> (table.window(Tumble over 5.milli as 'w) ).
>>>>>>>>>
>>>>>>>>> 3) How can we do that in SQL?
>>>>>>>>>
>>>>>>>>> In SQL we cannot add special expressions without touching the parser,
>>>>>>>>> which we don't want to do because we want to stick to the SQL
>>>>>>>>> standard.
>>>>>>>>> Therefore, I see only two options: adding system attributes or
>>>>>>>>> (parameterless) built-in functions. I list some pros and cons of the
>>>>>>>>> approaches below:
>>>>>>>>>
>>>>>>>>> 1. System Attributes:
>>>>>>>>> + most natural way to access a property of a record.
>>>>>>>>> + works with joins, because time attributes can be related to tables
>>>>>>>>> - We need to ensure the attributes are not writable and always
>>>>>>>>> present in streaming tables (i.e., they should be system defined
>>>>>>>>> attributes).
>>>>>>>>> - Need to adapt existing Table API expressions (will not change the
>>>>>>>>> API but some parts of the internal translation)
>>>>>>>>> - Event time value must be set when the stream is converted,
>>>>>>>>> processing time is evaluated on the fly
>>>>>>>>>
>>>>>>>>> 2. Built-in Functions
>>>>>>>>> + Users could try to modify time attributes, which is not possible
>>>>>>>>> with functions
>>>>>>>>> - do not work with joins, because we need to address different
>>>>>>>>> relations
>>>>>>>>> - not a natural way to access a property of a record
>>>>>>>>>
>>>>>>>>> I think the only viable choice is system attributes, because
>>>>>>>>> built-in functions cannot be used for joins.
>>>>>>>>> However, system attributes are the more complex solution because they
>>>>>>>>> need a better integration with Calcite's SQL validator (preventing
>>>>>>>>> user attributes which are named rowtime, for instance).
>>>>>>>>>
>>>>>>>>> Since there are currently several contributions on the way (such as
>>>>>>>>> SQL OVER windows, FLINK-5653 to FLINK-5658) that need time
>>>>>>>>> indicators, we need a solution soon to be able to make progress.
>>>>>>>>> There are two PRs, #3252 and #3271, which implement the built-in
>>>>>>>>> marker functions proctime() and rowtime() and which could serve as a
>>>>>>>>> temporary solution (since we do not work on joins yet).
>>>>>>>>> I would like to suggest using these functions as a starting point
>>>>>>>>> (once the PRs are merged) and later changing to the system attribute
>>>>>>>>> solution, which needs a bit more time to be implemented.
>>>>>>>>>
>>>>>>>>> I talked with Timo today about this issue and he said he would like
>>>>>>>>> to investigate how we can implement this as system functions properly
>>>>>>>>> integrated with Calcite and the SQL validator.
>>>>>>>>>
>>>>>>>>> What do others think?
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>

