Message-ID: <554C780C.1010107@informatik.hu-berlin.de>
Date: Fri, 08 May 2015 10:47:08 +0200
From: "Matthias J. Sax"
To: dev@flink.apache.org
Subject: Re: [DISCUSS] Behaviour of Streaming Sources

Did you consider the Storm way to handle this?

Storm offers a method "void next()" (nextTuple() in Storm's spout
interface) that uses a collector object to emit new tuples. Using this
interface, "next()" can loop internally as long as tuples are available
and return if there is (currently) no input.

From what I have seen, people tend to emit a single tuple and leave
next() immediately, because Storm calls next() in an infinite loop anyway.

-> You could force the UDF to return each time, by disallowing
consecutive calls to Collector.out(...).

If next() is called by the system and it returns, it is easy to check if
the out(...) method of the collector object was called at least once. If
no record was emitted, Storm "sleeps" for a while before calling next()
again, to avoid busy waiting.
The sleeping time is increased for consecutive "empty" next() calls and
reset the first time next() emits records again.

I like this interface, because it's very simple, and I would prefer it
over an interface with many methods.

-Matthias

On 05/08/2015 10:16 AM, Aljoscha Krettek wrote:
> Hi,
> in the process of reworking the Streaming Operator model I'm also
> reworking the sources in order to get rid of the loop in each source.
> Right now, the interface for sources (SourceFunction) has one method:
> run(). This is called when the source starts and can just output
> elements at any time using the Collector interface. This does not give
> the Task that runs the source a lot of control in suspending operation
> for performing checkpoints or some such thing.
>
> I thought about changing the interface to this:
>
> interface SourceFunction<T> {
>   boolean reachedEnd();
>   T next();
> }
>
> This is similar to the batch API and also to what Stephan proposes in
> his pull request. I think this will not work for streaming because
> sources might not have new elements to emit at the moment but might
> have something to emit in the future. This is problematic because
> streaming topologies are usually running indefinitely. In that case,
> the reachedEnd() and next() would have to be blocking (until a new
> element arrives). This again does not give the task the power to
> suspend operation at will.
>
> I propose a three function interface:
>
> interface SourceFunction<T> {
>   boolean reachedEnd();
>   boolean hasNext();
>   T next();
> }
>
> where the contract for the source is as follows:
>  - reachedEnd() == true => stop the source
>  - hasNext() == true => call next() to retrieve next element
>  - hasNext() == false => call again at some later point
>  - next() => retrieve next element, throw exception if no element available
>
> I thought about allowing next() to return NULL to signal that no
> element is available at the moment.
> This will not work because a
> source might want to return NULL as an element.
>
> What do you think? Any other ideas about solving this?
>
> Cheers,
> Aljoscha
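
PS: To make the Storm-style idea from the top of this mail concrete, here
is a rough sketch of how a task could drive such a source. All names
(PollingSource, CountingCollector, drive, nextBackoffMillis) are made up
for illustration and are neither Storm's nor Flink's actual API. The
doubling of the sleep time and the 1 ms / 8 ms bounds are also only
assumptions; the description above just says that the sleep time grows
for consecutive empty calls and is reset once records are emitted again.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Illustrative sketch only: hypothetical names, not Storm's or Flink's API.
final class PollingSourceSketch {

    /** Hypothetical collector that counts emissions so the caller can spot an "empty" call. */
    static final class CountingCollector<T> {
        private final List<T> out = new ArrayList<>();
        private int emittedInCall;

        void collect(T record) {
            out.add(record);
            emittedInCall++;
        }

        /** Returns how many records were emitted since the last reset, then resets the counter. */
        int resetCount() {
            int n = emittedInCall;
            emittedInCall = 0;
            return n;
        }

        List<T> records() {
            return out;
        }
    }

    /** Hypothetical Storm-style source: emit whatever is available right now, then return. */
    interface PollingSource<T> {
        void next(CountingCollector<T> collector);
    }

    /** Sleep time before the next call: grows on consecutive empty calls, resets after an emit. */
    static long nextBackoffMillis(long current, int emitted, long initial, long max) {
        if (emitted > 0) {
            return 0L;                       // records were emitted: call again immediately
        }
        if (current == 0L) {
            return initial;                  // first empty call
        }
        return Math.min(current * 2, max);   // assumed policy: double, up to a cap
    }

    /** Calls next() a fixed number of times and returns the sleep time chosen after each call. */
    static <T> List<Long> drive(PollingSource<T> source, CountingCollector<T> collector, int calls) {
        List<Long> sleeps = new ArrayList<>();
        long backoff = 0L;
        for (int i = 0; i < calls; i++) {
            // Between two next() calls the task is in control, e.g. to perform a checkpoint.
            source.next(collector);
            backoff = nextBackoffMillis(backoff, collector.resetCount(), 1L, 8L);
            sleeps.add(backoff);
            if (backoff > 0) {
                try {
                    Thread.sleep(backoff);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return sleeps;
    }

    public static void main(String[] args) {
        Queue<String> input = new ArrayDeque<>(Arrays.asList("a", "b"));
        CountingCollector<String> collector = new CountingCollector<>();
        // Emits at most one record per call, as most Storm users seem to do.
        PollingSource<String> source = c -> {
            String s = input.poll();
            if (s != null) {
                c.collect(s);
            }
        };
        List<Long> sleeps = drive(source, collector, 5);
        System.out.println(collector.records() + " " + sleeps); // prints "[a, b] [0, 0, 1, 2, 4]"
    }
}
```

The source never blocks and never loops forever, so the task regains
control after every call. Forcing a return after each record (by
rejecting a second collect() within one call) would be a small addition
to the collector; it is left out here.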