Mailing-List: contact user-help@flink.apache.org; run by ezmlm
Delivered-To: mailing list user@flink.apache.org
MIME-Version: 1.0
From: Jayant Ameta
Date: Mon, 15 Jan 2018 19:21:17 +0530
Message-ID:
Subject: Re: Trigger not firing when using BoundedOutOfOrdernessTimestampExtractor
To: Fabian Hueske
Cc: Gary Yao, user
Content-Type: multipart/alternative; boundary="001a11406f1ac4895d0562d0e9b6"

--001a11406f1ac4895d0562d0e9b6
Content-Type: text/plain; charset="UTF-8"

Hi Fabian,

I want to extract timestamps from my event. However, the events stream can
be sparse at times (e.g. 2 days without any events).
What's the best strategy to create watermarks if I want real-time processing
of the events which enter the stream?

Jayant Ameta

On Thu, Jan 11, 2018 at 4:53 PM, Fabian Hueske wrote:

> Another thing to point out is that watermarks are usually data-driven,
> i.e., they depend on the timestamps of the events and not on the clock of
> the machine.
> Otherwise, you might observe a lot of late data, i.e., events with
> timestamps smaller than the last watermark.
>
> If you assign timestamps and watermarks based on the clock of the machine,
> you might also use ingestion time instead of event time.
>
> 2018-01-11 11:49 GMT+01:00 Jayant Ameta:
>
>> Thanks Gary,
>> I was only trying with a fixed set of events, so the Watermark was not
>> advancing, like you said.
>>
>>
>> Jayant Ameta
>>
>> On Thu, Jan 11, 2018 at 3:36 PM, Gary Yao wrote:
>>
>>> Hi Jayant,
>>>
>>> The difference is that the Watermarks from
>>> BoundedOutOfOrdernessTimestampExtractor are based on the greatest
>>> timestamp of all previous events. That is, if you do not receive new
>>> events, the Watermark will not advance. In contrast, your custom
>>> implementation of AssignerWithPeriodicWatermarks always advances the
>>> Watermark based on the wall clock.
>>>
>>> Maybe this will already help you to debug your application. If not, it
>>> would be great to see a minimal working example.
>>>
>>> Best,
>>> Gary
>>>
>>> On Wed, Jan 10, 2018 at 4:46 PM, Jayant Ameta wrote:
>>>
>>>> Hi,
>>>> When using a BoundedOutOfOrdernessTimestampExtractor, the trigger is
>>>> not firing. However, the trigger fires when using custom timestamp
>>>> extractor with similar watermark.
>>>>
>>>> Sample code below:
>>>> 1. Assigner as anonymous class which works fine
>>>>
>>>> AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner = new AssignerWithPeriodicWatermarks<Tuple2<Rule, T>>() {
>>>>
>>>>     @Override
>>>>     public long extractTimestamp(Tuple2<Rule, T> element, long previousElementTimestamp) {
>>>>         return System.currentTimeMillis();
>>>>     }
>>>>
>>>>     @Override
>>>>     public final Watermark getCurrentWatermark() {
>>>>         // this guarantees that the watermark never goes backwards.
>>>>         return new Watermark(System.currentTimeMillis() - 100);
>>>>     }
>>>> };
>>>>
>>>>
>>>> 2. BoundedOutOfOrdernessTimestampExtractor assigner which doesn't work
>>>>
>>>> AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner = new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Rule, T>>(Time.milliseconds(100)) {
>>>>
>>>>     @Override
>>>>     public long extractTimestamp(Tuple2<Rule, T> element) {
>>>>         return System.currentTimeMillis();
>>>>     }
>>>> };
>>>>
>>>>
>>>> Do you see any difference in the approaches?
>>>>
>>>> - Jayant
>>>>
>>>
>>>
>>
>

--001a11406f1ac4895d0562d0e9b6
Content-Type: text/html; charset="UTF-8"
Content-Transfer-Encoding: quoted-printable

Hi Fabian,
I want to extract timestamps from my event. However, the events stream can be sparse at times (e.g. 2 days without any events).
What's the best strategy to create watermarks if I want real-time processing of the events which enter the stream?

Jayant Ameta

On Thu, Jan 11, 2018 at 4:53 PM, Fabian Hueske <fhueske@gmail.com> wrote:
Another thing to point out is that watermarks are usually data-driven, i.e., they depend on the timestamps of the events and not on the clock of the machine.
Otherwise, you might observe a lot of late data, i.e., events with timestamps smaller than the last watermark.

If you assign timestamps and watermarks based on the clock of the machine, you might also use ingestion time instead of event time.
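
The late-data effect described above can be illustrated with a small, Flink-free sketch. Everything in it is hypothetical and for illustration only: the class `LateDataSketch`, its methods, the sample timestamps, and the simplifying assumption that the watermark is refreshed right before each event arrives (Flink emits periodic watermarks on an interval instead). It counts how many events have a timestamp smaller than the current watermark, once for a clock-driven watermark and once for a data-driven one.

```java
public class LateDataSketch {

    /** An event is late when its timestamp is smaller than the last watermark. */
    static boolean isLate(long eventTimestamp, long lastWatermark) {
        return eventTimestamp < lastWatermark;
    }

    /** Clock-driven watermark: wall-clock time at arrival minus a fixed delay. */
    static int lateWithClockWatermark(long[] eventTs, long[] arrivalTs, long delayMs) {
        int late = 0;
        for (int i = 0; i < eventTs.length; i++) {
            long watermark = arrivalTs[i] - delayMs;
            if (isLate(eventTs[i], watermark)) {
                late++;
            }
        }
        return late;
    }

    /** Data-driven watermark: greatest timestamp seen so far minus a fixed delay. */
    static int lateWithDataWatermark(long[] eventTs, long delayMs) {
        int late = 0;
        // Start low enough that subtracting the delay cannot underflow.
        long maxSeen = Long.MIN_VALUE + delayMs;
        for (long ts : eventTs) {
            long watermark = maxSeen - delayMs;
            if (isLate(ts, watermark)) {
                late++;
            }
            maxSeen = Math.max(maxSeen, ts);
        }
        return late;
    }

    public static void main(String[] args) {
        // Assumed sample data: events created at 1s, 2s, 3s reach the job 2s later.
        long[] eventTs = {1_000L, 2_000L, 3_000L};
        long[] arrivalTs = {3_000L, 4_000L, 5_000L};

        System.out.println(lateWithClockWatermark(eventTs, arrivalTs, 100L)); // 3
        System.out.println(lateWithDataWatermark(eventTs, 100L));             // 0
    }
}
```

With these assumed numbers, every event is late under the clock-driven watermark because the events lag the machine clock by more than the 100 ms allowance, while none is late under the data-driven watermark.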

2018-01-11 11:49 GMT+01:00 Jayant Ameta <wittyameta@gmail.com>:
Thanks Gary,
I was only trying with a fixed set of events, so the Watermark was not advancing, like you said.


Jayant Ameta

On Thu, Jan 11, 2018 at 3:36 PM, Gary Yao <gary@data-artisans.com> wrote:
Hi Jayant,

The difference is that the Watermarks from
BoundedOutOfOrdernessTimestampExtractor are based on the greatest timestamp of
all previous events. That is, if you do not receive new events, the Watermark
will not advance. In contrast, your custom implementation of
AssignerWithPeriodicWatermarks always advances the Watermark based on the wall
clock.
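
To make that difference concrete, here is a minimal sketch that does not depend on Flink. The names `WatermarkSketch`, `BoundedOutOfOrderness` and `WallClock` are hypothetical stand-ins that only mimic the two behaviours described above; they are not Flink's classes, and the simulated clock values are assumptions chosen for illustration.

```java
import java.util.function.LongSupplier;

public class WatermarkSketch {

    /** Data-driven: watermark = greatest timestamp seen so far minus a fixed delay. */
    static final class BoundedOutOfOrderness {
        private final long maxOutOfOrdernessMs;
        private long maxTimestampSeen;

        BoundedOutOfOrderness(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
            // Start low enough that subtracting the delay cannot underflow.
            this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs;
        }

        void onEvent(long timestamp) {
            maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
        }

        long currentWatermark() {
            return maxTimestampSeen - maxOutOfOrdernessMs;
        }
    }

    /** Clock-driven: watermark = current wall-clock time minus a fixed delay. */
    static final class WallClock {
        private final LongSupplier clock;
        private final long delayMs;

        WallClock(LongSupplier clock, long delayMs) {
            this.clock = clock;
            this.delayMs = delayMs;
        }

        long currentWatermark() {
            return clock.getAsLong() - delayMs;
        }
    }

    public static void main(String[] args) {
        long[] clock = {1_000L}; // simulated wall clock in milliseconds
        BoundedOutOfOrderness dataDriven = new BoundedOutOfOrderness(100L);
        WallClock clockDriven = new WallClock(() -> clock[0], 100L);

        dataDriven.onEvent(clock[0]); // the last event arrives at t = 1000
        clock[0] += 5_000L;           // five seconds pass without any event

        System.out.println(dataDriven.currentWatermark());  // 900: stuck at the last event
        System.out.println(clockDriven.currentWatermark()); // 5900: keeps advancing
    }
}
```

In this sketch a window ending at, say, t = 2000 would only be passed by the clock-driven watermark, which matches the observation that the trigger fires with the custom assigner but not with a fixed set of events and the data-driven one.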

Maybe this will already help you to debug your application. If not, it would be
great to see a minimal working example.

Best,
Gary

On Wed, Jan 10, 2018 at 4:46 PM, Jayant Ameta <wittyameta@gmail.com> wrote:
Hi,
When using a BoundedOutOfOrdernessTimestampExtractor, the trigger is not firing. However, the trigger fires when using custom timestamp extractor with similar watermark.
Sample code below:
1. Assigner as anonymous class which works fine
AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner = new AssignerWithPeriodicWatermarks<Tuple2<Rule, T>>() {

    @Override
    public long extractTimestamp(Tuple2<Rule, T> element, long previousElementTimestamp) {
        return System.currentTimeMillis();
    }

    @Override
    public final Watermark getCurrentWatermark() {
        // this guarantees that the watermark never goes backwards.
        return new Watermark(System.currentTimeMillis() - 100);
    }
};

2. BoundedOutOfOrdernessTimestampExtractor assigner which doesn't work
AssignerWithPeriodicWatermarks<Tuple2<Rule, T>> assigner = new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Rule, T>>(Time.milliseconds(100)) {
    @Override
    public long extractTimestamp(Tuple2<Rule, T> element) {
        return System.currentTimeMillis();
    }
};

Do you see any difference in the approaches?

- Jayant




--001a11406f1ac4895d0562d0e9b6--