flink-issues mailing list archives

From "Fabian Hueske (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-7245) Enhance the operators to support holding back watermarks
Date Wed, 26 Jul 2017 11:28:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7245?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16101562#comment-16101562 ]

Fabian Hueske commented on FLINK-7245:

Great, thanks for the response [~xccui]!

Ad 2) No, not really for monitoring. I envision the new operator as a skeleton that automatically
takes care of delaying the watermarks. To do that, the custom user code would need to report
the smallest timestamps that it will emit in the future. I called the method that reports these
timestamps a "hook", which is probably not the most appropriate term.
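A minimal sketch of the skeleton described in Ad 2, written in plain Java rather than against Flink's operator classes. The class name `WatermarkHoldingSkeleton` and the methods `reportFutureTimestamp()`/`releaseTimestamp()` (standing in for the "hook") are hypothetical, not existing Flink APIs. It assumes Flink's watermark semantics: a watermark W promises that no more records with a timestamp <= W will follow, so the forwarded watermark is held strictly below the smallest timestamp that user code still intends to emit.

```java
import java.util.PriorityQueue;

// Hypothetical skeleton (not a Flink API): user code reports the smallest timestamps
// it will emit in the future, and the skeleton never forwards a watermark that would
// make those future results late downstream.
class WatermarkHoldingSkeleton {
    // Timestamps of results that user code has announced but not yet emitted (min-heap).
    private final PriorityQueue<Long> pendingTimestamps = new PriorityQueue<>();
    // Largest watermark received from upstream so far.
    private long inputWatermark = Long.MIN_VALUE;

    // The "hook": user code announces that a result with this timestamp will follow.
    void reportFutureTimestamp(long timestamp) {
        pendingTimestamps.add(timestamp);
    }

    // User code signals that the announced result has now been emitted.
    void releaseTimestamp(long timestamp) {
        pendingTimestamps.remove(timestamp);
    }

    void processWatermark(long watermark) {
        inputWatermark = Math.max(inputWatermark, watermark);
    }

    // Watermark that is currently safe to forward to downstream operators.
    long outputWatermark() {
        if (pendingTimestamps.isEmpty()) {
            return inputWatermark;
        }
        // A watermark W asserts "no more records with timestamp <= W", so stay
        // strictly below the smallest pending timestamp.
        return Math.min(inputWatermark, pendingTimestamps.peek() - 1);
    }
}
```

For example, after a watermark of 100 arrives and user code reports a pending result stamped 50, the skeleton forwards 49; once that result is released, it forwards the full input watermark again.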

Ad 3) Hmm, I was thinking about this a bit more. What I said before about {{OperatorStateStore.getUnionListState()}}
would not work. We need to checkpoint the smallest future timestamp for each key. Since keys
can be assigned to different operators in case of rescaling or recovery, we need to ensure
that this information is kept together with the keys. However, operators have no access to
keys, only to key groups (key groups are the unit of key distribution in Flink). Hence,
we need to keep a PriorityQueue for each key group and checkpoint it together with that key group.
If a key group is moved to a different operator, its priority queue will be moved there as
well. I talked to [~aljoscha]: we can only do this at the lowest level of the operator abstraction,
which is the {{AbstractStreamOperator}}. Have a look at its {{snapshotState()}} method, which
iterates over all key groups to snapshot timer information. I have to admit that I'm not familiar
with the source code at this level.
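To illustrate the per-key-group bookkeeping from Ad 3, here is a self-contained sketch that keeps one min-heap of future timestamps per key group and snapshots and restores them key group by key group. All names are hypothetical; it does not use Flink's snapshot machinery, and the key-group assignment is simplified (Flink's {{KeyGroupRangeAssignment}} additionally murmur-hashes the key's {{hashCode()}} before taking the modulo of the max parallelism).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch (not Flink's internal classes): one min-heap of future
// timestamps per key group, so the heaps travel with their key groups on rescaling.
class KeyGroupedTimestampQueues {
    private final int maxParallelism;
    private final Map<Integer, PriorityQueue<Long>> queues = new HashMap<>();

    KeyGroupedTimestampQueues(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    // Simplified key-group assignment: hashCode modulo max parallelism.
    int keyGroupFor(Object key) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    void add(Object key, long timestamp) {
        queues.computeIfAbsent(keyGroupFor(key), kg -> new PriorityQueue<>()).add(timestamp);
    }

    // Smallest pending timestamp over all key groups held by this operator instance.
    long minTimestamp() {
        long min = Long.MAX_VALUE;
        for (PriorityQueue<Long> q : queues.values()) {
            if (!q.isEmpty()) {
                min = Math.min(min, q.peek());
            }
        }
        return min;
    }

    // Snapshot each key group separately, mirroring how timers are written per key group.
    Map<Integer, List<Long>> snapshot() {
        Map<Integer, List<Long>> snapshot = new HashMap<>();
        queues.forEach((kg, q) -> snapshot.put(kg, new ArrayList<>(q)));
        return snapshot;
    }

    // On restore or rescaling, take over only the key groups in [startKeyGroup, endKeyGroup].
    void restore(Map<Integer, List<Long>> snapshot, int startKeyGroup, int endKeyGroup) {
        snapshot.forEach((kg, timestamps) -> {
            if (kg >= startKeyGroup && kg <= endKeyGroup) {
                queues.computeIfAbsent(kg, k -> new PriorityQueue<>()).addAll(timestamps);
            }
        });
    }
}
```

After rescaling, an instance that is assigned key groups 0 to 1 restores only those queues, so the future timestamps move together with their keys.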

Ad 4) Each operator keeps track of all watermarks it receives from all input channels (i.e.,
each parallel instance of each input operator). For each input channel, it keeps the maximum (i.e.,
latest received) watermark and computes its own watermark (which is emitted via all outgoing
channels) as the minimum of those per-channel maximum watermarks. So, each operator has
only a single watermark (i.e., not a watermark per key). However, our custom code will have
to operate on multiple keys, and keys are strictly separated from each other, such that no key
is aware of the lowest timestamps of the others.
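The watermark combination described in Ad 4 can be sketched as follows. This is a simplified model, not Flink's actual input-processing code, and it ignores idle channels: each channel keeps the maximum watermark it has seen, and the operator's single watermark is the minimum over all channels, emitted only when it advances.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Simplified model of how one operator derives its single watermark
// from all of its input channels. Idle channels are not handled.
class InputWatermarkTracker {
    // Maximum (i.e., latest) watermark received on each input channel.
    private final long[] channelWatermarks;
    // The operator's own watermark, last emitted on all outgoing channels.
    private long currentWatermark = Long.MIN_VALUE;

    InputWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Returns the new operator watermark if it advanced, otherwise empty.
    OptionalLong onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        // The operator watermark is the minimum of the per-channel maxima.
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > currentWatermark) {
            currentWatermark = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```

Because there is only this one operator-wide value, any per-key "smallest future timestamp" has to be folded into it across all keys, which is why the per-key-group queues are needed.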

Ad 5) That's a good point. We can make that configurable as well. I thought that emitting a watermark
when a watermark is received would be a good default.
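One way to make the emission point from Ad 5 configurable, sketched under assumptions: the enum `WatermarkEmitPolicy`, its constants, and `ConfigurableWatermarkEmitter` are hypothetical (not Flink configuration options), and the alternative to the proposed default (also emitting when a held-back timestamp is released) is only an illustrative choice.

```java
import java.util.OptionalLong;
import java.util.PriorityQueue;

// Hypothetical policies: emit only when an input watermark arrives (proposed default),
// or additionally whenever a held-back timestamp is released.
enum WatermarkEmitPolicy { ON_INPUT_WATERMARK, ON_HOLD_RELEASE }

class ConfigurableWatermarkEmitter {
    private final WatermarkEmitPolicy policy;
    // Timestamps of results that are still pending (min-heap).
    private final PriorityQueue<Long> held = new PriorityQueue<>();
    private long inputWatermark = Long.MIN_VALUE;
    private long emittedWatermark = Long.MIN_VALUE;

    ConfigurableWatermarkEmitter(WatermarkEmitPolicy policy) {
        this.policy = policy;
    }

    void hold(long timestamp) {
        held.add(timestamp);
    }

    // An input watermark arrived: both policies may emit here.
    OptionalLong onWatermark(long watermark) {
        inputWatermark = Math.max(inputWatermark, watermark);
        return tryEmit();
    }

    // A pending result was emitted: only ON_HOLD_RELEASE emits at this point.
    OptionalLong onRelease(long timestamp) {
        held.remove(timestamp);
        return policy == WatermarkEmitPolicy.ON_HOLD_RELEASE ? tryEmit() : OptionalLong.empty();
    }

    // Emit the held-back watermark if it advanced since the last emission.
    private OptionalLong tryEmit() {
        long candidate = held.isEmpty() ? inputWatermark : Math.min(inputWatermark, held.peek() - 1);
        if (candidate > emittedWatermark) {
            emittedWatermark = candidate;
            return OptionalLong.of(candidate);
        }
        return OptionalLong.empty();
    }
}
```

With the default policy, a released hold only takes effect with the next input watermark; the alternative forwards progress sooner at the cost of emitting more watermarks.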

Cheers, Fabian

> Enhance the operators to support holding back watermarks
> --------------------------------------------------------
>                 Key: FLINK-7245
>                 URL: https://issues.apache.org/jira/browse/FLINK-7245
>             Project: Flink
>          Issue Type: New Feature
>          Components: DataStream API
>            Reporter: Xingcan Cui
>            Assignee: Xingcan Cui
> Currently the watermarks are applied and emitted by the {{AbstractStreamOperator}} instantly.

> {code:java}
> public void processWatermark(Watermark mark) throws Exception {
> 	if (timeServiceManager != null) {
> 		timeServiceManager.advanceWatermark(mark);
> 	}
> 	output.emitWatermark(mark);
> }
> {code}
> Some calculation results (with timestamp fields) triggered by these watermarks (e.g.,
> join or aggregate results) may be regarded as late by the downstream operators, since their
> timestamps must be less than or equal to those of the corresponding triggering watermarks.
> This issue aims to add another "working mode", which supports holding back watermarks,
> to the current operators. These watermarks should be blocked and stored by the operators until
> all the corresponding newly generated results are emitted.
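A toy model of the problem described in the issue, assuming only that a downstream operator treats a record whose timestamp is <= its current watermark as late. The scenario (an operator forwards watermark 10 while a buffered join row later produces a result stamped 8) is illustrative, and all names are hypothetical.

```java
// Toy model (not Flink code): a downstream operator treats every element whose
// timestamp is <= the last watermark it has received as late.
class LatenessDemo {
    // An event in a stream: either a watermark or a data element, each with a timestamp.
    static final class Event {
        final boolean isWatermark;
        final long timestamp;

        private Event(boolean isWatermark, long timestamp) {
            this.isWatermark = isWatermark;
            this.timestamp = timestamp;
        }

        static Event watermark(long timestamp) { return new Event(true, timestamp); }

        static Event element(long timestamp) { return new Event(false, timestamp); }
    }

    // Counts the elements a downstream operator would regard as late.
    static int countLate(Event... stream) {
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (Event e : stream) {
            if (e.isWatermark) {
                watermark = Math.max(watermark, e.timestamp);
            } else if (e.timestamp <= watermark) {
                late++;
            }
        }
        return late;
    }
}
```

Forwarding the watermark instantly, `countLate(watermark(10), element(8))` returns 1. Holding the watermark back at 7 until the result is out, `countLate(watermark(7), element(8), watermark(10))` returns 0.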

This message was sent by Atlassian JIRA
