flink-dev mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: [DISCUSS] Table API / SQL internal timestamp handling
Date Tue, 01 Aug 2017 14:33:30 GMT
Hi everybody,

I created FLINK-7337 [1] and will work on this.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7337

2017-08-01 11:36 GMT+02:00 Fabian Hueske <fhueske@gmail.com>:

> Hi,
>
> I think the implementation of the join operator should not depend on the
> synchronization of the watermarks.
> If we need to buffer a stream because we have to wait for future records
> from the other stream to join them, then that's how it is. We cannot change
> the semantics of the query.
> It might be possible to apply backpressure to the source to delay one
> stream, but this is a very delicate thing to do and can easily lead to
> deadlocks and can have unpredictable side effects. So I would just consume
> the stream and put it into the state.
>
> From my point of view, timestamps cannot be "irreconcilable" because the
> query semantics and input streams are given.
> A join will always need to buffer some records. It is true that the join
> operator might need to buffer a lot of records if the input streams and
> join predicate are not aligned.
> IMO, this is fine as long as the query has enough resources. Once it runs
> out of resources, it simply fails and can be restarted with more resources
> or adapted.
>
> Cheers, Fabian
>
>
> 2017-08-01 6:04 GMT+02:00 Xingcan Cui <xingcanc@gmail.com>:
>
>> Hi Shaoxuan,
>>
>> I really appreciate your prompt reply. What you explained makes sense to
>> me.
>>
>> There is only one point on which I have a different view: "we have to
>> buffer all the delta data between watermarks of two inputs".
>>
>> Consider the following SQL on joining two streams l and r:
>>
>> SELECT * FROM l, r
>> WHERE l.name = r.name
>> AND l.ts BETWEEN r.ts - INTERVAL '600' MINUTE
>>     AND r.ts - INTERVAL '599' MINUTE;
>>
>> This query is valid since it holds both an equi-key and a time span
>> restriction.
>>
>> There are two different situations in which the query may be executed:
>> (1) if the timestamps of l and r are synchronized, e.g., both streams
>> contain newly generated events, we must buffer the l stream for 600
>> minutes; and (2) if there is a natural offset between the two streams,
>> e.g., the r stream is newly generated while the l stream is sourced from
>> an event queue generated 10 hours ago, it is unnecessary to buffer so
>> much data.
>>
>> That raises the question: what if the timestamps of the two streams are
>> essentially "irreconcilable"?
>>
>> Best,
>> Xingcan
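
A minimal sketch of the two situations described above (an illustration, not code from the thread). It assumes ideal watermarks that track event time exactly, and it measures retention in minutes of event-time progress. The names `retention_minutes` and `offset_min` are hypothetical; `offset_min` is how far the event time of l lags behind that of r.

```python
def retention_minutes(offset_min, lower_min=600, upper_min=599):
    """Return (l_keep, r_keep): how long a row of each stream stays buffered.

    The join predicate is
        l.ts BETWEEN r.ts - lower_min AND r.ts - upper_min
    so an l row with timestamp t can be dropped once the r watermark passes
    t + lower_min, and an r row with timestamp t can be dropped once the
    l watermark passes t - upper_min.
    """
    # When an l row with timestamp t arrives, r has already progressed to
    # t + offset_min, so it only waits for the remaining distance.
    l_keep = max(0, lower_min - offset_min)
    # When an r row with timestamp t arrives, l has only reached
    # t - offset_min, so the r row waits until l reaches t - upper_min.
    r_keep = max(0, offset_min - upper_min)
    return l_keep, r_keep


print(retention_minutes(0))    # synchronized streams
print(retention_minutes(600))  # l lags r by 10 hours
```

Under these assumptions, synchronized streams keep every l row for 600 minutes, while a 10-hour offset shifts the (much smaller) buffering cost to the r side.
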
>>
>> On Mon, Jul 31, 2017 at 10:42 PM, Shaoxuan Wang <wshaoxuan@gmail.com>
>> wrote:
>>
>> > Xingcan,
>> > A watermark is the “estimate of completion”. The user defines the
>> > watermark for each input, based on the best estimate of when that input
>> > has seen pretty much all of its data. It is usually calculated from the
>> > event timestamp.
>> > When we do a windowed join, we have to make sure the watermarks of both
>> > inputs have been received before emitting a window result at this
>> > watermark. If the two inputs differ widely, say "one for today and the
>> > other one for yesterday" as you pointed out, the watermark of the
>> > windowed join operator is just yesterday. I guess this is what Fabian
>> > means by "In case of a join, the smallest future timestamp depends on
>> > two fields and not just on one." In the windowed join case, we have to
>> > buffer all the delta data between the watermarks of the two inputs. It
>> > is the user's responsibility (if she/he wants to reduce the cost) to
>> > align the watermarks of the stream sources as much as possible.
>> >
>> > Regards,
>> > Shaoxuan
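
A minimal sketch of the rule described above, in which the watermark of a two-input operator is the smaller of its two input watermarks (an illustration, not Flink code). The class name `TwoInputWatermark` and its method are hypothetical, and timestamps are plain integers.

```python
class TwoInputWatermark:
    """Tracks the combined watermark of a two-input (join) operator."""

    def __init__(self):
        self.left = float("-inf")     # latest watermark seen on the left input
        self.right = float("-inf")    # latest watermark seen on the right input
        self.emitted = float("-inf")  # last watermark forwarded downstream

    def on_watermark(self, side, ts):
        """Record an input watermark; return a new combined one, or None."""
        if side == "left":
            self.left = max(self.left, ts)
        else:
            self.right = max(self.right, ts)
        # The operator can only be as far along as its slowest input.
        combined = min(self.left, self.right)
        if combined > self.emitted:
            self.emitted = combined
            return combined
        return None  # combined watermark did not advance


wm = TwoInputWatermark()
wm.on_watermark("left", 200)          # "today": nothing emitted yet
print(wm.on_watermark("right", 40))   # "yesterday" determines the result
```

The rows of the faster input that lie between the two watermarks are the "delta data" that has to stay buffered.
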
>> >
>> >
>> > On Mon, Jul 31, 2017 at 10:09 PM, Xingcan Cui <xingcanc@gmail.com>
>> wrote:
>> >
>> > > Hi Fabian,
>> > >
>> > > I have a question similar to Jark's. Theoretically, the row times of
>> > > the two streams could be quite different, e.g., one for today and the
>> > > other one for yesterday.
>> > > How can we align them?
>> > >
>> > > Best,
>> > > Xingcan
>> > >
>> > > On Mon, Jul 31, 2017 at 9:04 PM, Fabian Hueske <fhueske@gmail.com>
>> > wrote:
>> > >
>> > > > Hi Jark,
>> > > >
>> > > > yes, the handling of watermarks is very tricky. It is not directly
>> > > related
>> > > > to the proposal which is only about the representation of timestamps
>> > but
>> > > > becomes important for event-time joins.
>> > > > We have a JIRA about an operator that is able to hold back
>> watermarks
>> > > [1].
>> > > >
>> > > > Roughly the idea is to track the smallest timestamp that will be
>> > emitted
>> > > in
>> > > > the future and align the watermark to this timestamp.
>> > > > For this we need to know the semantics of the operator (which
>> timestamp
>> > > > will be emitted in the future) but this will be given for relational
>> > > > operators.
>> > > > The new operator could emit a watermark whenever it received one.
>> > > >
>> > > > In case of a join, the smallest future timestamp depends on two
>> fields
>> > > and
>> > > > not just on one.
>> > > >
>> > > > Best,
>> > > > Fabian
>> > > >
>> > > > [1] https://issues.apache.org/jira/browse/FLINK-7245
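
A rough sketch of the idea of holding back watermarks (an assumption-laden illustration, not the design of FLINK-7245). It assumes that a watermark W promises that no row with a timestamp less than or equal to W will follow, and that the operator knows the timestamps of the rows it still has to emit. The class `HoldBackWatermark` and its methods are hypothetical.

```python
import heapq


class HoldBackWatermark:
    """Aligns the outgoing watermark to the smallest pending timestamp."""

    def __init__(self):
        self.pending = []  # min-heap of timestamps of rows not yet emitted

    def buffer(self, ts):
        """Remember a row that will be emitted later with timestamp ts."""
        heapq.heappush(self.pending, ts)

    def release(self):
        """Emit the pending row with the smallest timestamp."""
        return heapq.heappop(self.pending)

    def on_watermark(self, input_wm):
        """Return the watermark that is safe to forward downstream."""
        if self.pending:
            # Stay strictly behind the smallest timestamp still to come.
            return min(input_wm, self.pending[0] - 1)
        return input_wm


op = HoldBackWatermark()
op.buffer(50)
op.buffer(30)
print(op.on_watermark(100))  # held back behind the pending row at 30
```

For a join, the pending timestamps would come from both inputs, which is why the smallest future timestamp depends on two fields.
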
>> > > >
>> > > >
>> > > > 2017-07-31 14:35 GMT+02:00 Jark Wu <jark@apache.org>:
>> > > >
>> > > > > Hi,
>> > > > >
>> > > > > @Fabian, I read your proposal carefully again, and I'm a big +1 to
>> > > > > doing it. The proposal can address the problem of how to forward
>> > > > > both input tables' rowtime in a dual-stream join
>> > > > > (windowed/non-windowed). The additional payload drawback is
>> > > > > acceptable.
>> > > > >
>> > > > > You mentioned that:
>> > > > >
>> > > > > > The query operators ensure that the watermarks are always behind
>> > all
>> > > > > > event-time timestamps. With additional analysis we will be able
>> to
>> > > > > restrict
>> > > > > > this to timestamps that are actually used as such.
>> > > > >
>> > > > > I'm more curious about how we can define the watermark strategies
>> > > > > in order to make sure all timestamp columns are aligned to
>> > > > > watermarks, especially when the watermark has already been defined
>> > > > > in the input DataStream.
>> > > > >
>> > > > > Bests,
>> > > > > Jark Wu
>> > > > >
>> > > > >
>> > > > > 2017-07-27 23:13 GMT+08:00 Xingcan Cui <xingcanc@gmail.com>:
>> > > > >
>> > > > > > Hi all,
>> > > > > >
>> > > > > > Thanks for the answers, @Fabian.
>> > > > > >
>> > > > > > @Jark, at first I also wanted users to be able to reassign the
>> > > > > > timestamp field arbitrarily. However, that means we have to break
>> > > > > > the current "time system" and create a new one. The blocked
>> > > > > > watermarks become meaningless and maybe a new WatermarkAssigner
>> > > > > > should be provided. A slightly stricter mechanism would be to
>> > > > > > only allow the use of existing timestamp fields. It sounds
>> > > > > > reasonable, but will introduce an unnecessary barrier between
>> > > > > > stream and batch SQL, i.e., some SQL that works in batch cannot be
>> > > > > > executed in the stream environment. I just wonder if we could
>> > > > > > automatically choose a field, which will be used in the following
>> > > > > > calculations. Not sure if it makes sense.
>> > > > > >
>> > > > > > @Shaoxuan @Radu, I totally agree that "proctime" is the main
>> > > > > > obstacle to consolidating stream/batch SQL. Though from a general
>> > > > > > point of view it can indicate the time to some extent, its
>> > > > > > randomness means that it should never be used in time-sensitive
>> > > > > > applications. I have always believed that all the information used
>> > > > > > for query evaluation should be acquired from the data itself.
>> > > > > >
>> > > > > > Best,
>> > > > > > Xingcan
>> > > > > >
>> > > > > > On Thu, Jul 27, 2017 at 7:24 PM, Fabian Hueske <
>> fhueske@gmail.com>
>> > > > > wrote:
>> > > > > >
>> > > > > > > Hi Shaoxuan,
>> > > > > > >
>> > > > > > > thanks for your comments. I agree with your comment:
>> > > > > > >
>> > > > > > > > The problem we used to have is that we have treated
>> eventtime
>> > > > column
>> > > > > > as a
>> > > > > > > special timestamp column.
>> > > > > > >
>> > > > > > > IMO, an event-time timestamp column is a regular column that is
>> > > > > > > aligned with the watermarks of the stream.
>> > > > > > > In order to distinguish watermark-aligned columns from others, we
>> > > > > > > need a special flag in the schema.
>> > > > > > > When a timestamp column is modified and we cannot guarantee that
>> > > > > > > it is still aligned with the watermarks, it must lose the special
>> > > > > > > flag and be treated like any other column.
>> > > > > > >
>> > > > > > > Regarding your comments:
>> > > > > > > 1) I agree that we can use Long in addition to Timestamp for
>> > > > > > > timestamp columns. Since timestamp columns need to be comparable
>> > > > > > > to watermarks, which are Longs, I don't see that other types would
>> > > > > > > make sense. For now, I would keep the restriction that timestamps
>> > > > > > > can only be of Timestamp type. I think extending this to Long
>> > > > > > > would be a follow-up issue to the changes I proposed here.
>> > > > > > > 2) Relates to 1) and I agree. If we use a Long attribute as
>> > > > > > > timestamp, it should remain of type Long. For now I would keep
>> > > > > > > converting it to Timestamp and change that later.
>> > > > > > > 3) Yes, timestamp columns must be aligned to watermarks.
>> That's
>> > > their
>> > > > > > > primary characteristic. How to define watermark strategies is
>> > > > > orthogonal
>> > > > > > to
>> > > > > > > this discussion, IMO.
>> > > > > > > 4) From my point of view, proc-time is a purely virtual column
>> > and
>> > > > not
>> > > > > > > related to an actual (data) column. However, it must be part
>> of
>> > the
>> > > > > > schema
>> > > > > > > and treated like any other attribute for a good user
>> experience
>> > and
>> > > > SQL
>> > > > > > > compliance. In order to be able to join two tables on
>> processing
>> > > > time,
>> > > > > it
>> > > > > > > must be possible to include a processing time column in the
>> > schema
>> > > > > > > definition of the table. Processing time queries can never
>> > compute
>> > > > the
>> > > > > > same
>> > > > > > > results as batch queries but their semantics should be aligned
>> > with
>> > > > > > > event-time queries.
>> > > > > > >
>> > > > > > > Best, Fabian
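
A small sketch of the schema flag described in this message (purely illustrative, not the Table API's internal representation). The `Column` type and the `forward` and `derive` helpers are hypothetical. A column that is forwarded unchanged keeps its flag; a column computed from it loses the flag because alignment with the watermarks can no longer be guaranteed.

```python
from dataclasses import dataclass, replace


@dataclass(frozen=True)
class Column:
    name: str
    type: str
    is_rowtime: bool = False  # True only while watermark alignment is guaranteed


def forward(col):
    """Pass a column through unchanged; the flag is preserved."""
    return col


def derive(col, new_name):
    """Model a modification of the column; the result is a regular column."""
    return replace(col, name=new_name, is_rowtime=False)


ts = Column("ts", "TIMESTAMP", is_rowtime=True)
print(forward(ts).is_rowtime)               # flag kept
print(derive(ts, "ts_shifted").is_rowtime)  # flag dropped
```
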
>> > > > > > >
>> > > > > > > 2017-07-27 9:47 GMT+02:00 Radu Tudoran <
>> radu.tudoran@huawei.com
>> > >:
>> > > > > > >
>> > > > > > > > Hi all,
>> > > > > > > >
>> > > > > > > > @Shaoxuan - thanks for the remarks. I have a question regarding
>> > > > > > > > your suggestion not to allow creating a proctime window on a
>> > > > > > > > regular column. I think this would be useful though. First, you
>> > > > > > > > might need to carry the timestamp of when the processing
>> > > > > > > > happened (for logging, provenance, traceability ...). Secondly,
>> > > > > > > > I do not think it contradicts the semantics of batch SQL, as in
>> > > > > > > > SQL you have the function "now()", which carries pretty much the
>> > > > > > > > same semantics as having a function to mark the proctime and
>> > > > > > > > then projecting this into a column. If I am not mistaken, you
>> > > > > > > > can store the result of calling now() in database columns.
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > Dr. Radu Tudoran
>> > > > > > > > Staff Research Engineer - Big Data Expert
>> > > > > > > > IT R&D Division
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> > > > > > > > German Research Center
>> > > > > > > > Munich Office
>> > > > > > > > Riesstrasse 25, 80992 München
>> > > > > > > >
>> > > > > > > > E-mail: radu.tudoran@huawei.com
>> > > > > > > > Mobile: +49 15209084330
>> > > > > > > > Telephone: +49 891588344173
>> > > > > > > >
>> > > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> > > > > > > > Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
>> > > > > > > > Registered Office: Düsseldorf, Register Court Düsseldorf,
>> HRB
>> > > > 56063,
>> > > > > > > > Managing Director: Bo PENG, Qiuen Peng, Shengli Wang
>> > > > > > > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf,
>> HRB
>> > > > 56063,
>> > > > > > > > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
>> > > > > > > > This e-mail and its attachments contain confidential
>> > information
>> > > > from
>> > > > > > > > HUAWEI, which is intended only for the person or entity
>> whose
>> > > > address
>> > > > > > is
>> > > > > > > > listed above. Any use of the information contained herein in
>> > any
>> > > > way
>> > > > > > > > (including, but not limited to, total or partial disclosure,
>> > > > > > > reproduction,
>> > > > > > > > or dissemination) by persons other than the intended
>> > recipient(s)
>> > > > is
>> > > > > > > > prohibited. If you receive this e-mail in error, please
>> notify
>> > > the
>> > > > > > sender
>> > > > > > > > by phone or email immediately and delete it!
>> > > > > > > >
>> > > > > > > >
>> > > > > > > > -----Original Message-----
>> > > > > > > > From: Shaoxuan Wang [mailto:shaoxuan@apache.org]
>> > > > > > > > Sent: Thursday, July 27, 2017 6:00 AM
>> > > > > > > > To: Dev
>> > > > > > > > Subject: Re: [DISCUSS] Table API / SQL internal timestamp
>> > > handling
>> > > > > > > >
>> > > > > > > >  Hi Everyone,
>> > > > > > > > I like this proposal. The problem we used to have is that we
>> > > > > > > > have treated the eventtime column as a special timestamp column.
>> > > > > > > > An eventtime column is no different from any other regular
>> > > > > > > > column, except for a certain flag (eventtime-indicator) implying
>> > > > > > > > that this column can be used as an eventtime to decide when a
>> > > > > > > > bounded query can emit the final result by comparing it with the
>> > > > > > > > associated waterMark.
>> > > > > > > >
>> > > > > > > > I have a few comments on top of this (they may have already been
>> > > > > > > > addressed in the conversation; since it's a long discussion, I
>> > > > > > > > may have missed something):
>> > > > > > > >
>> > > > > > > >    1. While we remove the timestamp column, we introduce an
>> > > > > > > >    eventtime-indicator (we may already have this concept). It is
>> > > > > > > >    only a flag that can be applied to any column (note that some
>> > > > > > > >    types may not be usable as an eventtime column), indicating
>> > > > > > > >    whether this column can be used as eventtime or not. This flag
>> > > > > > > >    is useful for validation and codeGen.
>> > > > > > > >    2. A column that has been used as an eventtime should not lose
>> > > > > > > >    its own type. We should not cast all eventtime columns to the
>> > > > > > > >    timestamp type. For instance, if a column is of type long, it
>> > > > > > > >    will remain of type long even if a window aggregate has used it
>> > > > > > > >    as an eventtime.
>> > > > > > > >    3. Eventtime will only work well with an associated waterMark
>> > > > > > > >    strategy. We may consider forcing the user to provide waterMark
>> > > > > > > >    logic for his/her selected eventtime.
>> > > > > > > >    4. For proctime, I hope we do not introduce a
>> > > > > > > >    proctime-indicator for regular columns. Ideally we should not
>> > > > > > > >    allow the user to create a proctime window on a regular column,
>> > > > > > > >    as this is against the batch query semantics. Therefore I
>> > > > > > > >    suggest we always introduce a proctime timestamp column for
>> > > > > > > >    users to create proctime windows. And unlike eventtime,
>> > > > > > > >    proctime does not need any associated waterMark strategy, as
>> > > > > > > >    there is no out-of-order issue for proctime.
>> > > > > > > >
>> > > > > > > > Regards,
>> > > > > > > > Shaoxuan
>> > > > > > > >
>> > > > > > > > On Wed, Jul 26, 2017 at 9:10 PM, Fabian Hueske <
>> > > fhueske@gmail.com>
>> > > > > > > wrote:
>> > > > > > > >
>> > > > > > > > > Thanks everybody for the replies so far.
>> > > > > > > > >
>> > > > > > > > > Let me answer your questions and reply to your thoughts:
>> > > > > > > > >
>> > > > > > > > > Radu:
>> > > > > > > > > ---
>> > > > > > > > > First of all, although my proposal is motivated by a join
>> > > > operator,
>> > > > > > > > > this discussion is about timestamp handling, not about
>> joins
>> > in
>> > > > > > > general.
>> > > > > > > > >
>> > > > > > > > > - The semantics of outer joins is to emit null and there
>> is
>> > no
>> > > > way
>> > > > > > > > > around that. This is not an issue for us. Actually, outer
>> > joins
>> > > > are
>> > > > > > > > > supported by the batch SQL / Table API. It is true that
>> outer
>> > > > joins
>> > > > > > > > > might result in null timestamps. Calcite will mark those
>> > fields
>> > > > as
>> > > > > > > > > nullable and we should check that timestamps which are
>> used
>> > in
>> > > > > > windows
>> > > > > > > > or joins are not nullable.
>> > > > > > > > > - The query has to explicitly specify which timestamp
>> > attribute
>> > > > to
>> > > > > > use.
>> > > > > > > > > Otherwise its semantics are not complete and it is
>> invalid. A
>> > > > > > > > > group-window that follows a join will reference a
>> timestamp
>> > > > > attribute
>> > > > > > > > > and this will be used. The other timestamp might be
>> projected
>> > > > out.
>> > > > > > > > > When a result with two timestamps is converted into a
>> > > DataStream,
>> > > > > the
>> > > > > > > > > user has to decide. This could be done inside of the
>> Table to
>> > > > > > > > > DataStream conversion. If the Table has more than one
>> valid
>> > > > > > timestamp,
>> > > > > > > > > the conversion will ask which timestamp to forward.
>> > > > > > > > > - A proctime join should forward all proctime attributes
>> of
>> > the
>> > > > > input
>> > > > > > > > > tables. All will be the same, but that does not matter
>> > because
>> > > > they
>> > > > > > > > > are either virtual or represented as 1 byte dummy
>> attributes.
>> > > > Also,
>> > > > > > > > > unused ones will be automatically projected out anyway.
>> > > > > > > > > - An event-time join should forward all event-time
>> attributes
>> > > of
>> > > > > the
>> > > > > > > > > input tables. Creating a new event-time attribute using
>> > > > processing
>> > > > > > > > > time makes event-time processing pointless and will give
>> > > > completely
>> > > > > > > > random results.
>> > > > > > > > > Event-time is not about the "time an event is created" but
>> > > about
>> > > > a
>> > > > > > > > > timestamp that is associated with an event. For example an
>> > > order
>> > > > > > event
>> > > > > > > > > could have three timestamps: "orderTime", "shipTime", and
>> > > > > > > "receiveTime".
>> > > > > > > > > Each could be a valid event-time attribute.
>> > > > > > > > >
>> > > > > > > > > Jark:
>> > > > > > > > > ---
>> > > > > > > > > Thanks for the proposal. I think I understand what you
>> want
>> > to
>> > > > > > achieve
>> > > > > > > > > with this, but I think functions to instantiate time
>> > attributes
>> > > > are
>> > > > > > > > > not necessary and would make things more complicated. The
>> > point
>> > > > of
>> > > > > > > > > supporting multiple time attributes is to ensure that all
>> of
>> > > them
>> > > > > are
>> > > > > > > > > aligned with the watermarks. If we add a method
>> > > > > > > > > ROW_TIME(timestamp), we don't know if the timestamp is aligned
>> > > > > > > > > with the watermarks. If that is not the case, the query won't be
>> > > > > > > > > executed as expected. The issue of LEFT JOIN can easily be
>> > > > > > > > > addressed by checking for nullability during optimization when
>> > > > > > > > > an operator tries to use it.
>> > > > > > > > >
>> > > > > > > > > The beauty of supporting multiple timestamps is that a
>> user
>> > > does
>> > > > > not
>> > > > > > > > > have to care at all about timestamps (or timestamp
>> functions)
>> > > and
>> > > > > > > > > watermarks. As long as the query uses a timestamp
>> attribute
>> > > that
>> > > > > was
>> > > > > > > > > originally declared as rowtime in a source table (and was
>> not
>> > > > > > modified
>> > > > > > > > > afterwards), this is fine. Think of a cascade of three
>> > windowed
>> > > > > > joins:
>> > > > > > > > > R - S - T - U, and you want to join S - T first. In that
>> > case,
>> > > > you
>> > > > > > > > > need to preserve the timestamps of S and T in order to
>> join R
>> > > and
>> > > > > U.
>> > > > > > > > > From a relational algebra point of view, there is no
>> reason
>> > to
>> > > > > have a
>> > > > > > > > > limitation on how these attributes are accessed.
>> Timestamps
>> > are
>> > > > > just
>> > > > > > > > > regular fields of a record. The only restriction in the
>> > context
>> > > > of
>> > > > > > > > > stream processing is that the watermark must be aligned
>> with
>> > > > > > > > > timestamps, i.e., follow all timestamps such that data is
>> not
>> > > > late
>> > > > > > > > > according to any of the timestamps. This we can achieve
>> and
>> > > > handle
>> > > > > > > > internally without the user having to worry about it.
>> > > > > > > > >
>> > > > > > > > > Xingcan:
>> > > > > > > > > ---
>> > > > > > > > > I think your questions are mostly implementation details
>> and
>> > > not
>> > > > so
>> > > > > > > > > much related to the original proposal of supporting
>> multiple
>> > > > > > > timestamps.
>> > > > > > > > >
>> > > > > > > > > My take on your questions is:
>> > > > > > > > > 1. The rate at which watermarks are emitted is not
>> important
>> > > for
>> > > > > the
>> > > > > > > > > correctness of a query. However, it can affect the
>> > performance,
>> > > > > > > > > because each watermark is sent as a special record and it
>> is
>> > > > > > > > > broadcasted. My initial take would be to emit a new
>> watermark
>> > > > > > whenever
>> > > > > > > > > the operator updated its watermark because usually, the
>> > > operator
>> > > > > > would
>> > > > > > > > > have forwarded the old watermark.
>> > > > > > > > > 2. I would say this is the responsibility of the operator
>> > > because
>> > > > > > > > > first it is not related to the semantics of the query and
>> > > second
>> > > > it
>> > > > > > is
>> > > > > > > > > an operator responsibility in the existing code as well.
>> > > > > > > > >
>> > > > > > > > > Jark 2:
>> > > > > > > > > You are right, the query (or user) must decide on the event-time
>> > > > > > > > > attribute to use. My main point is, it is much easier for the
>> > > > > > > > > user (and for us internally) if we internally track multiple
>> > > > > > > > > timestamps, because we do not have to prune, in the join, the
>> > > > > > > > > timestamp that will not be used later. Moreover, both timestamps
>> > > > > > > > > might be used later (see join example, which could be reordered
>> > > > > > > > > of course). All we have to do is to ensure that all timestamps
>> > > > > > > > > are aligned with the watermarks.
>> > > > > > > > >
>> > > > > > > > > Radu 2:
>> > > > > > > > > IMO, time (or anything else that affects the semantics) should
>> > > > > > > > > never be decided by the system. If we did that, a query would
>> > > > > > > > > not be fully specified or, even worse, the way it is executed
>> > > > > > > > > would be semantically incorrect and produce arbitrary results.
>> > > > > > > > >
>> > > > > > > > > Time attributes should be specified in the source tables and
>> > > > > > > > > then forwarded from there. So far I haven't seen an example
>> > > > > > > > > where this would not be possible (within the semantics of
>> > > > > > > > > relational queries). If we do that right, there won't be a need
>> > > > > > > > > for explicit time management except for the definition of the
>> > > > > > > > > initial timestamps, which can be hidden in the table definition.
>> > > > > > > > > As I said before, we (or the system) cannot decide on the
>> > > > > > > > > timestamp because that would lead to arbitrary results. Asking
>> > > > > > > > > the user to do that would mean explicit time management, which
>> > > > > > > > > is also not desirable. I think my proposal gives users all
>> > > > > > > > > options (timestamps) to choose from and the system can do the
>> > > > > > > > > rest.
>> > > > > > > > >
>> > > > > > > > > Best, Fabian
>> > > > > > > > >
>> > > > > > > > > 2017-07-26 10:46 GMT+02:00 Radu Tudoran <
>> > > radu.tudoran@huawei.com
>> > > > >:
>> > > > > > > > >
>> > > > > > > > > > Hi everyone,
>> > > > > > > > > >
>> > > > > > > > > > I just want to add that I was referring to NULL values not
>> > > > > > > > > > specifically for time fields but for the event itself. Suppose
>> > > > > > > > > > you have the following situation:
>> > > > > > > > > >
>> > > > > > > > > > Stream 1:     .... |    event1   | ....
>> > > > > > > > > > Stream 2:     .... |             | ....
>> > > > > > > > > >
>> > > > > > > > > > And you have a LEFT JOIN between stream 1 and stream 2 (no
>> > > > > > > > > > condition)... then you still need to emit (event1,null), as
>> > > > > > > > > > this is the behavior of a left join. This is maybe a very
>> > > > > > > > > > simple situation, but the point is that left joins and right
>> > > > > > > > > > joins can have situations where you have elements only in the
>> > > > > > > > > > main stream and no element in the right stream. And for this
>> > > > > > > > > > case you still need to emit.
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > Regarding whether time should be decided by the system or
>> > > > > > > > > > not... I think the answer is: it depends. I think the example
>> > > > > > > > > > from Jark is very good and shows the need for some mechanisms
>> > > > > > > > > > to select/manage the time (I like the proposal of having
>> > > > > > > > > > functions to insert the time in the output!). However, if a
>> > > > > > > > > > business analyst writes a query without explicit time
>> > > > > > > > > > management, we still need to have some default behavior in the
>> > > > > > > > > > system. As per my initial proposal, I think we need to decide
>> > > > > > > > > > on one timestamp field to carry: either a new one created at
>> > > > > > > > > > the moment of the join, or the timestamp from the main stream
>> > > > > > > > > > (...although I am not sure which one is the main stream in the
>> > > > > > > > > > case of a full join :) )
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > > > -----Original Message-----
>> > > > > > > > > > From: Jark Wu [mailto:jark@apache.org]
>> > > > > > > > > > Sent: Wednesday, July 26, 2017 8:29 AM
>> > > > > > > > > > To: dev@flink.apache.org
>> > > > > > > > > > Subject: Re: [DISCUSS] Table API / SQL internal
>> timestamp
>> > > > > handling
>> > > > > > > > > >
>> > > > > > > > > > Hi Xingcan,
>> > > > > > > > > >
>> > > > > > > > > > IMO, the event-time of join results cannot be automatically
>> > > > > > > > > > decided by the system. Considering batch tables, if users want
>> > > > > > > > > > an event-time window aggregation after a join, they must
>> > > > > > > > > > specify the time field explicitly (T1.rowtime or T2.rowtime or
>> > > > > > > > > > the computed result of them). So in the case of streaming
>> > > > > > > > > > tables, the system also can't automatically decide the time
>> > > > > > > > > > field for users.
>> > > > > > > > > >
>> > > > > > > > > > Regarding the question you asked, I think we don't need to
>> > > > > > > > > > change the watermark no matter whether we choose the left
>> > > > > > > > > > rowtime, the right rowtime, or a combination, because the
>> > > > > > > > > > watermark has been aligned with the rowtime in the source.
>> > > > > > > > > > Maybe I'm wrong about this; please correct me if I'm missing
>> > > > > > > > > > something.
>> > > > > > > > > >
>> > > > > > > > > > What do you think?
>> > > > > > > > > >
>> > > > > > > > > > Regards,
>> > > > > > > > > > Jark
>> > > > > > > > > >
>> > > > > > > > > > 2017-07-26 11:24 GMT+08:00 Xingcan Cui <
>> xingcanc@gmail.com
>> > >:
>> > > > > > > > > >
>> > > > > > > > > > > Hi all,
>> > > > > > > > > > >
>> > > > > > > > > > > @Fabian, thanks for raising this.
>> > > > > > > > > > >
>> > > > > > > > > > > @Radu and Jark, personally I think the timestamp
>> field is
>> > > > > > critical
>> > > > > > > > for
>> > > > > > > > > > > query processing and thus should be declared as (or
>> > > supposed
>> > > > to
>> > > > > > be)
>> > > > > > > > > > > NOT NULL. In addition, I think the event-time
>> semantic of
>> > > the
>> > > > > > join
>> > > > > > > > > > > results should be automatically decided by the system,
>> > > i.e.,
>> > > > we
>> > > > > > do
>> > > > > > > > not
>> > > > > > > > > > > > hand it over to users so as to avoid some unpredictable
>> > > > > assignment.
>> > > > > > > > > > >
>> > > > > > > > > > > Generally speaking, consolidating different time
>> fields
>> > is
>> > > > > > possible
>> > > > > > > > > > > since all of them should ideally be monotonically
>> > > increasing.
>> > > > > > From
>> > > > > > > my
>> > > > > > > > > > > point of view, the problem lies in
>> > > > > > > > > > > (1) what's the relationship between the old and new
>> > > > watermarks.
>> > > > > > > Shall
>> > > > > > > > > > > they be one-to-one mapping or the new watermarks could
>> > skip
>> > > > > some
>> > > > > > > > > > > timestamps? And (2) who is in charge of emitting the
>> > > blocked
>> > > > > > > > > > > watermarks, the operator or the process function?
>> > > > > > > > > > >
>> > > > > > > > > > > I'd like to hear from you.
>> > > > > > > > > > >
>> > > > > > > > > > > Best,
>> > > > > > > > > > > Xingcan
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > > > On Wed, Jul 26, 2017 at 10:40 AM, Jark Wu <
>> > jark@apache.org
>> > > >
>> > > > > > wrote:
>> > > > > > > > > > >
>> > > > > > > > > > > > Hi,
>> > > > > > > > > > > >
>> > > > > > > > > > > > Radu's concerns make sense to me, especially the
>> null
>> > > value
>> > > > > > > > > > > > timestamp and multi-proctime.
>> > > > > > > > > > > >
>> > > > > > > > > > > > I have also something in my mind. I would like to
>> > propose
>> > > > > some
>> > > > > > > time
>> > > > > > > > > > > > indicator built-in functions, e.g.
>> ROW_TIME(Timestamp
>> > ts)
>> > > > > will
>> > > > > > > > > > > > > generate an event time logical attribute, PROC_TIME()
>> > will
>> > > > > > > generate
>> > > > > > > > a
>> > > > > > > > > > > > processing time logical attribute. It is similar to
>> > > > > > > TUMBLE_ROWTIME
>> > > > > > > > > > > > > proposed in this PR https://github.com/apache/flink/pull/4199.
>> > > > > > > > These
>> > > > > > > > > > > > can be used in any queries, but there still can't be
>> > more
>> > > > > than
>> > > > > > > one
>> > > > > > > > > > > > rowtime attribute or more than one proctime
>> attribute
>> > in
>> > > a
>> > > > > > table
>> > > > > > > > > > schema.
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Both selected timestamp fields from a JOIN query
>> > will
>> > > > be
>> > > > > > > > > > > materialized.
>> > > > > > > > > > > > If someone needs further down the computation based
>> on
>> > > the
>> > > > > > event
>> > > > > > > > > > > > time,
>> > > > > > > > > > > they
>> > > > > > > > > > > > need to create a new time attribute using the
>> > > ROW_TIME(...)
>> > > > > > > > > > > > function. And this can also solve the null timestamp
>> > > > problem
>> > > > > in
>> > > > > > > > LEFT
>> > > > > > > > > > > > JOIN, because we
>> > > > > > > > > > > can
>> > > > > > > > > > > > use a user defined function to combine the two
>> rowtimes
>> > > and
>> > > > > > make
>> > > > > > > > the
>> > > > > > > > > > > result
>> > > > > > > > > > > > as the event time attribute, e.g. SELECT
>> > > > > > ROW_TIME(udf(T1.rowtime,
>> > > > > > > > > > > > T2.rowtime)) as rowtime FROM T1 JOIN T2 ...
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > What do you think?
>> > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > > > 2017-07-25 23:48 GMT+08:00 Radu Tudoran <
>> > > > > > radu.tudoran@huawei.com
>> > > > > > > >:
>> > > > > > > > > > > >
>> > > > > > > > > > > > > Hi,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > I think this is an interesting discussion and I
>> would
>> > > > like
>> > > > > to
>> > > > > > > add
>> > > > > > > > > > > > > some issues and give some feedback.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > - For supporting the join we do not only need to
>> > think
>> > > of
>> > > > > the
>> > > > > > > > time
>> > > > > > > > > > > > > but also on the null values. For example if you
>> have
>> > a
>> > > > LEFT
>> > > > > > (or
>> > > > > > > > > > > > > RIGHT) JOIN between items of 2 input streams, and
>> the
>> > > > > > secondary
>> > > > > > > > > > > > > input is not
>> > > > > > > > > > > > available
>> > > > > > > > > > > > > you should still emit Row.of(event1, null)...as
>> far
>> > as
>> > > I
>> > > > > know
>> > > > > > > if
>> > > > > > > > > > > > > you
>> > > > > > > > > > > need
>> > > > > > > > > > > > > to serialize/deserialize null values to send them
>> > they
>> > > do
>> > > > > not
>> > > > > > > > > > > > > work. So
>> > > > > > > > > > > we
>> > > > > > > > > > > > > > should include this scenario in the discussions.
>> - If
>> > we
>> > > > will
>> > > > > > > have
>> > > > > > > > > > > > > multiple timestamp in an (output) event, one
>> question
>> > > > > > > > > > > is
>> > > > > > > > > > > > > how to select afterwards which is the primary time
>> > > field
>> > > > on
>> > > > > > > which
>> > > > > > > > > > > > > to operate. When we describe a query we might be
>> able
>> > > to
>> > > > > > > specify
>> > > > > > > > > > > > > (or we
>> > > > > > > > > > > get
>> > > > > > > > > > > > > this implicitly if we implement the carryon of
>> the 2
>> > > > > > > timestamps)
>> > > > > > > > > > > Select
>> > > > > > > > > > > > > T1.rowtime, T2.rowtime ...but if the output of a
>> > query
>> > > is
>> > > > > the
>> > > > > > > > > > > > > input of
>> > > > > > > > > > > a
>> > > > > > > > > > > > > new processing pipeline, then, do we support
>> > generally
>> > > > also
>> > > > > > > that
>> > > > > > > > > > > > > the
>> > > > > > > > > > > > input
>> > > > > > > > > > > > > has 2 time fields? ...how do we deal with the 2
>> input
>> > > > > fields
>> > > > > > > > > > > > > (maybe I
>> > > > > > > > > > > am
>> > > > > > > > > > > > > missing something) further in the datastream
>> pipeline
>> > > > that
>> > > > > we
>> > > > > > > > > > > > > build
>> > > > > > > > > > > based
>> > > > > > > > > > > > > on the output?
>> > > > > > > > > > > > > - For the case of proctime - do we need to carry 2
>> > > > > proctimes
>> > > > > > > (the
>> > > > > > > > > > > > > proctimes of the incoming events from each
>> stream),
>> > or
>> > > 1
>> > > > > > > proctime
>> > > > > > > > > > > > > (as
>> > > > > > > > > > > we
>> > > > > > > > > > > > > operate on proctime and the combination of the 2
>> > inputs
>> > > > can
>> > > > > > be
>> > > > > > > > > > > considered
>> > > > > > > > > > > > > as a new event, the current proctime on the
>> machine
>> > can
>> > > > be
>> > > > > > > > > > > > > considered
>> > > > > > > > > > > the
>> > > > > > > > > > > > > (proc)time reference for output event) or 3
>> proctimes
>> > > > (the
>> > > > > 2
>> > > > > > > > > > > > > proctimes
>> > > > > > > > > > > of
>> > > > > > > > > > > > > the input plus the proctime when the new event was
>> > > > > created)?
>> > > > > > > > > > > > > > - Similar to the point above, for event time
>> (which I
>> > > am
>> > > > > > > > > > > > > understanding
>> > > > > > > > > > > as
>> > > > > > > > > > > > > the time when the event was created...or do we
>> > > understand
>> > > > > > them
>> > > > > > > as
>> > > > > > > > > > > > > a
>> > > > > > > > > > > time
>> > > > > > > > > > > > > carry within the event?) - when we join 2 events
>> and
>> > > > output
>> > > > > > an
>> > > > > > > > > > > > > event
>> > > > > > > > > > > that
>> > > > > > > > > > > > > is the result of the join - isn't this a new event
>> > > detached
>> > > > > > from
>> > > > > > > > the
>> > > > > > > > > > > > > source\input events? ... I would tend to say it
>> is a
>> > > new
>> > > > > > event
>> > > > > > > > and
>> > > > > > > > > > > > > then
>> > > > > > > > > > > > as
>> > > > > > > > > > > > > for proctime the event time of the new event is
>> the
>> > > > current
>> > > > > > > time
>> > > > > > > > > > > > > when
>> > > > > > > > > > > > this
>> > > > > > > > > > > > > output event was created. If we would accept this
>> > > > > hypothesis
>> > > > > > > then
>> > > > > > > > > > > > > we
>> > > > > > > > > > > > would
>> > > > > > > > > > > > > not need the 2 time input fields to be
>> > carried/managed
>> > > > > > > > implicitly.
>> > > > > > > > > > > > > If someone needs further down the computation
>> > pipeline,
>> > > > > then
>> > > > > > in
>> > > > > > > > > > > > > the query
>> > > > > > > > > > > > they
>> > > > > > > > > > > > > would be selected explicitly from the input stream
>> > and
>> > > > > > > projected
>> > > > > > > > > > > > > in
>> > > > > > > > > > > some
>> > > > > > > > > > > > > fields to be carried (Select T1.rowtime as
>> > FormerTime1,
>> > > > > > > > T2.rowtime
>> > > > > > > > > > > > > as FormerTime2, .... JOIN T1, T2...)...but they
>> would
>> > > not
>> > > > > > have
>> > > > > > > > the
>> > > > > > > > > > > timestamp
>> > > > > > > > > > > > > logic
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > ..my 2 cents
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Dr. Radu Tudoran
>> > > > > > > > > > > > > Staff Research Engineer - Big Data Expert IT R&D
>> > > Division
>> > > > > > > > > > > > >
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> > > > > > > > > > > > > German Research Center
>> > > > > > > > > > > > > Munich Office
>> > > > > > > > > > > > > Riesstrasse 25, 80992 München
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > E-mail: radu.tudoran@huawei.com
>> > > > > > > > > > > > > > Mobile: +49 15209084330
>> > > > > > > > > > > > > > Telephone: +49 891588344173
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > HUAWEI TECHNOLOGIES Duesseldorf GmbH
>> > > > > > > > > > > > > Hansaallee 205, 40549 Düsseldorf, Germany,
>> > > > www.huawei.com
>> > > > > > > > > > > > > Registered Office: Düsseldorf, Register Court
>> > > Düsseldorf,
>> > > > > HRB
>> > > > > > > > > 56063,
>> > > > > > > > > > > > > Managing Director: Bo PENG, Qiuen Peng, Shengli
>> Wang
>> > > > > > > > > > > > > Sitz der Gesellschaft: Düsseldorf, Amtsgericht
>> > > > Düsseldorf,
>> > > > > > HRB
>> > > > > > > > > 56063,
>> > > > > > > > > > > > > Geschäftsführer: Bo PENG, Qiuen Peng, Shengli Wang
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > -----Original Message-----
>> > > > > > > > > > > > > From: Fabian Hueske [mailto:fhueske@gmail.com]
>> > > > > > > > > > > > > Sent: Tuesday, July 25, 2017 4:22 PM
>> > > > > > > > > > > > > To: dev@flink.apache.org
>> > > > > > > > > > > > > Subject: [DISCUSS] Table API / SQL internal
>> timestamp
>> > > > > > handling
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Hi everybody,
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > I'd like to propose and discuss some changes in
>> the
>> > way
>> > > > how
>> > > > > > the
>> > > > > > > > > Table
>> > > > > > > > > > > API
>> > > > > > > > > > > > > / SQL internally handles timestamps.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > The Table API is implemented on top of the
>> DataStream
>> > > > API.
>> > > > > > The
>> > > > > > > > > > > DataStream
>> > > > > > > > > > > > > API hides timestamps from users in order to ensure
>> > that
>> > > > > > > > timestamps
>> > > > > > > > > > and
>> > > > > > > > > > > > > watermarks are aligned. Instead users assign
>> > timestamps
>> > > > and
>> > > > > > > > > > watermarks
>> > > > > > > > > > > > once
>> > > > > > > > > > > > > (usually at the source or in a subsequent
>> operator)
>> > and
>> > > > let
>> > > > > > the
>> > > > > > > > > > system
>> > > > > > > > > > > > > handle the timestamps from there on. Timestamps
>> are
>> > > > stored
>> > > > > in
>> > > > > > > the
>> > > > > > > > > > > > timestamp
>> > > > > > > > > > > > > field of the StreamRecord which is a holder for
>> the
>> > > user
>> > > > > > record
>> > > > > > > > and
>> > > > > > > > > > the
>> > > > > > > > > > > > > timestamp. DataStream operators that depend on
>> time
>> > > > > > > > (time-windows,
>> > > > > > > > > > > > process
>> > > > > > > > > > > > > function, ...) access the timestamp from the
>> > > > StreamRecord.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > In contrast to the DataStream API, the Table API
>> and
>> > SQL
>> > > > are
>> > > > > > > aware
>> > > > > > > > > of
>> > > > > > > > > > > the
>> > > > > > > > > > > > > semantics of a query. I.e., we can analyze how
>> users
>> > > > access
>> > > > > > > > > > timestamps
>> > > > > > > > > > > > and
>> > > > > > > > > > > > > whether they are modified or not. Another
>> difference
>> > is
>> > > > > that
>> > > > > > > the
>> > > > > > > > > > > > timestamp
>> > > > > > > > > > > > > must be part of the schema of a table in order to
>> > have
>> > > > > > correct
>> > > > > > > > > query
>> > > > > > > > > > > > > semantics.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > The current design to handle timestamps is as
>> > follows.
>> > > > The
>> > > > > > > Table
>> > > > > > > > > API
>> > > > > > > > > > > > > stores timestamps in the timestamp field of the
>> > > > > StreamRecord.
>> > > > > > > > > > > Therefore,
>> > > > > > > > > > > > > timestamps are detached from the remaining data
>> which
>> > > is
>> > > > > > stored
>> > > > > > > > in
>> > > > > > > > > > Row
>> > > > > > > > > > > > > objects. Hence, the physical representation of a
>> row
>> > is
>> > > > > > > different
>> > > > > > > > > > from
>> > > > > > > > > > > > its
>> > > > > > > > > > > > > logical representation. We introduced a
>> translation
>> > > layer
>> > > > > > > > > (RowSchema)
>> > > > > > > > > > > to
>> > > > > > > > > > > > > convert logical schema into physical schema. This
>> is
>> > > > > > > necessary
>> > > > > > > > for
>> > > > > > > > > > > > > serialization or code generation when the logical
>> > plan
>> > > is
>> > > > > > > > > translated
>> > > > > > > > > > > > into a
>> > > > > > > > > > > > > physical execution plan. Processing-time
>> timestamps
>> > are
>> > > > > > > similarly
>> > > > > > > > > > > > handled.
>> > > > > > > > > > > > > They are not included in the physical schema and
>> > looked
>> > > > up
>> > > > > > when
>> > > > > > > > > > needed.
>> > > > > > > > > > > > > This design also requires that we need to
>> materialize
>> > > > > > > timestamps
>> > > > > > > > > when
>> > > > > > > > > > > > they
>> > > > > > > > > > > > > are accessed by expressions. Timestamp
>> > materialization
>> > > is
>> > > > > > done
>> > > > > > > > as a
>> > > > > > > > > > > > > pre-optimization step.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > While thinking about the implementation of the
>> > > event-time
>> > > > > > > > windowed
>> > > > > > > > > > > > > stream-stream join [1] I stumbled over the
>> question
>> > > which
>> > > > > > > > timestamp
>> > > > > > > > > > of
>> > > > > > > > > > > > both
>> > > > > > > > > > > > > input tables to forward. With the current design,
>> we
>> > > > could
>> > > > > > only
>> > > > > > > > > have
>> > > > > > > > > > a
>> > > > > > > > > > > > > single timestamp, so keeping both timestamps would
>> > not
>> > > be
>> > > > > > > > possible.
>> > > > > > > > > > The
>> > > > > > > > > > > > > choice of the timestamp would need to be
>> specified by
>> > > the
>> > > > > > query
>> > > > > > > > > > > otherwise
>> > > > > > > > > > > > > it would lack clear semantics. When executing the
>> > join,
>> > > > the
>> > > > > > > join
>> > > > > > > > > > > operator
>> > > > > > > > > > > > > would need to make sure that no late data is
>> emitted.
>> > > > This
>> > > > > > > would
>> > > > > > > > > only
>> > > > > > > > > > > > work
>> > > > > > > > > > > > > > if the operator was able to hold back watermarks [2].
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > With this information in mind, I'd like to discuss
>> > the
>> > > > > > > following
>> > > > > > > > > > > > proposal:
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > - We allow more than one event-time timestamp and
>> > store
>> > > > > them
>> > > > > > > > > directly
>> > > > > > > > > > > in
>> > > > > > > > > > > > > the Row
>> > > > > > > > > > > > > - The query operators ensure that the watermarks
>> are
>> > > > always
>> > > > > > > > behind
>> > > > > > > > > > all
>> > > > > > > > > > > > > event-time timestamps. With additional analysis we
>> > will
>> > > > be
>> > > > > > able
>> > > > > > > > to
>> > > > > > > > > > > > restrict
>> > > > > > > > > > > > > this to timestamps that are actually used as such.
>> > > > > > > > > > > > > - When a DataStream operator is time-based (e.g.,
>> a
>> > > > > > DataStream
>> > > > > > > > > > > > > time-windows), we inject an operator that copies
>> the
>> > > > > > timestamp
>> > > > > > > > from
>> > > > > > > > > > the
>> > > > > > > > > > > > Row
>> > > > > > > > > > > > > into the StreamRecord.
>> > > > > > > > > > > > > - We try to remove the distinction between logical
>> > and
>> > > > > > physical
>> > > > > > > > > > schema.
>> > > > > > > > > > > > > For event-time timestamps this is because we store
>> > them
>> > > > in
>> > > > > > the
>> > > > > > > > Row
>> > > > > > > > > > > > object,
>> > > > > > > > > > > > > for processing-time timestamps, we add a dummy
>> byte
>> > > > field.
>> > > > > > When
>> > > > > > > > > > > > accessing a
>> > > > > > > > > > > > > field of this type, the code generator injects the
>> > code
>> > > > to
>> > > > > > > fetch
>> > > > > > > > > the
>> > > > > > > > > > > > > timestamps.
>> > > > > > > > > > > > > - We might be able to get around the
>> pre-optimization
>> > > > time
>> > > > > > > > > > > > materialization
>> > > > > > > > > > > > > step.
>> > > > > > > > > > > > > - A join result would be able to keep both
>> > timestamps.
>> > > > The
>> > > > > > > > > watermark
>> > > > > > > > > > > > would
>> > > > > > > > > > > > > > be held back for both so both could be used in
>> > > subsequent
>> > > > > > > > > operations.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > I admit, I haven't thought this completely
>> through.
>> > > > > > > > > > > > > However, the benefits of this design from my
>> point of
>> > > > view
>> > > > > > are:
>> > > > > > > > > > > > > - encoding of timestamps in Rows means that the
>> > logical
>> > > > > > schema
>> > > > > > > is
>> > > > > > > > > > equal
>> > > > > > > > > > > > to
>> > > > > > > > > > > > > the physical schema
>> > > > > > > > > > > > > - no timestamp materialization
>> > > > > > > > > > > > > - support for multiple timestamps. Otherwise we
>> would
>> > > > need
>> > > > > to
>> > > > > > > > > expose
>> > > > > > > > > > > > > internal restrictions to the user which are hard
>> to
>> > > > > explain /
>> > > > > > > > > > > > communicate.
>> > > > > > > > > > > > > - no need to change any public interfaces at the
>> > > moment.
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > The drawbacks as far as I see them are:
>> > > > > > > > > > > > > - additional payload due to unused timestamp
>> field +
>> > > > > possibly
>> > > > > > > the
>> > > > > > > > > > > > > processing-time dummy field
>> > > > > > > > > > > > > - complete rework of the internal timestamp logic
>> > > > > (again...)
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > Please let me know what you think,
>> > > > > > > > > > > > > Fabian
>> > > > > > > > > > > > >
>> > > > > > > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-6233
>> > > > > > > > > > > > > > [2] https://issues.apache.org/jira/browse/FLINK-7245
>> > > > > > > > > > > > >
>> > > > > > > > > > > >
>> > > > > > > > > > >
>> > > > > > > > > >
>> > > > > > > > >
>> > > > > > > >
>> > > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
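
The proposal quoted above says a join result could keep both timestamps if the watermark is held back for both. A minimal sketch of that idea follows, in plain Python rather than Flink code. It assumes a time-bounded join predicate of the form l.ts BETWEEN r.ts - lower AND r.ts + upper with non-negative bounds, and it assumes a watermark w means that no further record with timestamp <= w will arrive. The class name, field names and the hold-back rule (minimum input watermark minus the larger bound) are illustrative assumptions, not part of the thread or of any Flink API.

```python
from dataclasses import dataclass

MIN_WATERMARK = -2**63  # stands for "no watermark received yet"


@dataclass
class JoinWatermarkTracker:
    """Hypothetical helper: derives a watermark that is safe to forward
    when every join result row carries both l.ts and r.ts."""
    lower_ms: int  # predicate bound: l.ts >= r.ts - lower_ms
    upper_ms: int  # predicate bound: l.ts <= r.ts + upper_ms
    left_wm: int = MIN_WATERMARK
    right_wm: int = MIN_WATERMARK

    def on_watermark(self, side: str, wm: int) -> int:
        """Record a watermark from one input and return the output watermark."""
        if side == "left":
            self.left_wm = max(self.left_wm, wm)
        elif side == "right":
            self.right_wm = max(self.right_wm, wm)
        else:
            raise ValueError("side must be 'left' or 'right'")
        return self.output_watermark()

    def output_watermark(self) -> int:
        # Both inputs must have progressed before anything can be forwarded.
        combined = min(self.left_wm, self.right_wm)
        if combined == MIN_WATERMARK:
            return MIN_WATERMARK
        # A future record (ts > combined) may still join a buffered record
        # that is up to max(lower, upper) older, so hold back by that much.
        return combined - max(self.lower_ms, self.upper_ms)


def is_late(timestamp: int, watermark: int) -> bool:
    """A timestamp is late if the watermark has already reached it."""
    return timestamp <= watermark


tracker = JoinWatermarkTracker(lower_ms=1_000, upper_ms=5_000)
only_left = tracker.on_watermark("left", 20_000)
both = tracker.on_watermark("right", 15_000)
```

With these bounds, a new right record just ahead of the combined watermark (r.ts = 15001) can only match left records with l.ts >= 14001, and a new left record just ahead of its watermark (l.ts = 20001) can only match right records with r.ts >= 15001. Neither timestamp of such a result is late with respect to the forwarded watermark of 10000, which is the property the proposal asks the query operators to ensure.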
