From: Xingcan Cui
Date: Thu, 9 Nov 2017 01:12:02 +0800
Subject: Re: Generate watermarks per key in a KeyedStream
To: Shailesh Jain
Cc: user@flink.apache.org
Hi Shailesh,

Actually, the watermarks are generated per partition, but during processing they are forcibly aligned to the minimum one: an operator's event-time clock only advances as far as the smallest watermark among its inputs. This is dictated by the semantics of watermarks and KeyedStream, i.e., the watermarks belong to the whole stream, and a stream is made up of different partitions (one per key).
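To make the "aligned to the minimum" point concrete, here is a toy model in plain Java. It is not Flink's API; the class and method names are hypothetical. It models an operator with several input channels. Each channel reports its own watermark, but the operator's event-time clock only advances to the minimum across all channels, so one slow partition holds back the whole operator.

```java
import java.util.Arrays;

// Toy model (hypothetical, not Flink code) of how a downstream operator
// combines the watermarks it receives from several input channels.
public class WatermarkAlignment {
    private final long[] channelWatermarks;
    private long currentWatermark = Long.MIN_VALUE;

    public WatermarkAlignment(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No channel has reported a watermark yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Record a watermark from one channel and return the operator's
    // event-time clock, i.e. the minimum watermark over all channels.
    public long onWatermark(int channel, long watermark) {
        // Watermarks are monotonic per channel, so ignore regressions.
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        // The operator's clock never moves backwards.
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }

    public static void main(String[] args) {
        WatermarkAlignment op = new WatermarkAlignment(3);
        op.onWatermark(0, 1_000L);
        op.onWatermark(1, 5_000L);
        long clock = op.onWatermark(2, 200L); // channel fed by a delayed device
        System.out.println("operator event time = " + clock); // prints 200
    }
}
```

Even though channel 1 has reached 5000, the operator's clock sits at 200 until the slowest channel catches up.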

If the physical devices effectively run on different clocks because of their delays, the event streams from them should be treated as separate streams.
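One way to picture "treating the streams separately" is to give each device its own event-time clock instead of one shared watermark. The plain-Java sketch below is a hypothetical model, not a Flink API. It assumes a bounded-out-of-orderness rule (watermark = largest timestamp seen minus a fixed bound) and compares a single shared watermark with per-device watermarks. Under the shared clock, a delayed device's events are late. Under the device's own clock, they are on time.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model comparing one shared watermark with per-device watermarks.
// Assumed rule: watermark = (max event timestamp seen) - maxOutOfOrderness.
public class PerKeyWatermarks {
    private final long maxOutOfOrderness;
    private long globalMaxTs = Long.MIN_VALUE;
    private final Map<String, Long> perKeyMaxTs = new HashMap<>();

    public PerKeyWatermarks(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    private long watermarkFor(long maxTs) {
        // Avoid underflow before any event has been seen.
        return maxTs == Long.MIN_VALUE ? Long.MIN_VALUE : maxTs - maxOutOfOrderness;
    }

    // True if the event is behind the shared (stream-wide) watermark.
    public boolean isLateGlobally(long ts) {
        return ts < watermarkFor(globalMaxTs);
    }

    // True if the event is behind its own device's watermark.
    public boolean isLatePerKey(String deviceId, long ts) {
        return ts < watermarkFor(perKeyMaxTs.getOrDefault(deviceId, Long.MIN_VALUE));
    }

    // Advance both the shared clock and the device's own clock.
    public void observe(String deviceId, long ts) {
        globalMaxTs = Math.max(globalMaxTs, ts);
        perKeyMaxTs.merge(deviceId, ts, Long::max);
    }

    public static void main(String[] args) {
        PerKeyWatermarks w = new PerKeyWatermarks(1_000L);
        w.observe("device-A", 10_000L); // device A delivers promptly
        long delayedTs = 4_000L;        // device B's event arrives after a delay
        System.out.println("late under shared watermark: " + w.isLateGlobally(delayedTs));
        System.out.println("late under own watermark: " + w.isLatePerKey("device-B", delayedTs));
    }
}
```

The first line prints `true` and the second `false`. The trade-off is that a device's own clock only advances when that device sends data, so a device that goes silent never moves its time forward.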

Hope that helps.

Best,
Xingcan

On Wed, Nov 8, 2017 at 11:48 PM, Shailesh Jain <shailesh.jain@stellapps.com> wrote:
> Hi,
>
> I'm working on implementing a use case in which different physical devices send events, and due to network/power issues, there can be a delay in receiving events at the Flink source. One of the operators within the Flink job is the Pattern operator, and certain patterns are time-sensitive, so I'm using the event-time characteristic. The problem comes when there are unpredictable delays in events from particular devices, which causes those events to be dropped (as I cannot really define a static bound to allow for lateness).
>
> Since I'm using a KeyedStream, keyed on the source device ID, is there a way to allow each CEP operator instance (one per key) to progress its time based on the event time in the corresponding stream partition? In other words, is there a way to generate watermarks per partition in a KeyedStream?
>
> Thanks,
> Shailesh
