From: "Matthias J. Sax"
Date: Fri, 17 Jul 2015 17:39:07 +0200
To: user@flink.apache.org
Subject: Re: Streaming window : count with timeout ?

You can implement a custom window policy (I guess this should be flexible enough for your case). See the documentation:
https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#policy-based-windowing

If you have further questions after reading, just go ahead :)

-Matthias

On 07/17/2015 05:30 PM, LINZ, Arnaud wrote:
> Hello,
>
> The data in my stream have a timestamp that may be slightly out of
> order, but I need to process the data in the proper order. To do this, I
> use a windowing function and sort the items in a flatMap.
>
> However, the source may sometimes send data in “bulk batches” and
> sometimes “on the fly”. If I choose a time window, it will suit the
> “on the fly” behavior well, but when processing bulks I may have too many
> elements to sort in the specified time interval.
>
> If I choose a “count.of” window, I will process batches efficiently, but
> I may need to wait forever in the “on the fly” behavior until the count
> is reached.
>
> What I need, then, is a “count window with timeout” or a “time window
> with max elements”: I would like to specify both a max count and a max
> time to fit the source behavior.
>
> Do you have any idea how I can do that?
>
> Best regards,
>
> Arnaud
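Matthias points to Flink's policy-based windowing, where a trigger policy decides when a window is emitted. The exact Flink interfaces are not shown in this thread and differ between Flink versions, so what follows is only a plain-Java sketch of the logic such a policy would need. It defines a hypothetical `CountOrTimeoutBuffer`, which is not a Flink class. The buffer releases a batch as soon as either a maximum element count or a processing-time timeout is reached. It also sorts each released batch, standing in for the sort Arnaud performs in a flatMap. The clock is injected as a `LongSupplier`, an assumption made for testability; in a real job it would be something like `System::currentTimeMillis`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper (NOT a Flink class): collects elements into a batch and releases
 * the batch, sorted by the given comparator, as soon as EITHER maxCount elements have
 * been collected OR timeoutMillis have passed since the batch's first element arrived.
 */
final class CountOrTimeoutBuffer<T> {

    private final int maxCount;
    private final long timeoutMillis;
    private final Comparator<? super T> order; // e.g. compares event timestamps
    private final LongSupplier clock;          // processing-time clock, injectable for tests
    private final List<T> buffer = new ArrayList<>();
    private long batchStart;                   // clock value when the current batch was opened

    CountOrTimeoutBuffer(int maxCount, long timeoutMillis,
                         Comparator<? super T> order, LongSupplier clock) {
        if (maxCount <= 0 || timeoutMillis <= 0) {
            throw new IllegalArgumentException("maxCount and timeoutMillis must be positive");
        }
        this.maxCount = maxCount;
        this.timeoutMillis = timeoutMillis;
        this.order = order;
        this.clock = clock;
    }

    /** Adds one element; returns the sorted batch if a limit was reached, else an empty list. */
    public synchronized List<T> add(T element) {
        long now = clock.getAsLong();
        if (buffer.isEmpty()) {
            batchStart = now; // the first element opens a new batch
        }
        buffer.add(element);
        boolean countReached = buffer.size() >= maxCount;
        boolean timedOut = now - batchStart >= timeoutMillis;
        return (countReached || timedOut) ? flush() : Collections.<T>emptyList();
    }

    /**
     * Meant to be called periodically (e.g. from a timer thread; hence synchronized) so that
     * a partially filled batch is released even when no further element arrives.
     */
    public synchronized List<T> poll() {
        if (!buffer.isEmpty() && clock.getAsLong() - batchStart >= timeoutMillis) {
            return flush();
        }
        return Collections.emptyList();
    }

    // Copies and sorts the current batch, then starts a new, empty one.
    private List<T> flush() {
        List<T> batch = new ArrayList<>(buffer);
        batch.sort(order);
        buffer.clear();
        return batch;
    }
}
```

In the bulk case, `add` hits the count limit quickly and emits full batches. In the on-the-fly case, `poll` releases a partial batch once the timeout expires, so nothing waits forever for the count to be reached. Sorting happens only within a batch. An element that arrives after its batch has already been released stays out of order, so the limits presumably need to be chosen with the expected out-of-order delay in mind.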