flink-issues mailing list archives

From "Tzu-Li (Gordon) Tai (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-5017) Introduce WatermarkStatus stream element to allow for temporarily idle streaming sources
Date Mon, 07 Nov 2016 18:27:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15644949#comment-15644949 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-5017 at 11/7/16 6:27 PM:
---------------------------------------------------------------------

Hi [~aljoscha],

To ease discussion a bit, I've just pushed some of my current changes regarding the IDLE watermark
forwarding logic to a local branch: https://github.com/tzulitai/flink/tree/FLINK-5017. It's
WIP code that I haven't proofread yet, so it might be a bit sloppy, and it won't build yet because
the output / operator interface implementations aren't all updated.

{quote}
The only place where it is relevant is in the watermark advancement code in the input processors.
If all inputs are IDLE, then the input processor needs to bypass the chain and directly output
to all real outgoing connections, which in itself is quite tricky.
{quote}

Concerning this: the approach I'm taking right now is that operators that are IDLE are not
supposed to receive any records or watermarks at all until they are switched back to ACTIVE
(using WatermarkStatus elements). It's basically the responsibility of the sources and the
intermediate timestamp extractors to ensure this.
So, regarding "If all inputs are IDLE, then the input processor needs to bypass the chain
and directly output to all real outgoing connections", I'm not really sure this is necessary.
Once the whole input processor is IDLE, an IDLE status will be propagated down the chain and
eventually forwarded to a real outgoing connection (the output of the last operator in the chain,
correct?). No records or watermarks will be propagated down the chain while the input processor
remains IDLE. Once the input processor is ACTIVE again, it propagates an ACTIVE status down the
chain, and records and watermarks are allowed to flow again.

Do you think the above makes sense? The WIP code implements this approach in {{StreamInputProcessor}}
/ {{StreamTwoInputProcessor}} / {{AbstractStreamOperator}}.
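
To make the proposed contract concrete, here is a minimal, self-contained sketch of it. It is not the code from the WIP branch: {{StatusGatedOutput}}, its nested {{Status}} enum, and the string-based stand-in for the operator chain are all hypothetical names invented for illustration. Only the method name {{emitWatermarkStatus}} follows the issue description. Throwing on a record or watermark that arrives while IDLE is also an assumption, chosen here to surface a violation of the "sources must ensure this" contract.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the IDLE/ACTIVE contract; not Flink's actual classes.
final class StatusGatedOutput {
    enum Status { ACTIVE, IDLE }

    // Stand-in for the operator chain: everything forwarded is logged here.
    private final List<String> forwarded = new ArrayList<>();
    private Status status = Status.ACTIVE;

    // Forward a status element down the chain, but only on an actual change.
    void emitWatermarkStatus(Status newStatus) {
        if (newStatus != status) {
            status = newStatus;
            forwarded.add("STATUS:" + newStatus);
        }
    }

    // Watermarks may only flow while ACTIVE.
    void emitWatermark(long timestamp) {
        requireActive("watermark");
        forwarded.add("WM:" + timestamp);
    }

    // Records may only flow while ACTIVE.
    void collect(String record) {
        requireActive("record");
        forwarded.add("REC:" + record);
    }

    List<String> forwarded() {
        return forwarded;
    }

    // Assumption: upstream guarantees silence while IDLE, so a violation is a bug.
    private void requireActive(String kind) {
        if (status == Status.IDLE) {
            throw new IllegalStateException("received a " + kind + " while IDLE");
        }
    }
}
```

With this shape, nothing needs to bypass the chain: the IDLE and ACTIVE status elements travel through the same path as records and watermarks, and reach the real outgoing connection at the end of the chain.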


> Introduce WatermarkStatus stream element to allow for temporarily idle streaming sources
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-5017
>                 URL: https://issues.apache.org/jira/browse/FLINK-5017
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.2.0
>
>
> A {{WatermarkStatus}} element informs receiving operators whether or not they should continue to expect watermarks from the sending operator. There are two kinds of status: {{IDLE}} and {{ACTIVE}}. Watermark status elements are generated at the sources, and may be propagated through the operators of the topology using {{Output#emitWatermarkStatus(WatermarkStatus)}}.
> Sources and downstream operators should emit the corresponding status element whenever they change between the "watermark-idle" and "watermark-active" states.
> A source is considered "watermark-idle" if it will not emit records for an indefinite amount of time. This is the case, for example, for Flink's Kafka Consumer, where sources might initially have no assigned partitions to read from, or no records can be read from the assigned partitions. Once the source detects that it will resume emitting data, it is considered "watermark-active".
> Downstream operators with multiple inputs (e.g. head operators of a {{OneInputStreamTask}} or {{TwoInputStreamTask}}) should not wait for watermarks from an upstream operator that is "watermark-idle" when deciding whether or not to advance the operator's current watermark. When a downstream operator determines that all upstream operators are "watermark-idle" (i.e. when all input channels have received the watermark idle status element), the operator is considered to also be "watermark-idle", as it will temporarily be unable to advance its own watermark. For operators that read from only a single upstream operator, this happens as soon as that upstream operator becomes idle. Once an operator is considered "watermark-idle", it should itself forward its idle status to inform downstream operators. The operator is considered to be "watermark-active" again as soon as at least one of its upstream operators becomes "watermark-active" again (i.e. when at least one input channel receives the watermark active status element), and it should then also forward its active status to inform downstream operators.
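
The multi-input rules in the description above can be sketched as follows. This is an illustrative sketch only, not Flink's implementation: {{InputStatusTracker}}, its nested {{Status}} enum, and all method names are hypothetical. It assumes that every channel starts out ACTIVE, that the operator's watermark is the minimum over the ACTIVE channels, and that the watermark never moves backwards when a channel with an older watermark becomes ACTIVE again.

```java
import java.util.Arrays;

// Hypothetical sketch of per-channel status tracking; not Flink's actual classes.
final class InputStatusTracker {
    enum Status { ACTIVE, IDLE }

    private final boolean[] idle;      // idle[i] is true while channel i is watermark-idle
    private final long[] watermarks;   // last watermark seen on each channel
    private long operatorWatermark = Long.MIN_VALUE;

    InputStatusTracker(int numChannels) {
        idle = new boolean[numChannels];   // assumption: all channels start ACTIVE
        watermarks = new long[numChannels];
        Arrays.fill(watermarks, Long.MIN_VALUE);
    }

    // Returns the status the operator should forward downstream,
    // or null if the operator's own status did not change.
    Status onStatus(int channel, Status status) {
        boolean wasIdle = isIdle();
        idle[channel] = (status == Status.IDLE);
        advanceWatermark(); // an idle channel no longer holds the watermark back
        boolean nowIdle = isIdle();
        if (nowIdle == wasIdle) {
            return null;
        }
        return nowIdle ? Status.IDLE : Status.ACTIVE;
    }

    void onWatermark(int channel, long timestamp) {
        watermarks[channel] = Math.max(watermarks[channel], timestamp);
        advanceWatermark();
    }

    // The operator is idle only when every input channel is idle.
    boolean isIdle() {
        for (boolean channelIdle : idle) {
            if (!channelIdle) {
                return false;
            }
        }
        return true;
    }

    long currentWatermark() {
        return operatorWatermark;
    }

    // Advance to the minimum watermark across ACTIVE channels, never backwards.
    private void advanceWatermark() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < idle.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        if (anyActive && min > operatorWatermark) {
            operatorWatermark = min;
        }
    }
}
```

For example, with two channels at watermarks 10 and 5, marking the second channel IDLE lets the operator's watermark advance from 5 to 10 without the operator itself becoming idle. Only when the first channel also goes IDLE does {{onStatus}} return {{IDLE}} for forwarding.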



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)