From Shaoxuan Wang <wshaox...@gmail.com>
Subject Re: [DISCUSS] Table API / SQL internal timestamp handling
Date Mon, 31 Jul 2017 14:42:17 GMT
Xingcan,
A watermark is the “estimate of completion”. The user defines the watermark
for each input as a best estimate of the point up to which it has seen
pretty much all the data. It is usually derived from the event timestamps.
When we do a windowed join, we have to make sure the watermarks of both
inputs have been received before we emit a window result at that watermark.
If the two inputs differ a lot, say "one for today and the other one
for yesterday" as you pointed out, the watermark of the windowed join
operator is just yesterday (the smaller of the two). I guess this is what
Fabian means by "In case of a join, the smallest future timestamp depends on
two fields and not just on one." In the windowed join case, we have to
buffer all the data that falls between the watermarks of the two inputs. It
is the user's responsibility (if she/he wants to reduce the cost) to align
the watermarks of the stream sources as much as possible.
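
To make the buffering cost concrete, here is a minimal sketch in plain
Python. It is not Flink code: the class TwoInputWatermarkTracker and its
methods are hypothetical names made up for illustration, and timestamps are
assumed to be epoch milliseconds. It only shows that the operator's watermark
is the minimum of the two input watermarks and that everything ahead of it
has to stay in state.

```python
from dataclasses import dataclass, field

MIN_WM = -2**63  # assumed "no watermark seen yet" marker


@dataclass
class TwoInputWatermarkTracker:
    """Hypothetical helper (not a Flink class) for a two-input operator."""
    left_wm: int = MIN_WM    # latest watermark seen on the left input
    right_wm: int = MIN_WM   # latest watermark seen on the right input
    buffered: list = field(default_factory=list)  # (timestamp, row) pairs

    def combined(self) -> int:
        # The operator can only be as far along as its slower input.
        return min(self.left_wm, self.right_wm)

    def on_row(self, ts: int, row: str) -> None:
        # Rows are held in state until the combined watermark passes them.
        self.buffered.append((ts, row))

    def on_watermark(self, side: str, wm: int) -> list:
        """Record a watermark; return the rows that are now complete."""
        if side == "left":
            self.left_wm = max(self.left_wm, wm)
        else:
            self.right_wm = max(self.right_wm, wm)
        current = self.combined()
        ready = [r for r in self.buffered if r[0] <= current]
        self.buffered = [r for r in self.buffered if r[0] > current]
        return ready
```

If the left input is a day ahead of the right one, a left watermark alone
releases nothing, and every row between the two watermarks stays in
`buffered` until the right input catches up. That is the state cost that
aligning the source watermarks would reduce.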

Regards,
Shaoxuan

On Mon, Jul 31, 2017 at 10:09 PM, Xingcan Cui <xingcanc@gmail.com> wrote:

> Hi Fabian,
>
> I have a similar question to Jark's. Theoretically, the row times of two
> streams could be quite different, e.g., one for today and the other one for
> yesterday. How can we align them?
>
> Best,
> Xingcan
>
> On Mon, Jul 31, 2017 at 9:04 PM, Fabian Hueske <fhueske@gmail.com> wrote:
>
> > Hi Jark,
> >
> > yes, the handling of watermarks is very tricky. It is not directly related
> > to the proposal, which is only about the representation of timestamps, but
> > becomes important for event-time joins.
> > We have a JIRA about an operator that is able to hold back watermarks [1].
> >
> > Roughly, the idea is to track the smallest timestamp that will be emitted in
> > the future and align the watermark to this timestamp.
> > For this we need to know the semantics of the operator (which timestamp
> > will be emitted in the future), but this will be given for relational
> > operators.
> > The new operator could emit a watermark whenever it received one.
> >
> > In case of a join, the smallest future timestamp depends on two fields and
> > not just on one.
> >
> > Best,
> > Fabian
> >
> >
> >
> > 2017-07-31 14:35 GMT+02:00 Jark Wu <jark@apache.org>:
> >
> > > Hi,
> > >
> > > @Fabian, I read your proposal carefully again, and I'm a big +1 for it. The
> > > proposal addresses the problem of how to forward both input tables'
> > > rowtimes in a dual-stream join (windowed/non-windowed). The additional
> > > is acceptable.
> > >
> > > You mentioned that:
> > >
> > > > The query operators ensure that the watermarks are always behind all
> > > > event-time timestamps. With additional analysis we will be able to
> > > restrict
> > > > this to timestamps that are actually used as such.
> > >
> > > I'm more curious about how we can define the watermark strategies in order
> > > to make sure all timestamp columns are aligned to watermarks, especially
> > > when the watermark has already been defined in the input DataStream.
> > >
> > > Bests,
> > > Jark Wu
> > >
> > >
> > > 2017-07-27 23:13 GMT+08:00 Xingcan Cui <xingcanc@gmail.com>:
> > >
> > > > Hi all,
> > > >
> > > > Thanks for the answers, @Fabian.
> > > >
> > > > > @Jark, at first I also wanted the users to be able to reassign the
> > > > > timestamp field arbitrarily. However, that means we have to break the
> > > > > current "time system" and create a new one. The blocked watermarks become
> > > > > meaningless and maybe a new WatermarkAssigner should be provided. A
> > > > > slightly stricter mechanism would be to only allow the use of existing
> > > > > timestamp fields. It sounds reasonable, but it will bring an unnecessary
> > > > > barrier between stream and batch SQL, i.e., some SQL that works in batch
> > > > > cannot be executed in the stream environment. I just wonder if we could
> > > > > automatically choose a field, which will be used in the following
> > > > > calculations. Not sure if it makes sense.
> > > > >
> > > > > @Shaoxuan @Radu, I totally agree that "proctime" is the main blocker for
> > > > > consolidating stream/batch SQL. Though from a general point of view it
> > > > > can indicate the time to some extent, its randomness means that it
> > > > > should never be used in time-sensitive applications. I always believe
> > > > > that all the information used for query evaluation should be acquired
> > > > > from the data itself.
> > > >
> > > > Best,
> > > > Xingcan
> > > >
> > > > On Thu, Jul 27, 2017 at 7:24 PM, Fabian Hueske <fhueske@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Shaoxuan,
> > > > >
> > > > >
> > > > > > > The problem we used to have is that we have treated eventtime column
> > > > > > > as a special timestamp column.
> > > > > >
> > > > > > IMO, an event-time timestamp column is a regular column that is aligned
> > > > > > with the watermarks of the stream.
> > > > > > In order to distinguish watermark-aligned columns from others, we need a
> > > > > > special flag in the schema.
> > > > > > When a timestamp column is modified and we cannot guarantee that it is
> > > > > > still aligned with the watermarks, it must lose the special flag and be
> > > > > > treated like any other column.
> > > > >
> > > > > > 1) I agree that we can use Long in addition to Timestamp for timestamp
> > > > > > columns. Since timestamp columns need to be comparable to watermarks,
> > > > > > which are Longs, I don't see that other types would make sense. For now,
> > > > > > I would keep the restriction that timestamps can only be of Timestamp
> > > > > > type. I think extending this to Long would be a follow-up issue to the
> > > > > > changes I proposed here.
> > > > > > 2) Relates to 1) and I agree. If we use a Long attribute as a timestamp,
> > > > > > it should remain of type Long. For now I would keep converting it to
> > > > > > Timestamp and change that later.
> > > > > > 3) Yes, timestamp columns must be aligned to watermarks. That's their
> > > > > > primary characteristic. How to define watermark strategies is orthogonal
> > > > > > to this discussion, IMO.
> > > > > > 4) From my point of view, proc-time is a purely virtual column and not
> > > > > > related to an actual (data) column. However, it must be part of the
> > > > > > schema and treated like any other attribute for a good user experience
> > > > > > and SQL compliance. In order to be able to join two tables on processing
> > > > > > time, it must be possible to include a processing time column in the
> > > > > > schema definition of the table. Processing time queries can never compute
> > > > > > the same results as batch queries, but their semantics should be aligned
> > > > > > with event-time queries.
> > > > >
> > > > > Best, Fabian
> > > > >
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > > @Shaoxuan - thanks for the remarks. I have a question regarding your
> > > > > > > suggestion not to allow creating a proctime window on a regular column.
> > > > > > > I think this would be useful though. First, you might need to carry the
> > > > > > > timestamp indicator of when the processing happened (for log purposes,
> > > > > > > provenance, traceability ...). Secondly, I do not think it contradicts
> > > > > > > the semantics of batch SQL, as in SQL you have the function "now()",
> > > > > > > which pretty much carries the same semantics as having a function to
> > > > > > > mark the proctime and then projecting this into a column. If I am not
> > > > > > > mistaken, you can store the result of calling now() in database columns.
> > > > > >
> > > > > >
> > > > > >
> > > > > > -----Original Message-----
> > > > > > From: Shaoxuan Wang [mailto:shaoxuan@apache.org]
> > > > > > Sent: Thursday, July 27, 2017 6:00 AM
> > > > > > To: Dev
> > > > > > Subject: Re: [DISCUSS] Table API / SQL internal timestamp
> handling
> > > > > >
> > > > > >  Hi Everyone,
> > > > > > > I like this proposal. The problem we used to have is that we have
> > > > > > > treated eventtime column as a special timestamp column. An eventtime
> > > > > > > column is no different from any other regular column, except that it
> > > > > > > carries a certain flag (eventtime-indicator) indicating that this column
> > > > > > > can be used as an eventtime to decide when a bounded query can emit the
> > > > > > > final result by comparing it with the associated watermark.
> > > > > > >
> > > > > > > I have a few comments to add on top of this (they may have already been
> > > > > > > addressed in the conversation; since it's a long discussion, I may have
> > > > > > > missed something):
> > > > > >
> > > > > > >    1. While we remove the timestamp column, we introduce an
> > > > > > >    eventtime-indicator (we may already have this concept). It is only a
> > > > > > >    flag that can be applied to any column (note that some types may not
> > > > > > >    be usable as an eventtime column), indicating whether this column can
> > > > > > >    be used as eventtime or not. This flag is useful for validation and
> > > > > > >    codeGen.
> > > > > > >    2. A column that has been used as an eventtime should not lose its own
> > > > > > >    type. We should not cast every eventtime column to the timestamp type.
> > > > > > >    For instance, if a column is of long type, it stays of long type even
> > > > > > >    if a window aggregate has used it as an eventtime.
> > > > > > >    3. Eventtime will only work well with an associated watermark
> > > > > > >    strategy. We may consider forcing the user to provide watermark logic
> > > > > > >    for his/her selected eventtime.
> > > > > > >    4. For proctime, I hope we do not introduce a proctime-indicator for
> > > > > > >    regular columns. Ideally we should not allow the user to create a
> > > > > > >    proctime window on a regular column, as this is against the batch
> > > > > > >    query semantics. Therefore I suggest we always introduce a proctime
> > > > > > >    timestamp column for users to create proctime windows. And unlike
> > > > > > >    eventtime, proctime does not need any associated watermark strategy,
> > > > > > >    as there is no out-of-order issue for proctime.
> > > > > >
> > > > > > Regards,
> > > > > > Shaoxuan
> > > > > >
> > > > > > On Wed, Jul 26, 2017 at 9:10 PM, Fabian Hueske <
> fhueske@gmail.com>
> > > > > wrote:
> > > > > >
> > > > > > > Thanks everybody for the replies so far.
> > > > > > >
> > > > > > >
> > > > > > > Radu:
> > > > > > > ---
> > > > > > > > First of all, although my proposal is motivated by a join operator,
> > > > > > > > this discussion is about timestamp handling, not about joins in
> > > > > > > > general.
> > > > > > >
> > > > > > > > - The semantics of outer joins is to emit null and there is no way
> > > > > > > > around that. This is not an issue for us. Actually, outer joins are
> > > > > > > > supported by the batch SQL / Table API. It is true that outer joins
> > > > > > > > might result in null timestamps. Calcite will mark those fields as
> > > > > > > > nullable and we should check that timestamps which are used in windows
> > > > > > > > or joins are not nullable.
> > > > > > > > - The query has to explicitly specify which timestamp attribute to use.
> > > > > > > > Otherwise its semantics are not complete and it is invalid. A
> > > > > > > > group-window that follows a join will reference a timestamp attribute
> > > > > > > > and this will be used. The other timestamp might be projected out.
> > > > > > > > When a result with two timestamps is converted into a DataStream, the
> > > > > > > > user has to decide. This could be done inside of the Table to
> > > > > > > > DataStream conversion. If the Table has more than one valid timestamp,
> > > > > > > > the conversion will ask which timestamp to forward.
> > > > > > > > - A proctime join should forward all proctime attributes of the input
> > > > > > > > tables. All will be the same, but that does not matter because they
> > > > > > > > are either virtual or represented as 1 byte dummy attributes. Also,
> > > > > > > > unused ones will be automatically projected out anyway.
> > > > > > > > - An event-time join should forward all event-time attributes of the
> > > > > > > > input tables. Creating a new event-time attribute using processing
> > > > > > > > time makes event-time processing pointless and will give completely
> > > > > > > > random results.
> > > > > > > > Event-time is not about the "time an event is created" but about a
> > > > > > > > timestamp that is associated with an event. For example, an order event
> > > > > > > > could have three timestamps: "orderTime", "shipTime", and
> > > > > > > > "receiveTime". Each could be a valid event-time attribute.
> > > > > > >
> > > > > > > Jark:
> > > > > > > ---
> > > > > > > > Thanks for the proposal. I think I understand what you want to achieve
> > > > > > > > with this, but I think functions to instantiate time attributes are
> > > > > > > > not necessary and would make things more complicated. The point of
> > > > > > > > supporting multiple time attributes is to ensure that all of them are
> > > > > > > > aligned with the watermarks. If we add a method ROW_TIME(timestamp),
> > > > > > > > we don't know whether the timestamp is aligned with the watermarks. If
> > > > > > > > that is not the case, the query won't be executed as expected. The
> > > > > > > > issue of LEFT JOIN can easily be addressed by checking for
> > > > > > > > nullability during optimization when an operator tries to use it.
> > > > > > > >
> > > > > > > > The beauty of supporting multiple timestamps is that a user does not
> > > > > > > > have to care at all about timestamps (or timestamp functions) and
> > > > > > > > watermarks. As long as the query uses a timestamp attribute that was
> > > > > > > > originally declared as rowtime in a source table (and was not modified
> > > > > > > > afterwards), this is fine. Think of a cascade of three windowed joins:
> > > > > > > > R - S - T - U, and you want to join S - T first. In that case, you
> > > > > > > > need to preserve the timestamps of S and T in order to join R and U.
> > > > > > > > From a relational algebra point of view, there is no reason to have a
> > > > > > > > limitation on how these attributes are accessed. Timestamps are just
> > > > > > > > regular fields of a record. The only restriction in the context of
> > > > > > > > stream processing is that the watermark must be aligned with
> > > > > > > > timestamps, i.e., follow all timestamps such that data is not late
> > > > > > > > according to any of the timestamps. This we can achieve and handle
> > > > > > > > internally without the user having to worry about it.
> > > > > > >
> > > > > > > Xingcan:
> > > > > > > ---
> > > > > > > > I think your questions are mostly implementation details and not so
> > > > > > > > much related to the original proposal of supporting multiple timestamps.
> > > > > > > >
> > > > > > > > My take on your questions is:
> > > > > > > > 1. The rate at which watermarks are emitted is not important for the
> > > > > > > > correctness of a query. However, it can affect the performance,
> > > > > > > > because each watermark is sent as a special record and it is
> > > > > > > > broadcast. My initial take would be to emit a new watermark whenever
> > > > > > > > the operator updated its watermark because usually, the operator would
> > > > > > > > have forwarded the old watermark.
> > > > > > > > 2. I would say this is the responsibility of the operator because
> > > > > > > > first it is not related to the semantics of the query and second it is
> > > > > > > > an operator responsibility in the existing code as well.
> > > > > > >
> > > > > > > Jark 2:
> > > > > > > > You are right, the query (or user) must decide on the event-time
> > > > > > > > attribute to use. My main point is that it is much easier for the user
> > > > > > > > (and for us internally) if we internally track multiple timestamps,
> > > > > > > > because we do not have to prune, in the join, the timestamp that will
> > > > > > > > not be used later. Moreover, both timestamps might be used later (see
> > > > > > > > the join example, which could be reordered of course). All we have to
> > > > > > > > do is to ensure that all timestamps are aligned with the watermarks.
> > > > > > >
> > > > > > > Radu 2:
> > > > > > > > IMO, time (or anything else that affects the semantics) should never
> > > > > > > > be decided by the system. If we did that, a query would not be fully
> > > > > > > > specified or, even worse, the way it is executed would be semantically
> > > > > > > > incorrect and produce arbitrary results.
> > > > > > > >
> > > > > > > > Time attributes should be specified in the source tables and then
> > > > > > > > forwarded from there. So far I haven't seen an example where this
> > > > > > > > would not be possible (within the semantics of relational queries). If
> > > > > > > > we do that right, there won't be a need for explicit time management
> > > > > > > > except for the definition of the initial timestamps, which can be
> > > > > > > > hidden in the table definition. As I said before, we (or the system)
> > > > > > > > cannot decide on the timestamp because that would lead to arbitrary
> > > > > > > > results. Asking the user to do that would mean explicit time
> > > > > > > > management, which is also not desirable. I think my proposal gives
> > > > > > > > users all options (timestamps) to choose from and the system can do the
> > > > > > > > rest.
> > > > > > >
> > > > > > > Best, Fabian
> > > > > > >
> > > > > > > 2017-07-26 10:46 GMT+02:00 Radu Tudoran <
> > >:
> > > > > > >
> > > > > > > > Hi everyone,
> > > > > > > >
> > > > > > > > > I just want to add that I was referring to NULL values not
> > > > > > > > > specifically for time fields but for the event itself. Consider the
> > > > > > > > > following situation:
> > > > > > > >
> > > > > > > > Stream 1:     .... |    event1   | ....
> > > > > > > > Stream 2:     .... |             | ....
> > > > > > > >
> > > > > > > > > And you have a LEFT JOIN between stream 1 and stream 2 (no
> > > > > > > > > condition)... then you still need to emit (event1, null), as this
> > > > > > > > > is the behavior of a left join. This is maybe a very simple situation,
> > > > > > > > > but the point is that left joins and right joins can have situations
> > > > > > > > > where you have elements only in the main stream and no element in the
> > > > > > > > > right stream. And for this case you still need to emit.
> > > > > > > >
> > > > > > > >
> > > > > > > > > Regarding whether time should be decided by the system or not... I
> > > > > > > > > think the answer is: it depends. I think the example from Jark is very
> > > > > > > > > good and shows the need for some mechanism to select/manage the time (I
> > > > > > > > > like the proposal of having functions to insert the time in the
> > > > > > > > > output!). However, if a business analyst writes a query without
> > > > > > > > > explicit time management, we still need to have some default behavior
> > > > > > > > > in the system. As per my initial proposal, I think we need to decide on
> > > > > > > > > one timestamp field to carry: either a new one (created at the moment
> > > > > > > > > of the join) or the timestamp from the main stream (... although I am
> > > > > > > > > not sure which one is the main stream in the case of a full join :) )
> > > > > > > >
> > > > > > > >
> > > > > > > > -----Original Message-----
> > > > > > > > From: Jark Wu [mailto:jark@apache.org]
> > > > > > > > Sent: Wednesday, July 26, 2017 8:29 AM
> > > > > > > > To: dev@flink.apache.org
> > > > > > > > Subject: Re: [DISCUSS] Table API / SQL internal timestamp
> > > handling
> > > > > > > >
> > > > > > > > Hi Xingcan,
> > > > > > > >
> > > > > > > > > IMO, I don't think the event-time of join results can be automatically
> > > > > > > > > decided by the system. Considering batch tables, if users want an event
> > > > > > > > > time window aggregation after a join, they must specify the time field
> > > > > > > > > explicitly (T1.rowtime or T2.rowtime or a result computed from them).
> > > > > > > > > So in the case of streaming tables, the system also can't automatically
> > > > > > > > > decide the time field for users.
> > > > > > > >
> > > > > > > > > Regarding the question you asked, I think we don't need to change
> > > > > > > > > the watermark no matter whether we choose the left rowtime, the right
> > > > > > > > > rowtime, or a combination, because the watermark has been aligned with
> > > > > > > > > the rowtime in the
> > > > missing
> > > > > > > > something.
> > > > > > > >
> > > > > > > > What do you think?
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Jark
> > > > > > > >
> > > > > > > > 2017-07-26 11:24 GMT+08:00 Xingcan Cui <xingcanc@gmail.com>:
> > > > > > > >
> > > > > > > > > Hi all,
> > > > > > > > >
> > > > > > > > > @Fabian, thanks for raising this.
> > > > > > > > >
> > > > > > > > > > @Radu and Jark, personally I think the timestamp field is critical for
> > > > > > > > > > query processing and thus should be declared as (or supposed to be)
> > > > > > > > > > NOT NULL. In addition, I think the event-time semantics of the join
> > > > > > > > > > results should be automatically decided by the system, i.e., we do not
> > > > > > > > > > hand it over to users, so as to avoid unpredictable assignments.
> > > > > > > > > >
> > > > > > > > > > Generally speaking, consolidating different time fields is possible
> > > > > > > > > > since all of them should ideally be monotonically increasing. From my
> > > > > > > > > > point of view, the problem lies in
> > > > > > > > > > (1) what the relationship between the old and new watermarks is. Shall
> > > > > > > > > > it be a one-to-one mapping, or could the new watermarks skip some
> > > > > > > > > > timestamps? And (2) who is in charge of emitting the blocked
> > > > > > > > > > watermarks, the operator or the process function?
> > > > > > > > >
> > > > > > > > > I'd like to hear from you.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Xingcan
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Wed, Jul 26, 2017 at 10:40 AM, Jark Wu <jark@apache.org
> >
> > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > > > Radu's concerns make sense to me, especially the null value
> > > > > > > > > > > timestamp and multi-proctime.
> > > > > > > > > > >
> > > > > > > > > > > I also have something in mind. I would like to propose some time
> > > > > > > > > > > indicator built-in functions, e.g. ROW_TIME(Timestamp ts) will
> > > > > > > > > > > generate an event time logical attribute, and PROC_TIME() will
> > > > > > > > > > > generate a processing time logical attribute. It is similar to
> > > > > > > > > > > TUMBLE_ROWTIME proposed in this PR https://github.com/apache/
> > > > > > > > > > > These can be used in any query, but there still can't be more than
> > > > > > > > > > > one rowtime attribute or more than one proctime attribute in a table
> > > > > > > > > > > schema.
> > > > > > > > > > >
> > > > > > > > > > > Both selected timestamp fields from a JOIN query will be materialized.
> > > > > > > > > > > If someone needs further computation based on the event time, they
> > > > > > > > > > > need to create a new time attribute using the ROW_TIME(...)
> > > > > > > > > > > function. This can also solve the null timestamp problem in LEFT
> > > > > > > > > > > JOIN, because we can use a user-defined function to combine the two
> > > > > > > > > > > rowtimes and make the result the event time attribute, e.g. SELECT
> > > > > > > > > > > ROW_TIME(udf(T1.rowtime, T2.rowtime)) as rowtime FROM T1 JOIN T2 ...
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > What do you think?
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > 2017-07-25 23:48 GMT+08:00 Radu Tudoran <
> > > > > >:
> > > > > > > > > >
> > > > > > > > > > > Hi,
> > > > > > > > > > >
> > > > > > > > > > > > I think this is an interesting discussion and I would like to add
> > > > > > > > > > > > some issues and give some feedback.
> > > > > > > > > > > >
> > > > > > > > > > > > - To support the join we need to think not only of the time but also
> > > > > > > > > > > > of null values. For example, if you have a LEFT (or RIGHT) JOIN
> > > > > > > > > > > > between items of 2 input streams, and the secondary input is not
> > > > > > > > > > > > available, you should still emit Row.of(event1, null)... As far as I
> > > > > > > > > > > > know, null values do not work if you need to serialize/deserialize
> > > > > > > > > > > > them in order to send them. So we should include this scenario in
> > > > > > > > > > > > the discussion.
> > > > > > > > > > > > - If we have multiple timestamps in an (output) event, one question
> > > > > > > > > > > > is how to select afterwards which is the primary time field on which
> > > > > > > > > > > > to operate. When we describe a query we might be able to specify (or
> > > > > > > > > > > > we get this implicitly if we implement the carry-on of the 2
> > > > > > > > > > > > timestamps) Select T1.rowtime, T2.rowtime ... but if the output of a
> > > > > > > > > > > > query is the input of a new processing pipeline, do we then generally
> > > > > > > > > > > > also support that the input has 2 time fields? ... How do we deal
> > > > > > > > > > > > with the 2 input fields (maybe I am missing something) further in the
> > > > > > > > > > > > DataStream pipeline that we build based on the output?
> > > > > > > > > > > > - For the case of proctime: do we need to carry 2 proctimes (the
> > > > > > > > > > > > proctimes of the incoming events from each stream), or 1 proctime (as
> > > > > > > > > > > > we operate on proctime and the combination of the 2 inputs can be
> > > > > > > > > > > > considered a new event, the current proctime on the machine can be
> > > > > > > > > > > > considered the (proc)time reference for the output event), or 3
> > > > > > > > > > > > proctimes (the 2 proctimes of the input plus the proctime when the
> > > > > > > > > > > > new event was created)?
> > > > > > > > > > > > - Similar to the point above, for event time (which I understand as
> > > > > > > > > > > > the time when the event was created... or do we understand it as a
> > > > > > > > > > > > time carried within the event?): when we join 2 events and output an
> > > > > > > > > > > > event that is the result of the join, isn't this a new event detached
> > > > > > > > > > > > from the source/input events? ... I would tend to say it is a new
> > > > > > > > > > > > event, and then, as for proctime, the event time of the new event is
> > > > > > > > > > > > the current time when this output event was created. If we accept
> > > > > > > > > > > > this hypothesis, then we would not need the 2 input time fields to be
> > > > > > > > > > > > carried/managed implicitly. If someone needs them further down the
> > > > > > > > > > > > computation pipeline, then in the query they would be selected
> > > > > > > > > > > > explicitly from the input stream and projected into some fields to be
> > > > > > > > > > > > carried (Select T1.rowtime as FormerTime1, T2.rowtime as FormerTime2,
> > > > > > > > > > > > .... JOIN T1, T2...)... but they would not have the timestamp logic
> > > > > > > > > > >
> > > > > > > > > > > ..my 2 cents
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > -----Original Message-----
> > > > > > > > > > > From: Fabian Hueske [mailto:fhueske@gmail.com]
> > > > > > > > > > > Sent: Tuesday, July 25, 2017 4:22 PM
> > > > > > > > > > > To: dev@flink.apache.org
> > > > > > > > > > > Subject: [DISCUSS] Table API / SQL internal timestamp
> > > > handling
> > > > > > > > > > >
> > > > > > > > > > > Hi everybody,
> > > > > > > > > > >
> > > > > > > > > > > I'd like to propose and discuss some changes in the way
> > how
> > > > the
> > > > > > > Table
> > > > > > > > > API
> > > > > > > > > > > / SQL internally handles timestamps.
> > > > > > > > > > >
> > > > > > > > > > > The Table API is implemented on top of the DataStream
> > API.
> > > > The
> > > > > > > > > DataStream
> > > > > > > > > > > API hides timestamps from users in order to ensure that
> > > > > > timestamps
> > > > > > > > and
> > > > > > > > > > > watermarks are aligned. Instead users assign timestamps
> > and
> > > > > > > > watermarks
> > > > > > > > > > once
> > > > > > > > > > > (usually at the source or in a subsequent operator) and
> > let
> > > > the
> > > > > > > > system
> > > > > > > > > > > handle the timestamps from there on. Timestamps are
> > stored
> > > in
> > > > > the
> > > > > > > > > > timestamp
> > > > > > > > > > > field of the StreamRecord which is a holder for the
> user
> > > > record
> > > > > > and
> > > > > > > > the
> > > > > > > > > > > timestamp. DataStream operators that depend on time
> > > > > > (time-windows,
> > > > > > > > > > process
> > > > > > > > > > > function, ...) access the timestamp from the
> > StreamRecord.
> > > > > > > > > > >
> > > > > > > > > > > In contrast to the DataStream API, the Table API and SQL
> > > > > > > > > > > are aware of the semantics of a query. I.e., we can
> > > > > > > > > > > analyze how users access timestamps and whether they are
> > > > > > > > > > > modified or not. Another difference is that the
> > > > > > > > > > > timestamp must be part of the schema of a table in order
> > > > > > > > > > > to have correct query semantics.
> > > > > > > > > > >
> > > > > > > > > > > The current design to handle timestamps is as follows.
> > > > > > > > > > > The Table API stores timestamps in the timestamp field
> > > > > > > > > > > of the StreamRecord. Therefore, timestamps are detached
> > > > > > > > > > > from the remaining data which is stored in Row objects.
> > > > > > > > > > > Hence, the physical representation of a row is different
> > > > > > > > > > > from its logical representation. We introduced a
> > > > > > > > > > > translation layer (RowSchema) to convert the logical
> > > > > > > > > > > schema into the physical schema. This is necessary for
> > > > > > > > > > > serialization or code generation when the logical plan
> > > > > > > > > > > is translated into a physical execution plan.
> > > > > > > > > > > Processing-time timestamps are handled similarly. They
> > > > > > > > > > > are not included in the physical schema and are looked
> > > > > > > > > > > up when needed. This design also requires us to
> > > > > > > > > > > materialize timestamps when they are accessed by
> > > > > > > > > > > expressions. Timestamp materialization is done as a
> > > > > > > > > > > pre-optimization step.
> > > > > > > > > > >
> > > > > > > > > > > While thinking about the implementation of the
> > > > > > > > > > > event-time windowed stream-stream join [1] I stumbled
> > > > > > > > > > > over the question of which timestamp of the two input
> > > > > > > > > > > tables to forward. With the current design, we could
> > > > > > > > > > > only have a single timestamp, so keeping both timestamps
> > > > > > > > > > > would not be possible. The choice of the timestamp would
> > > > > > > > > > > need to be specified by the query, otherwise it would
> > > > > > > > > > > lack clear semantics. When executing the join, the join
> > > > > > > > > > > operator would need to make sure that no late data is
> > > > > > > > > > > emitted. This would only work if the operator were able
> > > > > > > > > > > to hold back watermarks [2].
> > > > > > > > > > >
> > > > > > > > > > > With this information in mind, I'd like to discuss the
> > > > > > > > > > > following proposal:
> > > > > > > > > > >
> > > > > > > > > > > - We allow more than one event-time timestamp and store
> > > > > > > > > > > them directly in the Row.
> > > > > > > > > > > - The query operators ensure that the watermarks are
> > > > > > > > > > > always behind all event-time timestamps. With additional
> > > > > > > > > > > analysis we will be able to restrict this to timestamps
> > > > > > > > > > > that are actually used as such.
> > > > > > > > > > > - When a DataStream operator is time-based (e.g., a
> > > > > > > > > > > DataStream time window), we inject an operator that
> > > > > > > > > > > copies the timestamp from the Row into the StreamRecord.
> > > > > > > > > > > - We try to remove the distinction between logical and
> > > > > > > > > > > physical schema. For event-time timestamps this is
> > > > > > > > > > > because we store them in the Row object; for
> > > > > > > > > > > processing-time timestamps, we add a dummy byte field.
> > > > > > > > > > > When accessing a field of this type, the code generator
> > > > > > > > > > > injects the code to fetch the timestamps.
> > > > > > > > > > > - We might be able to get around the pre-optimization
> > > > > > > > > > > time materialization step.
> > > > > > > > > > > - A join result would be able to keep both timestamps.
> > > > > > > > > > > The watermark would be held back for both so both could
> > > > > > > > > > > be used in subsequent operations.
> > > > > > > > > > >
> > > > > > > > > > > I admit, I haven't thought this completely through.
> > > > > > > > > > > However, the benefits of this design from my point of
> > > > > > > > > > > view are:
> > > > > > > > > > > - encoding of timestamps in Rows means that the logical
> > > > > > > > > > > schema is equal to the physical schema
> > > > > > > > > > > - no timestamp materialization
> > > > > > > > > > > - support for multiple timestamps. Otherwise we would
> > > > > > > > > > > need to expose internal restrictions to the user which
> > > > > > > > > > > are hard to explain / communicate.
> > > > > > > > > > > - no need to change any public interfaces at the moment.
> > > > > > > > > > >
> > > > > > > > > > > The drawbacks as far as I see them are:
> > > > > > > > > > > - additional payload due to unused timestamp field +
> > > > > > > > > > > possibly the processing-time dummy field
> > > > > > > > > > > - complete rework of the internal timestamp logic
> > > > > > > > > > > (again...)
> > > > > > > > > > >
> > > > > > > > > > > Please let me know what you think,
> > > > > > > > > > > Fabian
> > > > > > > > > > >
> > > > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-6233
> > > > > > > > > > > [2] https://issues.apache.org/jira/browse/FLINK-7245
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
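
For readers following the thread, the "hold back watermarks" idea from the
quoted proposal can be sketched roughly as below. This is only an
illustration under stated assumptions, not Flink code and not the design in
FLINK-7245: the class name JoinWatermarkSketch, its methods, and the
holdBackMillis parameter are all hypothetical. It assumes a two-input join
that forwards the smaller of the two input watermarks, reduced by a fixed
bound on how far behind that watermark a future result timestamp may lie.

```java
// Hypothetical sketch: not part of Flink, names invented for illustration.
final class JoinWatermarkSketch {
    // Latest watermark seen on each input; MIN_VALUE means "none yet".
    private long leftWatermark = Long.MIN_VALUE;
    private long rightWatermark = Long.MIN_VALUE;

    // Assumed bound (in ms) on how far behind the combined watermark a
    // timestamp of a not-yet-emitted join result may be.
    private final long holdBackMillis;

    JoinWatermarkSketch(long holdBackMillis) {
        if (holdBackMillis < 0) {
            throw new IllegalArgumentException("holdBackMillis must be >= 0");
        }
        this.holdBackMillis = holdBackMillis;
    }

    // Record a watermark from the left input and return the watermark
    // that may be forwarded downstream. Watermarks never move backwards.
    long onLeftWatermark(long watermark) {
        leftWatermark = Math.max(leftWatermark, watermark);
        return current();
    }

    // Same as above for the right input.
    long onRightWatermark(long watermark) {
        rightWatermark = Math.max(rightWatermark, watermark);
        return current();
    }

    // The forwarded watermark stays behind both inputs and behind every
    // timestamp that could still be emitted, given the assumed bound.
    long current() {
        long min = Math.min(leftWatermark, rightWatermark);
        // Until both inputs have reported, nothing can be forwarded.
        return min == Long.MIN_VALUE ? Long.MIN_VALUE : min - holdBackMillis;
    }
}
```

With a hold-back of 1000 ms, a left watermark of 5000 and a right watermark
of 3000, the forwarded watermark would be 2000. The slower input dominates,
so the rows of the faster input have to be buffered in the meantime.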

