From: Aljoscha Krettek
To: dev@flink.apache.org
Date: Fri, 8 May 2015 10:16:41 +0200
Subject: [DISCUSS] Behaviour of Streaming Sources

Hi,

in the process of reworking the Streaming Operator model I'm also reworking the sources in order to get rid of the loop in each source. Right now, the interface for sources (SourceFunction) has one method: run(). This is called when the source starts and can just output elements at any time using the Collector interface.
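For comparison, the current push-style contract can be sketched roughly as below. This is a minimal sketch, not Flink's actual code: `PushSource`, `Collector`, and `CountingSource` are hypothetical names chosen for illustration. It shows the property at issue: once `run()` is entered, the source owns the thread and loops internally, and the calling task gets control back only when `run()` returns.

```java
import java.util.ArrayList;
import java.util.List;

public class PushSourceSketch {

    // Hypothetical stand-in for the Collector interface a source emits into.
    interface Collector<T> {
        void collect(T record);
    }

    // Hypothetical push-style source: one run() method that loops internally.
    interface PushSource<T> {
        void run(Collector<T> out) throws Exception;
    }

    // Emits 0..limit-1; a real streaming source would typically loop forever.
    static class CountingSource implements PushSource<Long> {
        private final long limit;

        CountingSource(long limit) {
            this.limit = limit;
        }

        @Override
        public void run(Collector<Long> out) {
            // The loop lives inside the source, so the task cannot pause it
            // between two records (e.g. to take a checkpoint).
            for (long i = 0; i < limit; i++) {
                out.collect(i);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        List<Long> received = new ArrayList<>();
        new CountingSource(3).run(received::add);
        System.out.println(received); // prints [0, 1, 2]
    }
}
```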
This does not give the Task that runs the source much control over suspending operation, for example to perform checkpoints.

I thought about changing the interface to this:

    interface SourceFunction<T> {
        boolean reachedEnd();
        T next();
    }

This is similar to the batch API and also to what Stephan proposes in his pull request. I think this will not work for streaming, because a source might have no new element to emit at the moment but might have something to emit in the future. This is problematic because streaming topologies usually run indefinitely. In that case, reachedEnd() and next() would have to block until a new element arrives, which again does not give the task the power to suspend operation at will.

I propose a three-method interface:

    interface SourceFunction<T> {
        boolean reachedEnd();
        boolean hasNext();
        T next();
    }

where the contract for the source is as follows:

 - reachedEnd() == true => stop the source
 - hasNext() == true => call next() to retrieve the next element
 - hasNext() == false => call hasNext() again at some later point
 - next() => retrieve the next element; throw an exception if no element is available

I also thought about allowing next() to return null to signal that no element is available at the moment. This will not work because a source might want to emit null as an element.

What do you think? Any other ideas for solving this?

Cheers,
Aljoscha
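To make the proposed contract concrete, here is a minimal sketch of how a source and the task driving it might interact. Only the three method names come from the proposal; `BufferedSource`, its `offer()`/`close()` methods, the `drive()` loop, and the `onIdle` hook are hypothetical names introduced for illustration, and the in-memory buffer stands in for whatever external input a real source would read. The sketch shows control returning to the task on every iteration, and that a null element can pass through because "nothing available" is signalled by hasNext() rather than by a null return.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;

public class PollingSourceSketch {

    // The three-method contract from the proposal (generic parameter added).
    interface SourceFunction<T> {
        boolean reachedEnd(); // true => the task stops the source
        boolean hasNext();    // true => next() may be called; false => ask again later
        T next();             // next element; throws if none is available
    }

    // Hypothetical source backed by a buffer that is filled from outside.
    // LinkedList is used because, unlike ArrayDeque, it permits null elements.
    static class BufferedSource<T> implements SourceFunction<T> {
        private final Deque<T> buffer = new LinkedList<>();
        private boolean closed = false;

        void offer(T element) { buffer.addLast(element); }

        void close() { closed = true; }

        @Override
        public boolean reachedEnd() {
            // Finished only once no more input will arrive and the buffer is drained.
            return closed && buffer.isEmpty();
        }

        @Override
        public boolean hasNext() {
            return !buffer.isEmpty();
        }

        @Override
        public T next() {
            if (buffer.isEmpty()) {
                throw new NoSuchElementException("no element available right now");
            }
            return buffer.removeFirst();
        }
    }

    // Hypothetical task-side loop: no call blocks, so the task regains control
    // on every iteration.
    static <T> List<T> drive(SourceFunction<T> source, Runnable onIdle) {
        List<T> out = new ArrayList<>();
        while (!source.reachedEnd()) {
            // A real task could check for a pending checkpoint or cancellation here.
            if (source.hasNext()) {
                out.add(source.next());
            } else {
                onIdle.run(); // nothing to emit yet; a real task might wait briefly
            }
        }
        return out;
    }

    public static void main(String[] args) {
        BufferedSource<String> source = new BufferedSource<>();
        source.offer("a");
        source.offer(null); // null is a legal element under this contract
        // For the demo, the first idle call closes the source, which ends the loop.
        List<String> out = drive(source, source::close);
        System.out.println(out); // prints [a, null]
    }
}
```

Note the difference from java.util.Iterator, where hasNext() == false means the iteration is exhausted. In the proposed contract hasNext() == false only means "not now", which is why a separate reachedEnd() is needed to signal the end of the stream.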