flink-dev mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: [DISCUSS] Table API / SQL internal timestamp handling
Date Mon, 31 Jul 2017 13:04:35 GMT
Hi Jark,

yes, the handling of watermarks is very tricky. It is not directly related
to the proposal which is only about the representation of timestamps but
becomes important for event-time joins.
We have a JIRA about an operator that is able to hold back watermarks [1].

Roughly the idea is to track the smallest timestamp that will be emitted in
the future and align the watermark to this timestamp.
For this we need to know the semantics of the operator (which timestamp
will be emitted in the future) but this will be given for relational
operators.
The new operator could emit a watermark whenever it received one.

In case of a join, the smallest future timestamp depends on two fields and
not just on one.
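
To make the hold-back idea a bit more concrete, here is a rough sketch in plain Python. It is not Flink code and not the operator from [1]; the class and method names are made up for illustration. It assumes the operator knows, for every buffered row, which timestamps could still be emitted, and that a watermark w promises that no row with a timestamp <= w follows.

```python
class HoldBackTracker:
    """Hypothetical helper: tracks the smallest timestamp that may still be emitted."""

    def __init__(self):
        # key -> smallest timestamp of the row buffered under that key
        self.pending = {}

    def buffer(self, key, *timestamps):
        # A joined row may carry several timestamps (one per input table);
        # the smallest of them bounds what can still be emitted.
        self.pending[key] = min(timestamps)

    def release(self, key):
        # The row was emitted or dropped, so it no longer holds the watermark back.
        self.pending.pop(key, None)

    def output_watermark(self, input_watermark):
        # Forward the incoming watermark unless a buffered row would become late.
        if not self.pending:
            return input_watermark
        smallest_future = min(self.pending.values())
        return min(input_watermark, smallest_future - 1)


tracker = HoldBackTracker()
tracker.buffer("a", 100, 90)   # join row with two timestamps
tracker.buffer("b", 120)
held = tracker.output_watermark(110)      # held back behind timestamp 90
tracker.release("a")
forwarded = tracker.output_watermark(110) # nothing smaller than 110 is pending
```

The operator would call output_watermark whenever it receives a watermark, which matches the "emit a watermark whenever it received one" idea above.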

Best,
Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7245


2017-07-31 14:35 GMT+02:00 Jark Wu <jark@apache.org>:

> Hi,
>
> @Fabian, I read your proposal carefully again, and I'm a big +1 to do it. The
> proposal addresses the problem of how to forward the rowtime of both input
> tables in a dual-stream join (windowed/non-windowed). The additional payload
> drawback is acceptable.
>
> You mentioned that:
>
> > The query operators ensure that the watermarks are always behind all
> > event-time timestamps. With additional analysis we will be able to restrict
> > this to timestamps that are actually used as such.
>
> I'm more curious about how we can define the watermark strategies in order
> to make sure all timestamp columns are aligned to watermarks, especially
> when the watermark has already been defined in the input DataStream.
>
> Bests,
> Jark Wu
>
>
> 2017-07-27 23:13 GMT+08:00 Xingcan Cui <xingcanc@gmail.com>:
>
> > Hi all,
> >
> > Thanks for the answers, @Fabian.
> >
> > @Jark, at first I also wanted the users to reassign the timestamp field
> > arbitrarily. However, that means we have to break the current "time system"
> > and create a new one. The blocked watermarks become meaningless and maybe a
> > new WatermarkAssigner should be provided. A slightly stricter mechanism
> > would be to only allow the existing timestamp fields to be used. It sounds
> > reasonable, but will bring an unnecessary barrier to stream/batch SQL, i.e.,
> > some SQL that works for batch cannot be executed in the stream environment.
> > I just wonder if we could automatically choose a field, which will be used
> > in the following calculations. Not sure if it makes sense.
> >
> > @Shaoxuan @Radu, I totally agree that "proctime" is the main obstacle to
> > consolidating stream/batch SQL. Though from a general point of view it can
> > indicate the time to some extent, its randomness means that it should never
> > be used in time-sensitive applications. I always believe that all the
> > information used for query evaluation should be acquired from the data
> > itself.
> >
> > Best,
> > Xingcan
> >
> > On Thu, Jul 27, 2017 at 7:24 PM, Fabian Hueske <fhueske@gmail.com> wrote:
> >
> > > Hi Shaoxuan,
> > >
> > > thanks for your comments. I agree with your comment:
> > >
> > > > The problem we used to have is that we have treated eventtime column as a
> > > > special timestamp column.
> > >
> > > IMO, an event-time timestamp column is a regular column that is aligned
> > > with the watermarks of the stream.
> > > In order to distinguish watermark-aligned columns from others, we need a
> > > special flag in the schema.
> > > When a timestamp column is modified and we cannot guarantee that it is
> > > still aligned with the watermarks, it must lose the special flag and be
> > > treated like any other column.
> > >
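A minimal sketch of the "special flag" idea in plain Python, only to illustrate the rule stated above. The Column type, the is_rowtime field, and the two helper functions are hypothetical and do not mirror the actual Table API classes.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Column:
    name: str
    dtype: str
    is_rowtime: bool = False  # hypothetical "aligned with watermarks" flag


def forward(col):
    """Forwarding a column unchanged keeps its flag."""
    return col


def modify(col, new_name):
    """Any computation on the column yields a regular column without the flag."""
    return replace(col, name=new_name, is_rowtime=False)


rowtime = Column("rowtime", "TIMESTAMP", is_rowtime=True)
shifted = modify(rowtime, "rowtime_plus_1h")
```

After the modification, shifted still has type TIMESTAMP but is treated like any other column, while forward(rowtime) can still be used in windows or joins.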
> > > Regarding your comments:
> > > 1) I agree that we can use Long in addition to Timestamp for timestamp
> > > columns. Since timestamp columns need to be comparable to watermarks, which
> > > are Longs, I don't see that other types would make sense. For now, I would
> > > keep the restriction that timestamps can only be of Timestamp type. I
> > > think extending this to Long would be a follow-up issue to the changes I
> > > proposed here.
> > > 2) Relates to 1) and I agree. If we use a Long attribute as timestamp, it
> > > should remain of type Long. For now I would keep converting it to Timestamp
> > > and change that later.
> > > 3) Yes, timestamp columns must be aligned to watermarks. That's their
> > > primary characteristic. How to define watermark strategies is orthogonal to
> > > this discussion, IMO.
> > > 4) From my point of view, proc-time is a purely virtual column and not
> > > related to an actual (data) column. However, it must be part of the schema
> > > and treated like any other attribute for a good user experience and SQL
> > > compliance. In order to be able to join two tables on processing time, it
> > > must be possible to include a processing time column in the schema
> > > definition of the table. Processing time queries can never compute the same
> > > results as batch queries but their semantics should be aligned with
> > > event-time queries.
> > >
> > > Best, Fabian
> > >
> > > 2017-07-27 9:47 GMT+02:00 Radu Tudoran <radu.tudoran@huawei.com>:
> > >
> > > > Hi all,
> > > >
> > > > @Shaoxuan - thanks for the remarks. I have a question regarding your
> > > > suggestion not to allow creating a proctime window on a regular column. I
> > > > think this would be useful though. First, you might need to carry the
> > > > timestamp indicating when the processing happened (for logging,
> > > > provenance, traceability ...). Second, I do not think it contradicts
> > > > the semantics of batch SQL, as in SQL you have the function "now()", which
> > > > carries pretty much the same semantics as having a function to mark the
> > > > proctime and then projecting this into a column. If I am not mistaken, you
> > > > can store the result of calling now() in database columns.
> > > >
> > > >
> > > > Dr. Radu Tudoran
> > > > Staff Research Engineer - Big Data Expert
> > > > IT R&D Division
> > > >
> > > >
> > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > German Research Center
> > > > Munich Office
> > > > Riesstrasse 25, 80992 München
> > > >
> > > > E-mail: radu.tudoran@huawei.com
> > > > Mobile: +49 15209084330
> > > > Telephone: +49 891588344173
> > > >
> > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
> > > > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
> > > > Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
> > > > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
> > > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
> > > > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
> > > > This e-mail and its attachments contain confidential information from
> > > > HUAWEI, which is intended only for the person or entity whose address
> > is
> > > > listed above. Any use of the information contained herein in any way
> > > > (including, but not limited to, total or partial disclosure,
> > > reproduction,
> > > > or dissemination) by persons other than the intended recipient(s) is
> > > > prohibited. If you receive this e-mail in error, please notify the
> > sender
> > > > by phone or email immediately and delete it!
> > > >
> > > >
> > > > -----Original Message-----
> > > > From: Shaoxuan Wang [mailto:shaoxuan@apache.org]
> > > > Sent: Thursday, July 27, 2017 6:00 AM
> > > > To: Dev
> > > > Subject: Re: [DISCUSS] Table API / SQL internal timestamp handling
> > > >
> > > >  Hi Everyone,
> > > > I like this proposal. The problem we used to have is that we have treated
> > > > eventtime column as a special timestamp column. An eventtime column is
> > > > nothing more special than any other regular column, but carries a certain
> > > > flag (eventtime-indicator) indicating that this column can be used as an
> > > > eventtime to decide when a bounded query can emit the final result by
> > > > comparing it with the associated watermark.
> > > >
> > > > I have a few comments on top of this (they may have already been
> > > > addressed in the conversation; since it's a long discussion, I may have
> > > > missed something):
> > > >
> > > >    1. While we remove the timestamp column, we introduce an
> > > >    eventtime-indicator (we may already have this concept). It is only a
> > > >    flag that can be applied to any column (note that some types may not be
> > > >    usable as an eventtime column), indicating whether this column can be
> > > >    used as eventtime or not. This flag is useful for validation and codeGen.
> > > >    2. A column that has been used as an eventtime should not lose its own
> > > >    type. We should not cast all eventtime columns to the timestamp type. For
> > > >    instance, if a column is of long type, it will stay a long type even if
> > > >    a window aggregate has used it as an eventtime.
> > > >    3. Eventtime will only work well with some associated watermark
> > > >    strategy. We may consider forcing users to provide watermark logic for
> > > >    their selected eventtime.
> > > >    4. For proctime, I hope we do not introduce a proctime-indicator for
> > > >    regular columns. Ideally we should not allow users to create a proctime
> > > >    window on a regular column, as this is against the batch query semantics.
> > > >    Therefore I suggest we always introduce a proctime timestamp column for
> > > >    users to create a proctime window. And unlike eventtime, proctime does
> > > >    not need any associated watermark strategy, as there is no out-of-order
> > > >    issue for proctime.
> > > >
> > > > Regards,
> > > > Shaoxuan
> > > >
> > > > On Wed, Jul 26, 2017 at 9:10 PM, Fabian Hueske <fhueske@gmail.com> wrote:
> > > >
> > > > > Thanks everybody for the replies so far.
> > > > >
> > > > > Let me answer your questions and reply to your thoughts:
> > > > >
> > > > > Radu:
> > > > > ---
> > > > > First of all, although my proposal is motivated by a join operator,
> > > > > this discussion is about timestamp handling, not about joins in general.
> > > > >
> > > > > - The semantics of outer joins is to emit null and there is no way
> > > > > around that. This is not an issue for us. Actually, outer joins are
> > > > > supported by the batch SQL / Table API. It is true that outer joins
> > > > > might result in null timestamps. Calcite will mark those fields as
> > > > > nullable and we should check that timestamps which are used in windows
> > > > > or joins are not nullable.
> > > > > - The query has to explicitly specify which timestamp attribute to use.
> > > > > Otherwise its semantics are not complete and it is invalid. A
> > > > > group-window that follows a join will reference a timestamp attribute
> > > > > and this will be used. The other timestamp might be projected out.
> > > > > When a result with two timestamps is converted into a DataStream, the
> > > > > user has to decide. This could be done inside of the Table to
> > > > > DataStream conversion. If the Table has more than one valid timestamp,
> > > > > the conversion will ask which timestamp to forward.
> > > > > - A proctime join should forward all proctime attributes of the input
> > > > > tables. All will be the same, but that does not matter because they
> > > > > are either virtual or represented as 1 byte dummy attributes. Also,
> > > > > unused ones will be automatically projected out anyway.
> > > > > - An event-time join should forward all event-time attributes of the
> > > > > input tables. Creating a new event-time attribute using processing
> > > > > time makes event-time processing pointless and will give completely
> > > > > random results.
> > > > > Event-time is not about the "time an event is created" but about a
> > > > > timestamp that is associated with an event. For example an order event
> > > > > could have three timestamps: "orderTime", "shipTime", and "receiveTime".
> > > > > Each could be a valid event-time attribute.
> > > > >
> > > > > Jark:
> > > > > ---
> > > > > Thanks for the proposal. I think I understand what you want to achieve
> > > > > with this, but I think functions to instantiate time attributes are
> > > > > not necessary and would make things more complicated. The point of
> > > > > supporting multiple time attributes is to ensure that all of them are
> > > > > aligned with the watermarks. If we add a method ROW_TIME(timestamp),
> > > > > we don't know whether the timestamp is aligned with the watermarks. If
> > > > > that is not the case, the query won't be executed as expected. The
> > > > > issue of LEFT JOIN can easily be addressed by checking for
> > > > > nullability during optimization when an operator tries to use it.
> > > > >
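The nullability check mentioned above could look roughly like the following plain-Python sketch. The schema layout (a dict with "time_indicator" and "nullable" entries) and the function name are assumptions for illustration; in the real system this information would come from Calcite's type information during optimization.

```python
def check_time_attribute(schema, field):
    """Reject a field that cannot serve as the time attribute of a window or join."""
    column = schema[field]
    if not column["time_indicator"]:
        raise ValueError(f"{field} is not a time attribute")
    if column["nullable"]:
        # e.g. the right-side rowtime after a LEFT JOIN
        raise ValueError(f"{field} is nullable and cannot drive a window or join")
    return field


# Hypothetical schema of the result of T1 LEFT JOIN T2.
joined_schema = {
    "t1_rowtime": {"time_indicator": True, "nullable": False},
    "t2_rowtime": {"time_indicator": True, "nullable": True},
}

usable = check_time_attribute(joined_schema, "t1_rowtime")
```

A window on t1_rowtime passes validation, whereas a window on t2_rowtime would be rejected at planning time instead of failing at runtime on a null timestamp.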
> > > > > The beauty of supporting multiple timestamps is that a user does not
> > > > > have to care at all about timestamps (or timestamp functions) and
> > > > > watermarks. As long as the query uses a timestamp attribute that was
> > > > > originally declared as rowtime in a source table (and was not modified
> > > > > afterwards), this is fine. Think of a cascade of three windowed joins:
> > > > > R - S - T - U, and you want to join S - T first. In that case, you
> > > > > need to preserve the timestamps of S and T in order to join R and U.
> > > > > From a relational algebra point of view, there is no reason to have a
> > > > > limitation on how these attributes are accessed. Timestamps are just
> > > > > regular fields of a record. The only restriction in the context of
> > > > > stream processing is that the watermark must be aligned with
> > > > > timestamps, i.e., follow all timestamps such that data is not late
> > > > > according to any of the timestamps. This we can achieve and handle
> > > > > internally without the user having to worry about it.
> > > > >
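The alignment restriction ("data is not late according to any of the timestamps") can be illustrated with a small plain-Python sketch. The function and field names are invented for this example, and it assumes the usual convention that a row is late if its timestamp is at or below the current watermark.

```python
def aligned_watermark(per_attribute_watermarks):
    """A watermark that stays behind every tracked time attribute."""
    return min(per_attribute_watermarks.values())


def is_late(row, watermark, time_fields):
    """A row is late if any of its time attributes is at or below the watermark."""
    return any(row[field] <= watermark for field in time_fields)


# Result row of joining S and T that keeps both rowtimes for the later joins.
row = {"s_rowtime": 1010, "t_rowtime": 960}
watermark = aligned_watermark({"s_rowtime": 1000, "t_rowtime": 950})
late = is_late(row, watermark, ["s_rowtime", "t_rowtime"])
```

With the watermark taken as the minimum, the row is on time for both attributes. If the operator forwarded 1000 instead, the same row would already be late with respect to t_rowtime.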
> > > > > Xingcan:
> > > > > ---
> > > > > I think your questions are mostly implementation details and not so
> > > > > much related to the original proposal of supporting multiple timestamps.
> > > > >
> > > > > My take on your questions is:
> > > > > 1. The rate at which watermarks are emitted is not important for the
> > > > > correctness of a query. However, it can affect the performance,
> > > > > because each watermark is sent as a special record and it is
> > > > > broadcast. My initial take would be to emit a new watermark whenever
> > > > > the operator updated its watermark because usually, the operator would
> > > > > have forwarded the old watermark.
> > > > > 2. I would say this is the responsibility of the operator because
> > > > > first it is not related to the semantics of the query and second it is
> > > > > an operator responsibility in the existing code as well.
> > > > >
> > > > > Jark 2:
> > > > > You are right, the query (or user) must decide on the event-time
> > > > > attribute to use. My main point is, it is much easier for the user
> > > > > (and for us internally) if we internally track multiple timestamps,
> > > > > because we do not have to prune, at the join, the timestamp that will
> > > > > not be used later.
> > > > > Moreover, both timestamps might be used later (see join example, which
> > > > > could be reordered of course). All we have to do is to ensure that all
> > > > > timestamps are aligned with the watermarks.
> > > > >
> > > > > Radu 2:
> > > > > IMO, time (or anything else that affects the semantics) should never
> > > > > be decided by the system. If we did that, a query would not be fully
> > > > > specified or, even worse, the way it is executed would be semantically
> > > > > incorrect and produce arbitrary results.
> > > > >
> > > > > Time attributes should be specified in the source tables and then
> > > > > forwarded from there. So far I haven't seen an example where this
> > > > > would not be possible (within the semantics of relational queries). If
> > > > > we do that right, there won't be a need for explicit time management
> > > > > except for the definition of the initial timestamps which can be
> > > > > hidden in the table definition. As I said before, we (or the system)
> > > > > cannot decide on the timestamp because that would lead to arbitrary
> > > > > results. Asking the user to do that would mean explicit time
> > > > > management which is also not desirable. I think my proposal gives
> > > > > users all options (timestamps) to choose from and the system can do the
> > > > > rest.
> > > > >
> > > > > Best, Fabian
> > > > >
> > > > > 2017-07-26 10:46 GMT+02:00 Radu Tudoran <radu.tudoran@huawei.com>:
> > > > >
> > > > > > Hi everyone,
> > > > > >
> > > > > > I just want to add that I was referring to NULL values not
> > > > > > specifically for time fields but for the event itself. If you have the
> > > > > > following situation
> > > > > >
> > > > > > Stream 1:     .... |    event1   | ....
> > > > > > Stream 2:     .... |             | ....
> > > > > >
> > > > > > and you have a LEFT JOIN between stream 1 and stream 2 (no
> > > > > > condition), then you still need to emit (event1,null), as this
> > > > > > is the behavior of left join. This is maybe a very simple situation,
> > > > > > but the point is that left joins and right joins can have situations
> > > > > > where you have elements only in the main stream and no element in the
> > > > > > right stream. And for this case you still need to emit.
> > > > > >
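The left join behavior described here, shown as a tiny plain-Python sketch over finite lists. This only illustrates the batch semantics of emitting (event1, null); when a streaming operator may emit the null-padded row is a separate question that this sketch does not address. The function name and the default predicate are made up for the example.

```python
def left_join(left, right, predicate=lambda l, r: True):
    """Pair every left element with its matches, or with None if there are none."""
    out = []
    for l in left:
        matches = [r for r in right if predicate(l, r)]
        if matches:
            out.extend((l, r) for r in matches)
        else:
            # No partner on the right side: still emit, padded with None.
            out.append((l, None))
    return out


result = left_join(["event1"], [])
```

Here result contains the single pair ("event1", None), so any timestamp taken from the right side of that pair would be null as well.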
> > > > > >
> > > > > > Regarding whether time should be decided by the system or not... I think
> > > > > > the answer is: it depends. I think the example from Jark is very good
> > > > > > and shows the need for some mechanisms to select/manage the time (I
> > > > > > like the proposal of having functions to insert the time in the
> > > > > > output!). However, if a business analyst writes a query without
> > > > > > explicit time management, we still need to have some default behavior
> > > > > > in the system. As per my initial proposal, I think we need to decide
> > > > > > on one timestamp field to carry: either a new one (taken at the moment
> > > > > > of the join) or the timestamp from the main stream (...although I am
> > > > > > not sure which one is the main stream in the case of a full join :) )
> > > > > >
> > > > > >
> > > > > > Dr. Radu Tudoran
> > > > > > Staff Research Engineer - Big Data Expert IT R&D Division
> > > > > >
> > > > > >
> > > > > >
> > > > > > -----Original Message-----
> > > > > > From: Jark Wu [mailto:jark@apache.org]
> > > > > > Sent: Wednesday, July 26, 2017 8:29 AM
> > > > > > To: dev@flink.apache.org
> > > > > > Subject: Re: [DISCUSS] Table API / SQL internal timestamp handling
> > > > > >
> > > > > > Hi Xingcan,
> > > > > >
> > > > > > IMO, I don't think the event-time of join results can be automatically
> > > > > > decided by the system. Considering batch tables, if users want an event
> > > > > > time window aggregation after a join, they must specify the time field
> > > > > > explicitly (T1.rowtime or T2.rowtime or the computed result of them). So
> > > > > > in the case of streaming tables, the system also can't automatically
> > > > > > decide the time field for users.
> > > > > >
> > > > > > In regards to the question you asked, I think we don't need to change
> > > > > > the watermark no matter whether we choose the left rowtime, the right
> > > > > > rowtime, or the combination, because the watermark has been aligned with
> > > > > > the rowtime in the source. Maybe I'm wrong about this, please correct me
> > > > > > if I'm missing something.
> > > > > >
> > > > > > What do you think?
> > > > > >
> > > > > > Regards,
> > > > > > Jark
> > > > > >
> > > > > > 2017-07-26 11:24 GMT+08:00 Xingcan Cui <xingcanc@gmail.com>:
> > > > > >
> > > > > > > Hi all,
> > > > > > >
> > > > > > > @Fabian, thanks for raising this.
> > > > > > >
> > > > > > > @Radu and Jark, personally I think the timestamp field is critical for
> > > > > > > query processing and thus should be declared as (or supposed to be)
> > > > > > > NOT NULL. In addition, I think the event-time semantic of the join
> > > > > > > results should be automatically decided by the system, i.e., we do not
> > > > > > > hand it over to users, so as to avoid unpredictable assignments.
> > > > > > >
> > > > > > > Generally speaking, consolidating different time fields is possible
> > > > > > > since all of them should ideally be monotonically increasing. From my
> > > > > > > point of view, the problem lies in
> > > > > > > (1) what's the relationship between the old and new watermarks. Shall
> > > > > > > they be a one-to-one mapping, or could the new watermarks skip some
> > > > > > > timestamps? And (2) who is in charge of emitting the blocked
> > > > > > > watermarks, the operator or the process function?
> > > > > > >
> > > > > > > I'd like to hear from you.
> > > > > > >
> > > > > > > Best,
> > > > > > > Xingcan
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Wed, Jul 26, 2017 at 10:40 AM, Jark Wu <jark@apache.org> wrote:
> > > > > > >
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > Radu's concerns make sense to me, especially the null value
> > > > > > > > timestamp and multi-proctime.
> > > > > > > >
> > > > > > > > I also have something in mind. I would like to propose some time
> > > > > > > > indicator built-in functions, e.g. ROW_TIME(Timestamp ts) will
> > > > > > > > generate an event time logical attribute, PROC_TIME() will generate a
> > > > > > > > processing time logical attribute. It is similar to TUMBLE_ROWTIME
> > > > > > > > proposed in this PR https://github.com/apache/flink/pull/4199. These
> > > > > > > > can be used in any queries, but there still can't be more than one
> > > > > > > > rowtime attribute or more than one proctime attribute in a table
> > > > > > > > schema.
> > > > > > > >
> > > > > > > > Both selected timestamp fields from a JOIN query will be materialized.
> > > > > > > > If someone needs further computation based on the event time, they
> > > > > > > > need to create a new time attribute using the ROW_TIME(...)
> > > > > > > > function. And this can also solve the null timestamp problem in LEFT
> > > > > > > > JOIN, because we can use a user defined function to combine the two
> > > > > > > > rowtimes and make the result the event time attribute, e.g. SELECT
> > > > > > > > ROW_TIME(udf(T1.rowtime, T2.rowtime)) as rowtime FROM T1 JOIN T2 ...
> > > > > > > >
> > > > > > > >
> > > > > > > > What do you think?
> > > > > > > >
> > > > > > > >
> > > > > > > > 2017-07-25 23:48 GMT+08:00 Radu Tudoran <radu.tudoran@huawei.com>:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > I think this is an interesting discussion and I would like to add
> > > > > > > > > some issues and give some feedback.
> > > > > > > > >
> > > > > > > > > - For supporting the join we do not only need to think of the time
> > > > > > > > > but also of the null values. For example, if you have a LEFT (or
> > > > > > > > > RIGHT) JOIN between items of 2 input streams, and the secondary
> > > > > > > > > input is not available, you should still emit Row.of(event1, null).
> > > > > > > > > As far as I know, if you need to serialize/deserialize null values
> > > > > > > > > to send them, they do not work. So we should include this scenario
> > > > > > > > > in the discussions.
> > > > > > > > > - If we will have multiple timestamps in an (output) event, one
> > > > > > > > > question is how to select afterwards which is the primary time field
> > > > > > > > > on which to operate. When we describe a query we might be able to
> > > > > > > > > specify (or we get this implicitly if we implement the carry-on of
> > > > > > > > > the 2 timestamps) Select T1.rowtime, T2.rowtime ... but if the output
> > > > > > > > > of a query is the input of a new processing pipeline, then do we
> > > > > > > > > generally also support that the input has 2 time fields? How do we
> > > > > > > > > deal with the 2 input fields (maybe I am missing something) further
> > > > > > > > > in the datastream pipeline that we build based on the output?
> > > > > > > > > - For the case of proctime - do we need to carry 2 proctimes (the
> > > > > > > > > proctimes of the incoming events from each stream), or 1 proctime
> > > > > > > > > (as we operate on proctime and the combination of the 2 inputs can
> > > > > > > > > be considered as a new event, the current proctime on the machine
> > > > > > > > > can be considered the (proc)time reference for the output event), or
> > > > > > > > > 3 proctimes (the 2 proctimes of the input plus the proctime when the
> > > > > > > > > new event was created)?
> > > > > > > > > - Similar to the point above, for event time (which I understand as
> > > > > > > > > the time when the event was created... or do we understand it as a
> > > > > > > > > time carried within the event?) - when we join 2 events and output
> > > > > > > > > an event that is the result of the join - isn't this a new event
> > > > > > > > > detached from the source/input events? I would tend to say it is a
> > > > > > > > > new event and then, as for proctime, the event time of the new event
> > > > > > > > > is the current time when this output event was created. If we accept
> > > > > > > > > this hypothesis, then we would not need the 2 time input fields to
> > > > > > > > > be carried/managed implicitly. If someone needs them further down
> > > > > > > > > the computation pipeline, then in the query they would be selected
> > > > > > > > > explicitly from the input stream and projected into some fields to
> > > > > > > > > be carried (Select T1.rowtime as FormerTime1, T2.rowtime as
> > > > > > > > > FormerTime2, .... JOIN T1, T2...) ... but they would not have the
> > > > > > > > > timestamp logic.
> > > > > > > > >
> > > > > > > > > ..my 2 cents
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Dr. Radu Tudoran
> > > > > > > > > Staff Research Engineer - Big Data Expert IT R&D Division
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > -----Original Message-----
> > > > > > > > > From: Fabian Hueske [mailto:fhueske@gmail.com]
> > > > > > > > > Sent: Tuesday, July 25, 2017 4:22 PM
> > > > > > > > > To: dev@flink.apache.org
> > > > > > > > > Subject: [DISCUSS] Table API / SQL internal timestamp handling
> > > > > > > > >
> > > > > > > > > Hi everybody,
> > > > > > > > >
> > > > > > > > > I'd like to propose and discuss some changes in the way how
> > the
> > > > > Table
> > > > > > > API
> > > > > > > > > / SQL internally handles timestamps.
> > > > > > > > >
> > > > > > > > > The Table API is implemented on top of the DataStream API.
> > The
> > > > > > > DataStream
> > > > > > > > > API hides timestamps from users in order to ensure that
> > > > timestamps
> > > > > > and
> > > > > > > > > watermarks are aligned. Instead users assign timestamps and
> > > > > > watermarks
> > > > > > > > once
> > > > > > > > > (usually at the source or in a subsequent operator) and let
> > the
> > > > > > system
> > > > > > > > > handle the timestamps from there on. Timestamps are stored
> in
> > > the
> > > > > > > > timestamp
> > > > > > > > > field of the StreamRecord which is a holder for the user
> > record
> > > > and
> > > > > > the
> > > > > > > > > timestamp. DataStream operators that depend on time
> > > > (time-windows,
> > > > > > > > process
> > > > > > > > > function, ...) access the timestamp from the StreamRecord.
> > > > > > > > >
> > > > > > > > > > In contrast to the DataStream API, the Table API and SQL are
> > > aware
> > > > > of
> > > > > > > the
> > > > > > > > > semantics of a query. I.e., we can analyze how users access
> > > > > > timestamps
> > > > > > > > and
> > > > > > > > > whether they are modified or not. Another difference is
> that
> > > the
> > > > > > > > timestamp
> > > > > > > > > must be part of the schema of a table in order to have
> > correct
> > > > > query
> > > > > > > > > semantics.
> > > > > > > > >
> > > > > > > > > The current design to handle timestamps is as follows. The
> > > Table
> > > > > API
> > > > > > > > > stores timestamps in the timestamp field of the
> StreamRecord.
> > > > > > > Therefore,
> > > > > > > > > timestamps are detached from the remaining data which is
> > stored
> > > > in
> > > > > > Row
> > > > > > > > > objects. Hence, the physical representation of a row is
> > > different
> > > > > > from
> > > > > > > > its
> > > > > > > > > logical representation. We introduced a translation layer
> > > > > (RowSchema)
> > > > > > > to
> > > > > > > > > convert logical schema into physical schema. This is
> > necessary
> > > > for
> > > > > > > > > serialization or code generation when the logical plan is
> > > > > translated
> > > > > > > > into a
> > > > > > > > > physical execution plan. Processing-time timestamps are
> > > similarly
> > > > > > > > handled.
> > > > > > > > > They are not included in the physical schema and looked up
> > when
> > > > > > needed.
> > > > > > > > > > This design also requires us to materialize
> > > timestamps
> > > > > when
> > > > > > > > they
> > > > > > > > > are accessed by expressions. Timestamp materialization is
> > done
> > > > as a
> > > > > > > > > pre-optimization step.
> > > > > > > > >
> > > > > > > > > While thinking about the implementation of the event-time
> > > > windowed
> > > > > > > > > > stream-stream join [1] I stumbled over the question of which
> > > > timestamp
> > > > > > of
> > > > > > > > both
> > > > > > > > > input tables to forward. With the current design, we could
> > only
> > > > > have
> > > > > > a
> > > > > > > > > single timestamp, so keeping both timestamps would not be
> > > > possible.
> > > > > > The
> > > > > > > > > choice of the timestamp would need to be specified by the
> > query
> > > > > > > otherwise
> > > > > > > > > it would lack clear semantics. When executing the join, the
> > > join
> > > > > > > operator
> > > > > > > > > would need to make sure that no late data is emitted. This
> > > would
> > > > > only
> > > > > > > > work
> > > > > > > > > > if the operator was able to hold back watermarks [2].
> > > > > > > > >
> > > > > > > > > With this information in mind, I'd like to discuss the
> > > following
> > > > > > > > proposal:
> > > > > > > > >
> > > > > > > > > - We allow more than one event-time timestamp and store
> them
> > > > > directly
> > > > > > > in
> > > > > > > > > the Row
> > > > > > > > > - The query operators ensure that the watermarks are always
> > > > behind
> > > > > > all
> > > > > > > > > event-time timestamps. With additional analysis we will be
> > able
> > > > to
> > > > > > > > restrict
> > > > > > > > > this to timestamps that are actually used as such.
> > > > > > > > > - When a DataStream operator is time-based (e.g., a
> > DataStream
> > > > > > > > > > time window), we inject an operator that copies the
> > timestamp
> > > > from
> > > > > > the
> > > > > > > > Row
> > > > > > > > > into the StreamRecord.
> > > > > > > > > - We try to remove the distinction between logical and
> > physical
> > > > > > schema.
> > > > > > > > > For event-time timestamps this is because we store them in
> > the
> > > > Row
> > > > > > > > object,
> > > > > > > > > for processing-time timestamps, we add a dummy byte field.
> > When
> > > > > > > > accessing a
> > > > > > > > > field of this type, the code generator injects the code to
> > > fetch
> > > > > the
> > > > > > > > > timestamps.
> > > > > > > > > - We might be able to get around the pre-optimization time
> > > > > > > > materialization
> > > > > > > > > step.
> > > > > > > > > - A join result would be able to keep both timestamps. The
> > > > > watermark
> > > > > > > > would
> > > > > > > > > > be held back for both so both could be used in subsequent
> > > > > operations.
> > > > > > > > >
> > > > > > > > > I admit, I haven't thought this completely through.
> > > > > > > > > However, the benefits of this design from my point of view
> > are:
> > > > > > > > > - encoding of timestamps in Rows means that the logical
> > schema
> > > is
> > > > > > equal
> > > > > > > > to
> > > > > > > > > the physical schema
> > > > > > > > > - no timestamp materialization
> > > > > > > > > - support for multiple timestamps. Otherwise we would need
> to
> > > > > expose
> > > > > > > > > internal restrictions to the user which are hard to
> explain /
> > > > > > > > communicate.
> > > > > > > > > - no need to change any public interfaces at the moment.
> > > > > > > > >
> > > > > > > > > The drawbacks as far as I see them are:
> > > > > > > > > - additional payload due to unused timestamp field +
> possibly
> > > the
> > > > > > > > > processing-time dummy field
> > > > > > > > > - complete rework of the internal timestamp logic
> (again...)
> > > > > > > > >
> > > > > > > > > Please let me know what you think,
> > > > > > > > > Fabian
> > > > > > > > >
> > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-6233
> > > > > > > > > [2] https://issues.apache.org/jira/browse/FLINK-7245
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>

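The hold-back idea in the quoted proposal (the watermark stays behind every event-time timestamp that a buffered row may still carry, including both timestamps of a join result) can be sketched roughly as follows. This is plain Python, not Flink code: the class `WatermarkHoldBack` and its methods `buffer`, `release` and `output_watermark` are hypothetical names made up for illustration, and the sketch assumes the operator knows which rows it still holds.

```python
from collections import Counter


class WatermarkHoldBack:
    """Hypothetical helper: keeps the outgoing watermark behind all
    event-time timestamps of rows that may still be emitted."""

    def __init__(self):
        # Multiset of every event-time timestamp carried by buffered rows.
        self._pending = Counter()

    def buffer(self, *timestamps):
        """Register the timestamps of a row that may still be emitted."""
        self._pending.update(timestamps)

    def release(self, *timestamps):
        """Forget the timestamps of a row that was emitted or expired."""
        self._pending.subtract(timestamps)
        # Unary plus drops entries whose count fell to zero or below.
        self._pending = +self._pending

    def output_watermark(self, input_watermark):
        """Return the watermark that is safe to forward downstream."""
        if not self._pending:
            return input_watermark
        # A watermark t asserts that no row with timestamp <= t follows,
        # so stay strictly below the smallest pending timestamp.
        return min(input_watermark, min(self._pending) - 1)


hold_back = WatermarkHoldBack()
hold_back.buffer(1000, 1200)               # join result keeps both timestamps
print(hold_back.output_watermark(1500))    # held back to 999
hold_back.release(1000, 1200)
print(hold_back.output_watermark(1500))    # nothing pending, forwards 1500
```

Tracking all timestamp fields in one multiset is the simplest choice; the "additional analysis" mentioned in the proposal would correspond to registering only those fields that are actually used as event-time timestamps later in the query.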