From: "Tzu-Li (Gordon) Tai (JIRA)"
To: issues@flink.apache.org
Date: Fri, 30 Sep 2016 12:13:20 +0000 (UTC)
Subject: [jira] [Comment Edited] (FLINK-4576) Low Watermark Service in JobManager for Streaming Sources

[ https://issues.apache.org/jira/browse/FLINK-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15535824#comment-15535824 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-4576 at 9/30/16 12:12 PM:
----------------------------------------------------------------------
Hi [~aljoscha],

I'm struggling a bit with the implementation choice for recording and accessing the last emitted watermark of each source operator.

When a {{SourceStreamTask}} receives a request from the JobManager to return its last emitted watermark, it accesses the last emitted watermark of the head operator (a {{StreamSource}} operator) by calling {{headOperator.getLastEmittedWatermark()}}. I'm currently going back and forth between two possible ways to achieve this:

1. Record the last emitted watermark within the {{SourceFunction.SourceContext}}s and expose it through a getter method. {{headOperator.getLastEmittedWatermark()}} simply calls this getter on the source context.
2. Extend the {{o.a.f.streaming.api.operators.Output}} interface with a new {{getLastEmittedWatermark}} method, and let {{headOperator.getLastEmittedWatermark()}} call that instead.

I'm not sure which is the better approach.

Another thing I'd like to confirm: the low watermark service should only be concerned with watermarks emitted by {{StreamSource}} operators, and not with the watermarks emitted at the end of the whole operator chain of a {{SourceStreamTask}}, correct? I'm asking because I'm a bit unsure about how user-supplied watermark assigners should work together with the JM low watermark service.

I've completed the communication between JM / TM / tasks, so I'm down to this last bit and can open the PR afterwards. Any feedback will be helpful!

> Low Watermark Service in JobManager for Streaming Sources
> ---------------------------------------------------------
>
> Key: FLINK-4576
> URL: https://issues.apache.org/jira/browse/FLINK-4576
> Project: Flink
> Issue Type: New Feature
> Components: JobManager, Streaming, TaskManager
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.2.0
>
>
> As per discussion in FLINK-4341 by [~aljoscha] and [~StephanEwen], we need a low watermark service in the JobManager to support transparent resharding / partition discovery for our Kafka and Kinesis consumers (and, in general, any future streaming connectors for which the external system may elastically scale up and down independently of the parallelism of sources in Flink).
> The main idea is to let source subtasks that don't emit their own watermarks (because they currently have no data partitions to consume) emit the low watermark across all subtasks, instead of simply emitting a {{Long.MAX_VALUE}} watermark and forbidding them from being assigned partitions in the future.
>
> The proposed implementation, at a high level: a {{LowWatermarkCoordinator}} will be added to execution graphs, periodically triggering only the source vertices with a {{RetrieveLowWatermark}} message. The tasks reply to the JobManager through the actor gateway (or a new interface once FLINK-4456 is merged) with a {{ReplyLowWatermark}} message. When the coordinator has collected all low watermarks for a particular source vertex and determined the aggregated low watermark for this round (taking into account only values larger than the aggregated low watermark of the previous round), it sends a {{NotifyNewLowWatermark}} message to the source vertex's tasks.
>
> The messages will only be relevant to tasks that implement an internal {{LowWatermarkCooperatingTask}} interface. For now, only {{SourceStreamTask}} should implement {{LowWatermarkCooperatingTask}}.
>
> Source functions should implement a public {{LowWatermarkListener}} interface if they wish to be notified of the aggregated low watermark across subtasks. Connectors like the Kinesis consumer can choose to emit this watermark if the subtask currently does not have any shards, so that downstream operators may still properly advance time windows (the implementation for this is tracked as a separate issue).
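The per-round aggregation in the proposal above can be sketched in Java roughly as follows. This is a minimal, hypothetical sketch and not Flink code: the class name {{LowWatermarkRound}}, its methods, and the use of {{Long.MIN_VALUE}} as the report of a subtask that has no partitions are all assumptions. It reads "taking into account only values larger than the aggregated low watermark of the previous round" literally: replies at or below the previous aggregate are skipped, the new aggregate is the minimum of the remaining replies, and nothing is announced when no reply qualifies.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical per-vertex aggregation state for a LowWatermarkCoordinator.
 * Not Flink API: names and structure are illustrative assumptions.
 */
final class LowWatermarkRound {
    private final int parallelism;
    // Latest ReplyLowWatermark value per subtask index for the current round.
    private final Map<Integer, Long> replies = new HashMap<>();
    // Aggregated low watermark announced in the previous round.
    private long lastAggregated = Long.MIN_VALUE;

    LowWatermarkRound(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Records a reply; returns true once all subtasks of the vertex have replied. */
    boolean onReply(int subtaskIndex, long currentLowWatermark) {
        replies.put(subtaskIndex, currentLowWatermark);
        return replies.size() == parallelism;
    }

    /**
     * Closes the round. Only replies strictly larger than the previous aggregate
     * are considered; the new aggregate is their minimum. Returns empty if no reply
     * qualifies, i.e. no NotifyNewLowWatermark would be sent for this round.
     */
    OptionalLong completeRound() {
        long min = Long.MAX_VALUE;
        boolean anyQualified = false;
        for (long watermark : replies.values()) {
            if (watermark > lastAggregated) {
                min = Math.min(min, watermark);
                anyQualified = true;
            }
        }
        replies.clear();
        if (!anyQualified) {
            return OptionalLong.empty();
        }
        lastAggregated = min;
        return OptionalLong.of(min);
    }
}

public class LowWatermarkRoundDemo {
    public static void main(String[] args) {
        LowWatermarkRound round = new LowWatermarkRound(3);

        // Round 1: subtask 2 has no partitions yet and (by assumption) reports Long.MIN_VALUE.
        round.onReply(0, 1_000L);
        round.onReply(1, 1_500L);
        boolean complete = round.onReply(2, Long.MIN_VALUE);
        System.out.println(complete + " " + round.completeRound()); // true OptionalLong[1000]

        // Round 2: subtask 2 re-emitted the announced 1000, so its reply is skipped.
        round.onReply(0, 2_000L);
        round.onReply(1, 1_800L);
        round.onReply(2, 1_000L);
        System.out.println(round.completeRound()); // OptionalLong[1800]
    }
}
```

One consequence of the literal reading: a subtask whose watermark has not advanced past the previous aggregate is skipped as well, so a real implementation would presumably need another way to tell idle subtasks apart from slow ones.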
> Overall, the service will include:
> - New messages between JobManager <-> TaskManager:
>   - {{RetrieveLowWatermark(jobId, jobVertexId, taskId, timestamp)}}
>   - {{ReplyLowWatermark(jobId, jobVertexId, taskId, currentLowWatermark)}}
>   - {{NotifyNewLowWatermark(jobId, jobVertexId, taskId, newLowWatermark, timestamp)}}
> - New internal task interface {{LowWatermarkCooperatingTask}} in flink-runtime
> - New public interface {{LowWatermarkListener}} in flink-streaming-java
> - Might also need to extend {{SourceFunction.SourceContext}} to support retrieving the current low watermark of sources.
>
> Any feedback for this is appreciated!
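On the source side, option 1 from the comment (recording the last emitted watermark in the source context and exposing it through a getter) combined with the proposed {{LowWatermarkListener}} might look roughly like this. Everything here is a simplified, hypothetical stand-in rather than Flink API: {{SimpleSourceContext}} replaces {{SourceFunction.SourceContext}} and takes plain {{long}} watermarks instead of {{Watermark}} objects, and {{notifyNewLowWatermark(long)}}, {{RecordingSourceContext}}, and {{PartitionAwareSource}} are names invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for SourceFunction.SourceContext (not Flink's interface). */
interface SimpleSourceContext<T> {
    void collect(T element);

    // Flink's real method takes a Watermark object; a plain long keeps the sketch self-contained.
    void emitWatermark(long watermark);
}

/** Hypothetical shape of the proposed LowWatermarkListener; the method name is an assumption. */
interface LowWatermarkListener {
    void notifyNewLowWatermark(long lowWatermark);
}

/** Option 1: the context records the last emitted watermark and exposes it via a getter. */
final class RecordingSourceContext<T> implements SimpleSourceContext<T> {
    private final List<T> records = new ArrayList<>();
    private final List<Long> emittedWatermarks = new ArrayList<>();
    private long lastEmittedWatermark = Long.MIN_VALUE; // nothing emitted yet

    @Override
    public void collect(T element) {
        records.add(element);
    }

    @Override
    public void emitWatermark(long watermark) {
        emittedWatermarks.add(watermark);
        lastEmittedWatermark = watermark;
    }

    /** What a StreamSource's getLastEmittedWatermark() could delegate to. */
    long getLastEmittedWatermark() {
        return lastEmittedWatermark;
    }

    List<Long> getEmittedWatermarks() {
        return emittedWatermarks;
    }
}

/** Hypothetical source subtask: forwards the aggregated low watermark only while it has no partitions. */
final class PartitionAwareSource implements LowWatermarkListener {
    private final SimpleSourceContext<String> ctx;
    private int assignedPartitions;

    PartitionAwareSource(SimpleSourceContext<String> ctx, int assignedPartitions) {
        this.ctx = ctx;
        this.assignedPartitions = assignedPartitions;
    }

    void setAssignedPartitions(int assignedPartitions) {
        this.assignedPartitions = assignedPartitions;
    }

    @Override
    public void notifyNewLowWatermark(long lowWatermark) {
        // Subtasks with partitions emit their own watermarks; an idle subtask
        // forwards the aggregate so downstream time windows can still advance.
        if (assignedPartitions == 0) {
            ctx.emitWatermark(lowWatermark);
        }
    }
}

public class LowWatermarkSourceDemo {
    public static void main(String[] args) {
        RecordingSourceContext<String> ctx = new RecordingSourceContext<>();
        PartitionAwareSource source = new PartitionAwareSource(ctx, 0);

        source.notifyNewLowWatermark(1_000L); // idle: forwarded
        System.out.println(ctx.getLastEmittedWatermark()); // 1000

        source.setAssignedPartitions(1);
        source.notifyNewLowWatermark(2_000L); // has a partition now: ignored
        System.out.println(ctx.getLastEmittedWatermark()); // 1000
    }
}
```

With this layout, the value a subtask would put into its {{ReplyLowWatermark}} is simply the context's getter. An idle subtask that forwarded the aggregate then reports exactly that aggregate, which is presumably why the coordinator only counts values larger than the previous round's result.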