flink-issues mailing list archives

From "Xingcan Cui (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks
Date Thu, 27 Jul 2017 14:40:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16103282#comment-16103282 ]

Xingcan Cui commented on FLINK-7245:

Hi [~fhueske], it took me a little time to understand how rowtime works in the current Table/SQL
API. To continue the work, I'd like to share my understanding along with some questions that
may be a little *detailed*. Could you help confirm or answer them?
Suppose there's a class {{Order(a:Long, b:String)}}.
# When registering a rowtime with the API, e.g., {{tEnv.registerDataStream("OrderA", orderA,
'a.rowtime, 'b)}}, I think the current logic should be that field {{a}} is shaded in the physical
schema and an extra indicator about the rowtime field is added to the logical schema. I find
the following snippet in {{StreamTableEnvironment.extractRowtime()}}.
if (mappedIdx < 0) {
  throw new TableException(
    s"The rowtime attribute can only replace a valid field. " +
    s"${origName.getOrElse(name)} is not a field of type $streamType.")
}
However, when I tried {{tEnv.registerDataStream("OrderA", orderA, 'a, 'b, 'c.rowtime)}}, the
stream was also registered successfully, with a field {{c}} added. I understand that either
allowing or disallowing the extra field could make sense, but I am still confused about which
behavior is intended.
# When translating a SQL query, the rowtime field is omitted by the initial "{{Order}} to {{CRow}}
# The planner checks whether the rowtime field is used in the SQL query. If it is, this
special field is set via the {{ctx.timestamp()}} method in the following operator, using a
generated function.
# The user must manually assign watermarks before registering the datastream. Since the
rowtime field is now treated as an ordinary field, shall we consider adding a configurable
{{DefaultWatermarkAssigner}} for the case where the user does not provide one?
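
To make the last question more concrete, here is a minimal plain-Java sketch of what such a default assigner might compute. Everything in it is an assumption for illustration: the class name {{DefaultWatermarkAssignerSketch}}, its methods, and the bounded out-of-orderness policy (watermark = largest rowtime seen minus a configurable delay) are hypothetical and are not taken from Flink or from this issue.

```java
/**
 * Hypothetical sketch of a configurable default watermark assigner.
 * Assumed policy: the watermark trails the largest rowtime seen so far
 * by a fixed, configurable delay (bounded out-of-orderness).
 */
class DefaultWatermarkAssignerSketch {
    private final long maxOutOfOrdernessMs;
    private long maxTimestampSeen = Long.MIN_VALUE;

    DefaultWatermarkAssignerSketch(long maxOutOfOrdernessMs) {
        if (maxOutOfOrdernessMs < 0) {
            throw new IllegalArgumentException("delay must be non-negative");
        }
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records the rowtime of an incoming row and returns it unchanged. */
    long extractTimestamp(long rowtime) {
        maxTimestampSeen = Math.max(maxTimestampSeen, rowtime);
        return rowtime;
    }

    /** Returns the current watermark, or Long.MIN_VALUE before any row was seen. */
    long currentWatermark() {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE;
        }
        return maxTimestampSeen - maxOutOfOrdernessMs;
    }
}
```

With a delay of 1000 ms, a row with rowtime 5000 moves the watermark to 4000, and a later row with rowtime 4500 leaves it unchanged. The delay would be the configurable part of such an assigner.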

Besides, I found a minor issue in the SQL.html document. It uses the name "rowtime" for the
field itself ( {{tableEnv.registerDataStream("Orders", ds, "user, product, amount, proctime.proctime,
rowtime.rowtime")}}). Readers may be unsure whether they should use the "field name" or
the "rowtime" keyword in the SQL query.

Thanks, Xingcan

> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
> Currently the watermarks are applied and emitted by the {{AbstractStreamOperator}} instantly.

> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
> 	if (timeServiceManager != null) {
> 		timeServiceManager.advanceWatermark(mark);
> 	}
> 	output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these watermarks (e.g.,
> join or aggregate results) may be regarded as delayed by the downstream operators, since their
> timestamps must be less than or equal to those of the corresponding triggers.
> This issue aims to add another "working mode" to the current operators, one that supports
> holding back watermarks. These watermarks should be blocked and stored by the operators until
> all the corresponding newly generated results are emitted.
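
The "holding back" mode described in the issue can be illustrated with a small plain-Java sketch. It is only a simulation under stated assumptions, not Flink code: the class {{HoldBackWatermarkSketch}}, the methods {{registerPendingResult}} and {{emitResult}}, and the string-based output list are all hypothetical, and the release rule (forward the stored watermark once no pending result has a timestamp at or below it) is one possible reading of the description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical operator stand-in that blocks watermarks behind pending results. */
class HoldBackWatermarkSketch {
    /** Everything sent downstream, in emission order. */
    final List<String> output = new ArrayList<>();

    /** Timestamp -> number of results with that timestamp not yet emitted. */
    private final TreeMap<Long, Integer> pending = new TreeMap<>();
    private long heldWatermark = Long.MIN_VALUE;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /** Announces a result that will be emitted later with the given timestamp. */
    void registerPendingResult(long timestamp) {
        pending.merge(timestamp, 1, Integer::sum);
    }

    /** Emits a previously registered result, then re-checks the held watermark. */
    void emitResult(long timestamp) {
        Integer count = pending.get(timestamp);
        if (count == null) {
            throw new IllegalStateException("no pending result at " + timestamp);
        }
        if (count == 1) {
            pending.remove(timestamp);
        } else {
            pending.put(timestamp, count - 1);
        }
        output.add("result@" + timestamp);
        maybeReleaseWatermark();
    }

    /** Stores the watermark instead of forwarding it immediately. */
    void processWatermark(long watermark) {
        heldWatermark = Math.max(heldWatermark, watermark);
        maybeReleaseWatermark();
    }

    /** Forwards the held watermark only if no pending result would be late. */
    private void maybeReleaseWatermark() {
        boolean blocked = !pending.isEmpty() && pending.firstKey() <= heldWatermark;
        if (!blocked && heldWatermark > lastEmittedWatermark) {
            lastEmittedWatermark = heldWatermark;
            output.add("watermark@" + heldWatermark);
        }
    }
}
```

For example, with pending results at timestamps 5 and 8, a watermark of 10 is stored rather than forwarded, and it reaches the output only after both results have been emitted, so downstream operators never see those results as late.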
