Subject: Re: Window on stream with timestamps ascending by key
From: Aljoscha Krettek
Date: Sat, 19 Mar 2016 12:48:01 +0100
To: user@flink.apache.org

Hi,

What you would essentially need are watermarks that are tracked per key. Right now this is not possible in Flink. The watermarks, which Flink uses to keep track of event-time progress, are global across all keys.

Maybe you could implement something that fits your requirements in a custom operator, i.e.
by using DataStream.transform() and writing a StreamOperator (more specifically, a OneInputStreamOperator).

Let us know if you need more information.

Cheers,
Aljoscha

> On 17 Mar 2016, at 11:21, Charles-Antoine Mathieu wrote:
>
> I wonder how to work with a stream whose event timestamps are ascending by key.
>
> I can have a huge time skew between different keys. For example, if I (re)connect an event producer, it will send all of its buffered results, possibly from the last few days.
>
> Is it possible to trigger the window computation per key?
>
> Example with a window of 5 seconds and the window function being the sum of the timestamps:
>
> KEY1 1000
> KEY1 1001
> KEY1 1002
> KEY2 1
> KEY2 2
> KEY2 3
> KEY2 4
> KEY2 5
> KEY2 window => 15
> KEY1 1003
> KEY2 6
> KEY2 7
> KEY2 8
> KEY2 9
> KEY2 10
> KEY2 window => 40
> KEY1 1004
> KEY2 11
> KEY2 12
> KEY2 13
> KEY2 14
> KEY2 15
> KEY2 window => 65
> KEY1 1005
> KEY1 window => 5015
> ...
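The per-key event-time tracking discussed in this thread can be sketched in plain Java. This is deliberately not written against the Flink API.

It rests on a few assumptions:

- Timestamps are strictly ascending within each key, so each key's highest timestamp so far can serve as that key's own watermark.
- Windows are tumbling and aligned at multiples of the window size.
- The window function sums the timestamps, which is what the figures in the quoted example compute.

The class name PerKeyWindowSketch and its methods are hypothetical. In a real job, this bookkeeping would presumably sit inside a custom OneInputStreamOperator attached with DataStream.transform(), with the HashMaps replaced by Flink-managed state. That wiring is not shown here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, framework-free sketch (NOT Flink API) of event-time tumbling
 * windows with a separate watermark per key. Assumes timestamps are strictly
 * ascending within each key, so a key's highest timestamp is its watermark.
 */
public class PerKeyWindowSketch {
    private final long size; // window length, in the same unit as the timestamps

    // Per key: open window start -> running sum of the timestamps in that window.
    private final Map<String, TreeMap<Long, Long>> openWindows = new HashMap<>();
    // Per key: highest timestamp seen so far, used as that key's own watermark.
    private final Map<String, Long> watermarks = new HashMap<>();

    public PerKeyWindowSketch(long size) {
        this.size = size;
    }

    /** Adds one element and returns the results of the windows it completes for its key. */
    public List<String> process(String key, long timestamp) {
        long start = Math.floorDiv(timestamp, size) * size; // windows aligned at multiples of size
        openWindows.computeIfAbsent(key, k -> new TreeMap<>()).merge(start, timestamp, Long::sum);
        long watermark = watermarks.merge(key, timestamp, Math::max);

        List<String> results = new ArrayList<>();
        TreeMap<Long, Long> windows = openWindows.get(key);
        // Fire every window of THIS key whose last timestamp (end - 1) is <= the key's watermark.
        // Windows of other keys are untouched, however far ahead or behind they are.
        while (!windows.isEmpty() && windows.firstKey() + size - 1 <= watermark) {
            Map.Entry<Long, Long> w = windows.pollFirstEntry();
            results.add(key + " [" + w.getKey() + ", " + (w.getKey() + size) + ") => " + w.getValue());
        }
        return results;
    }

    /** Feeds the interleaved events from the quoted question and collects every firing. */
    public static List<String> runExample() {
        String[] events = {
            "KEY1 1000", "KEY1 1001", "KEY1 1002",
            "KEY2 1", "KEY2 2", "KEY2 3", "KEY2 4", "KEY2 5",
            "KEY1 1003",
            "KEY2 6", "KEY2 7", "KEY2 8", "KEY2 9", "KEY2 10",
            "KEY1 1004",
            "KEY2 11", "KEY2 12", "KEY2 13", "KEY2 14", "KEY2 15",
            "KEY1 1005"
        };
        PerKeyWindowSketch op = new PerKeyWindowSketch(5);
        List<String> fired = new ArrayList<>();
        for (String event : events) {
            String[] parts = event.split(" ");
            fired.addAll(op.process(parts[0], Long.parseLong(parts[1])));
        }
        return fired;
    }

    public static void main(String[] args) {
        runExample().forEach(System.out::println);
    }
}
```

Fed the interleaved sequence from the question, the windows fire independently of each other:

- KEY2's windows [0, 5), [5, 10) and [10, 15) fire as soon as KEY2 reaches timestamps 4, 9 and 14.
- KEY1's window [1000, 1005) fires at 1004.

Because these windows start at multiples of 5, the sums (10, 35, 60 and 5010) differ from the hand-written figures in the question, whose boundaries are shifted by one.

The sketch has two limitations:

- It never releases state for a key that stops sending data.
- An element that breaks the ascending-per-key assumption would cause an extra, partial firing of an already-closed window.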