flink-issues mailing list archives

From "Tzu-Li (Gordon) Tai (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-4576) Low Watermark Service in JobManager for Streaming Sources
Date Fri, 04 Nov 2016 09:01:20 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15635719#comment-15635719
] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4576 at 11/4/16 8:59 AM:
---------------------------------------------------------------------

Yes, that's true. In general, once all input channels of an operator have received the "watermark
idle" message, the operator itself should propagate a "watermark idle" message downstream. An
operator in the middle of a task's operator chain should simply pass the "watermark idle" message
on downstream. A head operator of a chain that has multiple input channels should pass on a
"watermark idle" message only when all of its input channels have received the message; as long
as at least one input channel is not idle, the "watermark idle" message is not passed further
downstream.
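
The rule above for a head operator with several input channels can be sketched as follows. This is only an illustration: {{IdleChannelTracker}}, {{markIdle}} and {{markActive}} are hypothetical names and not existing Flink classes or methods, and how a channel becomes active again is an assumption that is not specified above.

```java
/**
 * Hypothetical helper (not part of Flink): remembers which input channels
 * of a head operator have delivered a "watermark idle" message.
 */
class IdleChannelTracker {
    private final boolean[] idle;
    private int idleCount;

    IdleChannelTracker(int numChannels) {
        if (numChannels <= 0) {
            throw new IllegalArgumentException("need at least one input channel");
        }
        this.idle = new boolean[numChannels];
    }

    /**
     * Records a "watermark idle" message on the given channel.
     * Returns true only when this message made ALL channels idle, i.e. the
     * point at which "watermark idle" should be forwarded downstream.
     */
    boolean markIdle(int channel) {
        if (idle[channel]) {
            return false; // already idle, nothing changes
        }
        idle[channel] = true;
        idleCount++;
        return idleCount == idle.length;
    }

    /** Assumption: a channel counts as active again once it delivers data. */
    void markActive(int channel) {
        if (idle[channel]) {
            idle[channel] = false;
            idleCount--;
        }
    }

    boolean allIdle() {
        return idleCount == idle.length;
    }
}
```

With two channels, {{markIdle(0)}} returns false and only the following {{markIdle(1)}} returns true, so the idle message goes downstream exactly once.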

On that point, I could use some help understanding how the code handles elements flowing
through the operator chain of a single task. Specifically, if a previous operator in the chain
emits a watermark, is that watermark passed directly to the {{AbstractStreamOperator#processWatermark()}}
method of the operator immediately after it? I'm struggling a bit to understand
how {{ChainedOutput}} works in {{OperatorChain}}.
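
To make the question concrete, here is a simplified model of the reading described above, in which a chained output hands a watermark to the next operator through a plain method call. All types below ({{SketchOutput}}, {{SketchOperator}}, {{SketchChainedOutput}}, {{RecordingOutput}}, {{ForwardingOperator}}) are hypothetical stand-ins, not Flink's classes, and whether {{OperatorChain}} really behaves this way should be checked against its source.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT Flink's classes.
interface SketchOutput {
    void emitWatermark(long timestamp);
}

abstract class SketchOperator {
    protected final SketchOutput output;

    SketchOperator(SketchOutput output) {
        this.output = output;
    }

    /** Default behaviour in this model: forward the watermark unchanged. */
    void processWatermark(long timestamp) {
        output.emitWatermark(timestamp);
    }
}

/** Output that passes a watermark to the next chained operator by a direct call. */
final class SketchChainedOutput implements SketchOutput {
    private final SketchOperator next;

    SketchChainedOutput(SketchOperator next) {
        this.next = next;
    }

    @Override
    public void emitWatermark(long timestamp) {
        // No serialization and no network hop: just a method call.
        next.processWatermark(timestamp);
    }
}

/** Terminal output that records every watermark leaving the chain. */
final class RecordingOutput implements SketchOutput {
    final List<Long> seen = new ArrayList<>();

    @Override
    public void emitWatermark(long timestamp) {
        seen.add(timestamp);
    }
}

final class ForwardingOperator extends SketchOperator {
    ForwardingOperator(SketchOutput output) {
        super(output);
    }
}
```

In this model a chain is built back to front: the last operator gets the {{RecordingOutput}}, and each earlier operator gets a {{SketchChainedOutput}} wrapping its successor. A "watermark idle" message could travel the same path if the assumption holds.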


> Low Watermark Service in JobManager for Streaming Sources
> ---------------------------------------------------------
>
>                 Key: FLINK-4576
>                 URL: https://issues.apache.org/jira/browse/FLINK-4576
>             Project: Flink
>          Issue Type: New Feature
>          Components: JobManager, Streaming, TaskManager
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.2.0
>
>
> As per discussion in FLINK-4341 by [~aljoscha] and [~StephanEwen], we need a low watermark
service in the JobManager to support transparent resharding / partition discovery for our
Kafka and Kinesis consumers (and any future streaming connectors in general for which the
external system may elastically scale up and down independently of the parallelism of sources
in Flink). The main idea is to let source subtasks that don't emit their own watermarks (because
they currently don't have data partitions to consume) emit the low watermark across all subtasks,
instead of simply emitting a Long.MAX_VALUE watermark and forbidding them to be assigned partitions
in the future.
> The proposed implementation, at a high level: a {{LowWatermarkCoordinator}} will be
added to execution graphs, periodically sending a {{RetrieveLowWatermark}} message to the
source vertices only. The tasks reply to the JobManager through the actor gateway (or a new interface after
FLINK-4456 gets merged) with a {{ReplyLowWatermark}} message. When the coordinator has collected
all low watermarks for a particular source vertex and determined the aggregated low watermark
for this round (counting only values that are larger than the aggregated low watermark of
the last round), it sends a {{NotifyNewLowWatermark}} message to the source vertex's tasks.
> The messages will only be relevant to tasks that implement an internal {{LowWatermarkCooperatingTask}}
interface. For now, only {{SourceStreamTask}} should implement {{LowWatermarkCooperatingTask}}.
> Source functions should implement a public {{LowWatermarkListener}} interface if they
wish to get notified of the aggregated low watermarks across subtasks. Connectors like the
Kinesis consumer can choose to emit this watermark if the subtask currently does not have
any shards, so that downstream operators may still properly advance time windows (implementation
for this is tracked as a separate issue).
> Overall, the service will include:
> - New messages between JobManager <-> TaskManager:
>   - {{RetrieveLowWatermark(jobId, jobVertexId, taskId, timestamp)}}
>   - {{ReplyLowWatermark(jobId, jobVertexId, taskId, currentLowWatermark)}}
>   - {{NotifyNewLowWatermark(jobId, jobVertexId, taskId, newLowWatermark, timestamp)}}
> - New internal task interface {{LowWatermarkCooperatingTask}} in flink-runtime
> - New public interface {{LowWatermarkListener}} in flink-streaming-java
> - Might also need to extend {{SourceFunction.SourceContext}} to support retrieving the current low watermark of sources.
> Any feedback for this is appreciated!
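
The per-round aggregation in the proposal above could look roughly like the sketch below. {{LowWatermarkRound}} and {{onReply}} are hypothetical names, not part of the proposal. Taking the minimum of the collected values, and keeping the previous aggregate when no reply exceeds it, are assumptions based on one reading of "only values that are larger than the aggregated low watermark of the last round".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical sketch of one aggregation round for a single source vertex.
 * Not an existing Flink class.
 */
final class LowWatermarkRound {
    private final int expectedReplies;
    private final long lastAggregated;
    private final Map<Integer, Long> replies = new HashMap<>();

    LowWatermarkRound(int expectedReplies, long lastAggregated) {
        this.expectedReplies = expectedReplies;
        this.lastAggregated = lastAggregated;
    }

    /**
     * Records one ReplyLowWatermark value. Returns the aggregated low
     * watermark once every subtask has replied, otherwise an empty result.
     */
    OptionalLong onReply(int subtaskIndex, long currentLowWatermark) {
        replies.put(subtaskIndex, currentLowWatermark);
        if (replies.size() < expectedReplies) {
            return OptionalLong.empty(); // round still incomplete
        }
        long min = Long.MAX_VALUE;
        boolean found = false;
        for (long value : replies.values()) {
            // Assumption: values not above the last aggregate are skipped.
            if (value > lastAggregated) {
                min = Math.min(min, value);
                found = true;
            }
        }
        // Assumption: with no larger value, the aggregate does not move.
        return OptionalLong.of(found ? min : lastAggregated);
    }
}
```

A non-empty result marks the point at which the coordinator would send {{NotifyNewLowWatermark}} to the tasks of that source vertex.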



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
