From: Fabian Hueske
Date: Fri, 28 Apr 2017 09:56:00 +0200
Subject: Re: question about rowtime processfunction - are watermarks needed?
To: "dev@flink.apache.org"

Hi Radu,

yes, that can happen in a parallel setup, and it depends on the "speed" of
the parallel threads.
An operator only advances its own event-time clock to the minimum of the
last watermarks received from each of its input channels. If one input
channel is "slow", the operator's event time lags behind, and "late" events
from the other threads are still processed correctly because the operator's
event time has not been advanced yet.
So event time is not deterministic when it comes to which records are
dropped.

The watermark documentation might be helpful as well [1].

Cheers, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.html#watermarks-in-parallel-streams

2017-04-27 22:09 GMT+02:00 Radu Tudoran:

> Re-hi,
>
> I debugged the test for event rowtime a bit.
>
> I tested testBoundNonPartitionedEventTimeWindowWithRange from the
> SQLITCase class.
>
> I would expect that once a watermark is triggered, 1) onTimer is called
> to process the events that have arrived so far, and 2) events arriving
> afterwards that are behind the watermark are dropped. However, it seems
> that almost the entire input can arrive in processElement before onTimer
> is triggered.
>
> Moreover, if you modify the input to add an out-of-order event (see the
> dataset below, where after watermark 14000 I added an event with
> timestamp 1000), I would expect this event to be dropped. However, in
> different runs it may not be dropped: it can happen that onTimer was
> never triggered when this event arrives, so the event is registered.
> Is this correct? Am I missing something?
>
>
> @Test
> def testBoundNonPartitionedEventTimeWindowWithRangeUnOrder(): Unit = {
>   // Left((timestamp, row)) emits an element; Right(ts) emits a watermark.
>   val data = Seq(
>     Left((1500L, (1L, 15, "Hello"))),
>     Left((1600L, (1L, 16, "Hello"))),
>     Left((1000L, (1L, 1, "Hello"))),
>     Left((2000L, (2L, 2, "Hello"))),
>     Right(1000L),
>     Left((2000L, (2L, 2, "Hello"))),
>     Left((2000L, (2L, 3, "Hello"))),
>     Left((3000L, (3L, 3, "Hello"))),
>     Right(2000L),
>     Left((4000L, (4L, 4, "Hello"))),
>     Right(3000L),
>     Left((5000L, (5L, 5, "Hello"))),
>     Right(5000L),
>     Left((6000L, (6L, 6, "Hello"))),
>     Left((6500L, (6L, 65, "Hello"))),
>     Right(7000L),
>     Left((9000L, (6L, 9, "Hello"))),
>     Left((9500L, (6L, 18, "Hello"))),
>     Left((9000L, (6L, 9, "Hello"))),
>     Right(10000L),
>     Left((10000L, (7L, 7, "Hello World"))),
>     Left((11000L, (7L, 17, "Hello World"))),
>     Left((11000L, (7L, 77, "Hello World"))),
>     Right(12000L),
>     Left((14000L, (7L, 18, "Hello World"))),
>     Right(14000L),
>     Left((15000L, (8L, 8, "Hello World"))),
>     // out-of-order event that should be dropped
>     Left((1000L, (8L, 8, "Too late - Hello World"))),
>     Right(17000L),
>     Left((20000L, (20L, 20, "Hello World"))),
>     Right(19000L))
>   // ... rest of the test not included in the message
> }
>
>
> -----Original Message-----
> From: Fabian Hueske [mailto:fhueske@gmail.com]
> Sent: Thursday, April 27, 2017 3:17 PM
> To: dev@flink.apache.org
> Subject: Re: question about rowtime processfunction - are watermarks needed?
>
> Hi Radu,
>
> event-time processing requires watermarks. Operators use watermarks to
> compute their current event time.
> The ProcessFunctions for OVER RANGE windows use the TimerService to group
> elements by time.
> In the case of event time, the timers are triggered by the operator's
> event time, which is derived from the received watermarks.
> In the case of processing time, the timers are triggered based on the
> operator's wall-clock time.
>
> So by using event-time timers, we implicitly rely on the watermarks,
> because the timers are triggered based on the received watermarks.
>
> Best, Fabian
>
>
> 2017-04-27 10:51 GMT+02:00 Radu Tudoran:
>
> > Hi,
> >
> > I am looking at the implementation of RowTimeBoundedRangeOver (in the
> > context of Stream SQL). I see that progress is driven by the timestamps
> > of the row events: when an event arrives, we register it to be processed
> > at its timestamp
> > (ctx.timerService.registerEventTimeTimer(triggeringTs)).
> >
> > In onTimer we remove (retract) data that has expired. However, we do not
> > consider watermarks, an allowed lateness for the events, or anything
> > like that, which makes me ask:
> > Don't we need to work with watermarks when we deal with event time, and
> > keep the events within the allowed delay / until the next watermark? Am I
> > missing something? Or do we simply not consider allowedLateness in this
> > version?
> >
> > Thanks
> >
> > Best regards,
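To make the parallel case in Fabian's reply concrete, here is a minimal plain-Scala sketch of how an operator with several input channels could derive its event-time clock. `OperatorClock`, `onWatermark`, and `isLate` are hypothetical names, not Flink classes. The sketch assumes the operator's watermark is the minimum of the last watermark seen on each channel, and that an element counts as late once its timestamp is at or below that watermark.

```scala
// Hypothetical model of an operator's event-time clock (not Flink's classes).
final class OperatorClock(numChannels: Int) {
  require(numChannels > 0, "an operator needs at least one input channel")

  // Last watermark received per input channel; Long.MinValue means "none yet".
  private val channelWatermarks = Array.fill(numChannels)(Long.MinValue)

  // The operator's event time is the minimum over all input channels.
  def currentWatermark: Long = channelWatermarks.min

  // Records a watermark from one channel (ignoring regressions) and
  // returns the operator's possibly unchanged event time.
  def onWatermark(channel: Int, watermark: Long): Long = {
    channelWatermarks(channel) = math.max(channelWatermarks(channel), watermark)
    currentWatermark
  }

  // Lateness is judged against the operator's clock, not the sender's.
  def isLate(timestamp: Long): Boolean = timestamp <= currentWatermark
}
```

With two channels, a watermark of 14000 on channel 0 alone leaves the operator's clock unchanged, so an element with timestamp 1000 is not yet late; once channel 1 also reports 14000, the same element would be late. Whether the "Too late" row in the test is dropped therefore depends on which arrives first, which matches the run-to-run differences described above.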
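The timer mechanism discussed in this thread can be sketched the same way. The code below is a simplified, hypothetical model loosely following Radu's description of RowTimeBoundedRangeOver: each row is buffered under its timestamp, an event-time timer is registered for that timestamp, and timers fire once the watermark reaches them. `EventTimeTimers` and `BufferingOverFunction` are illustrative names (the method name `registerEventTimeTimer` only mirrors Flink's `TimerService`), and the `dropLate` flag is an assumption added to show what an explicit late-data check would change. No aggregation or retraction is modelled.

```scala
import scala.collection.mutable

// Hypothetical model of an event-time timer service (not Flink's implementation).
final class EventTimeTimers {
  private var watermark: Long = Long.MinValue // no watermark received yet
  private val timers = mutable.SortedSet.empty[Long]

  def currentWatermark: Long = watermark

  // Mirrors the name of TimerService.registerEventTimeTimer; duplicates collapse.
  def registerEventTimeTimer(timestamp: Long): Unit = timers += timestamp

  // Advances event time and returns the due timers in ascending timestamp order.
  def advanceWatermark(newWatermark: Long): List[Long] = {
    if (newWatermark > watermark) watermark = newWatermark // ignore regressions
    val due = timers.takeWhile(_ <= watermark).toList
    timers --= due
    due
  }
}

// Hypothetical function that buffers rows per timestamp and emits them when
// the corresponding timer fires. dropLate adds an explicit late-data check.
final class BufferingOverFunction(dropLate: Boolean) {
  private val timers = new EventTimeTimers
  private val buffer = mutable.Map.empty[Long, List[String]]
  val emitted = mutable.ListBuffer.empty[(Long, String)]
  val dropped = mutable.ListBuffer.empty[(Long, String)]

  def processElement(timestamp: Long, row: String): Unit =
    if (dropLate && timestamp <= timers.currentWatermark) {
      dropped += ((timestamp, row)) // behind the watermark: discard
    } else {
      buffer(timestamp) = buffer.getOrElse(timestamp, Nil) :+ row
      timers.registerEventTimeTimer(timestamp)
    }

  def processWatermark(watermark: Long): Unit =
    for (ts <- timers.advanceWatermark(watermark); row <- buffer.remove(ts).getOrElse(Nil))
      emitted += ((ts, row))
}
```

With `dropLate = false`, a row with timestamp 1000 that arrives after watermark 14000 still registers a timer, which fires at the next watermark, so the row is emitted after newer rows instead of being discarded. With `dropLate = true`, the same row is dropped because it is behind the operator's current watermark.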