flink-dev mailing list archives

From Xingcan Cui <xingc...@gmail.com>
Subject Re: [DISCUSS] Table API / SQL internal timestamp handling
Date Wed, 26 Jul 2017 03:24:08 GMT
Hi all,

@Fabian, thanks for raising this.

@Radu and Jark, personally I think the timestamp field is critical for query
processing and thus should be declared as (or assumed to be) NOT NULL. In
addition, I think the event-time semantics of the join results should be
decided automatically by the system, i.e., we should not hand this over to
users, so as to avoid unpredictable assignments.
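
To illustrate what "decided by the system" could look like, here is a minimal
Python sketch. The rule it uses (take the later of the two input rowtimes) and
the name join_result_rowtime are only assumptions for illustration; nothing
like this exists in the Table API or has been agreed on in this thread.

```python
from typing import Optional


def join_result_rowtime(left_ts: Optional[int], right_ts: Optional[int]) -> int:
    """Hypothetical system-side rule for the rowtime of a join result.

    Both inputs are epoch milliseconds. The rule (max of both sides) is an
    assumption made for this sketch, not an agreed design.
    """
    # Enforce the NOT NULL requirement on the time attribute.
    if left_ts is None or right_ts is None:
        raise ValueError("rowtime must be NOT NULL")
    # The result is never earlier than either input timestamp.
    return max(left_ts, right_ts)
```

With a fixed rule like this, the user never assigns the result timestamp, and
a null rowtime is rejected instead of being propagated.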

Generally speaking, consolidating different time fields is possible since all
of them should ideally be monotonically increasing. From my point of view, the
problem lies in (1) what the relationship between the old and new watermarks
is: shall they map one-to-one, or could the new watermarks skip some
timestamps? And (2) who is in charge of emitting the blocked watermarks, the
operator or the process function?
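
To make question (1) concrete, here is a minimal Python sketch of an operator
that holds back watermarks. It is not Flink code: the class
HeldBackWatermarks, the parameter hold_back_ms and the rule "minimum of both
input watermarks minus a fixed delay" are assumptions for illustration only.

```python
from dataclasses import dataclass, field
from typing import List, Optional

MIN_TS = -(2 ** 63)  # stands in for "no watermark received yet"


@dataclass
class HeldBackWatermarks:
    """Hypothetical two-input operator that delays outgoing watermarks."""

    hold_back_ms: int
    left_wm: int = MIN_TS
    right_wm: int = MIN_TS
    last_emitted: int = MIN_TS
    emitted: List[int] = field(default_factory=list)

    def on_watermark(self, side: str, wm: int) -> Optional[int]:
        # Watermarks never move backwards on either input.
        if side == "left":
            self.left_wm = max(self.left_wm, wm)
        elif side == "right":
            self.right_wm = max(self.right_wm, wm)
        else:
            raise ValueError("side must be 'left' or 'right'")

        combined = min(self.left_wm, self.right_wm)
        if combined == MIN_TS:
            return None  # one input has not sent a watermark yet

        # Hold the watermark back so buffered join results are not late.
        candidate = combined - self.hold_back_ms
        if candidate > self.last_emitted:
            self.last_emitted = candidate
            self.emitted.append(candidate)
            return candidate
        return None  # nothing new to emit for this input watermark
```

Under this rule the mapping is not one-to-one: an incoming watermark that does
not advance the minimum of both inputs produces no outgoing watermark, so the
new watermarks can skip timestamps.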

I'd like to hear from you.

Best,
Xingcan



On Wed, Jul 26, 2017 at 10:40 AM, Jark Wu <jark@apache.org> wrote:

> Hi,
>
> Radu's concerns make sense to me, especially the null-valued timestamp and
> multiple proctimes.
>
> I also have something in mind. I would like to propose some built-in time
> indicator functions, e.g. ROW_TIME(Timestamp ts) would generate an
> event-time logical attribute and PROC_TIME() would generate a
> processing-time logical attribute. This is similar to TUMBLE_ROWTIME
> proposed in this PR:
> https://github.com/apache/flink/pull/4199. These functions can be used in
> any query, but there still can't be more than one rowtime attribute or more
> than one proctime attribute in a table schema.
>
> Both timestamp fields selected from a JOIN query will be materialized. If
> someone needs event-time-based computation further downstream, they need to
> create a new time attribute using the ROW_TIME(...) function. This can also
> solve the null timestamp problem in LEFT JOIN, because we can use a
> user-defined function to combine the two rowtimes and make the result the
> event-time attribute, e.g. SELECT ROW_TIME(udf(T1.rowtime,
> T2.rowtime)) as rowtime FROM T1 JOIN T2 ...
>
>
> What do you think?
>
>
> 2017-07-25 23:48 GMT+08:00 Radu Tudoran <radu.tudoran@huawei.com>:
>
> > Hi,
> >
> > I think this is an interesting discussion and I would like to add some
> > issues and give some feedback.
> >
> > - To support the join we need to think not only about time but also about
> > null values. For example, if you have a LEFT (or RIGHT) JOIN between items
> > of 2 input streams and the secondary input is not available, you should
> > still emit Row.of(event1, null). As far as I know, null values do not work
> > if they need to be serialized/deserialized to be sent. So we should
> > include this scenario in the discussion.
> > - If we have multiple timestamps in an (output) event, one question is
> > how to select afterwards which one is the primary time field to operate
> > on. When we describe a query we might be able to specify this (or we get
> > it implicitly if we implement the carry-over of the 2 timestamps): Select
> > T1.rowtime, T2.rowtime ... But if the output of a query is the input of a
> > new processing pipeline, do we then generally also support an input with
> > 2 time fields? How do we deal with the 2 time fields (maybe I am missing
> > something) further down in the DataStream pipeline that we build on the
> > output?
> > - For the case of proctime: do we need to carry 2 proctimes (the
> > proctimes of the incoming events from each stream), or 1 proctime (as we
> > operate on proctime and the combination of the 2 inputs can be considered
> > a new event, the current proctime on the machine can be considered the
> > (proc)time reference for the output event), or 3 proctimes (the 2
> > proctimes of the inputs plus the proctime when the new event was created)?
> > - Similar to the point above, for event time (which I understand as the
> > time when the event was created... or do we understand it as a time
> > carried within the event?): when we join 2 events and output an event
> > that is the result of the join, isn't this a new event detached from the
> > source/input events? I would tend to say it is a new event, and then, as
> > for proctime, the event time of the new event is the current time when
> > this output event was created. If we accept this hypothesis, then we
> > would not need the 2 input time fields to be carried/managed implicitly.
> > If someone needs them further down the computation pipeline, they would
> > select them explicitly from the input streams in the query and project
> > them into some fields to be carried (Select T1.rowtime as FormerTime1,
> > T2.rowtime as FormerTime2, .... JOIN T1, T2...), but these fields would
> > not have the timestamp logic.
> >
> > ..my 2 cents
> >
> >
> >
> >
> > Dr. Radu Tudoran
> >
> > -----Original Message-----
> > From: Fabian Hueske [mailto:fhueske@gmail.com]
> > Sent: Tuesday, July 25, 2017 4:22 PM
> > To: dev@flink.apache.org
> > Subject: [DISCUSS] Table API / SQL internal timestamp handling
> >
> > Hi everybody,
> >
> > I'd like to propose and discuss some changes in how the Table API / SQL
> > handles timestamps internally.
> >
> > The Table API is implemented on top of the DataStream API. The DataStream
> > API hides timestamps from users in order to ensure that timestamps and
> > watermarks are aligned. Instead, users assign timestamps and watermarks
> > once (usually at the source or in a subsequent operator) and let the
> > system handle the timestamps from there on. Timestamps are stored in the
> > timestamp field of the StreamRecord, which is a holder for the user record
> > and the timestamp. DataStream operators that depend on time (time windows,
> > process functions, ...) access the timestamp from the StreamRecord.
> >
> > In contrast to the DataStream API, the Table API and SQL are aware of the
> > semantics of a query, i.e., we can analyze how users access timestamps and
> > whether they are modified or not. Another difference is that the timestamp
> > must be part of the schema of a table in order to have correct query
> > semantics.
> >
> > The current design to handle timestamps is as follows. The Table API
> > stores timestamps in the timestamp field of the StreamRecord. Therefore,
> > timestamps are detached from the remaining data, which is stored in Row
> > objects. Hence, the physical representation of a row is different from its
> > logical representation. We introduced a translation layer (RowSchema) to
> > convert the logical schema into the physical schema. This is necessary for
> > serialization or code generation when the logical plan is translated into
> > a physical execution plan. Processing-time timestamps are handled
> > similarly. They are not included in the physical schema and are looked up
> > when needed. This design also requires that we materialize timestamps when
> > they are accessed by expressions. Timestamp materialization is done as a
> > pre-optimization step.
> >
> > While thinking about the implementation of the event-time windowed
> > stream-stream join [1], I stumbled over the question of which timestamp of
> > the two input tables to forward. With the current design, we could only
> > have a single timestamp, so keeping both timestamps would not be possible.
> > The choice of the timestamp would need to be specified by the query,
> > otherwise it would lack clear semantics. When executing the join, the join
> > operator would need to make sure that no late data is emitted. This would
> > only work if the operator were able to hold back watermarks [2].
> >
> > With this information in mind, I'd like to discuss the following proposal:
> >
> > - We allow more than one event-time timestamp and store them directly in
> > the Row.
> > - The query operators ensure that the watermarks are always behind all
> > event-time timestamps. With additional analysis we will be able to
> > restrict this to timestamps that are actually used as such.
> > - When a DataStream operator is time-based (e.g., a DataStream time
> > window), we inject an operator that copies the timestamp from the Row
> > into the StreamRecord.
> > - We try to remove the distinction between logical and physical schema.
> > For event-time timestamps this is because we store them in the Row
> > object; for processing-time timestamps, we add a dummy byte field. When
> > accessing a field of this type, the code generator injects the code to
> > fetch the timestamps.
> > - We might be able to get around the pre-optimization time
> > materialization step.
> > - A join result would be able to keep both timestamps. The watermark
> > would be held back for both so both could be used in subsequent
> > operations.
> >
> > I admit, I haven't thought this completely through.
> > However, the benefits of this design from my point of view are:
> > - encoding of timestamps in Rows means that the logical schema is equal
> > to the physical schema
> > - no timestamp materialization
> > - support for multiple timestamps. Otherwise we would need to expose
> > internal restrictions to the user which are hard to explain / communicate.
> > - no need to change any public interfaces at the moment.
> >
> > The drawbacks as far as I see them are:
> > - additional payload due to unused timestamp field + possibly the
> > processing-time dummy field
> > - complete rework of the internal timestamp logic (again...)
> >
> > Please let me know what you think,
> > Fabian
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-6233
> > [2] https://issues.apache.org/jira/browse/FLINK-7245
> >
>
