From: Chengzhi Zhao
Date: Tue, 3 Apr 2018 13:39:38 -0400
Subject: Re: Watermark Question on Failed Process
To: Timo Walther
Cc: user@flink.apache.org
Thanks Timo for your response and the references.

I will try BoundedOutOfOrdernessTimestampExtractor, and if it doesn't work as expected, I will handle the late data in a separate pipeline.
Also, is there a way to retrieve the last watermark before or after a failure? Then maybe I could persist the watermark to external storage and resume from it in a separate pipeline?

Best,
Chengzhi

On Tue, Apr 3, 2018 at 7:58 AM, Timo Walther <twalthr@apache.org> wrote:
Hi Chengzhi,

If you emit a watermark even though there is still data with a lower timestamp, you generate "late data" that either needs to be processed in a separate branch of your pipeline (see sideOutputLateData() [1]) or should force your existing operators to update their previously emitted results. The latter means keeping state, or the contents of your windows, around longer (see allowedLateness() [1]). In general, I think a processing-time watermark strategy might not be suitable for reprocessing. Either parameterize your watermark generator so that you can pass information in through job parameters, or use another strategy such as BoundedOutOfOrdernessTimestampExtractor [2] together with sinks that allow idempotent updates.
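To illustrate the two options above, here is a minimal plain-Scala model (no Flink dependency) of how a late element is treated for a single event-time window, assuming Flink's documented window semantics: the window fires once the watermark reaches its last timestamp (end - 1), its state is kept for `allowedLateness` more milliseconds so that late elements can trigger an updated result, and after that late elements are dropped or, if sideOutputLateData() is configured, emitted to a side output. `LateDataModel`, `Window`, `Verdict`, and `classify` are hypothetical names for illustration, not Flink API.

```scala
// Plain-Scala model of late-element handling for a single event-time window.
// All names here are hypothetical; this is not the Flink API.
object LateDataModel {
  // A window covering [start, end); its last timestamp is end - 1.
  final case class Window(start: Long, end: Long) {
    def maxTimestamp: Long = end - 1
  }

  sealed trait Verdict
  // The watermark has not yet reached the window's last timestamp.
  case object OnTime extends Verdict
  // The window already fired but is still kept for allowedLateness,
  // so this element triggers an updated (late) result.
  case object LateUpdate extends Verdict
  // The window's state is gone: the element is dropped, or emitted to
  // the side output if sideOutputLateData() was configured.
  case object TooLate extends Verdict

  def classify(w: Window, watermark: Long, allowedLateness: Long): Verdict = {
    // Time after which the window's state is discarded (overflow ignored here).
    val cleanupTime = w.maxTimestamp + allowedLateness
    if (cleanupTime <= watermark) TooLate
    else if (w.maxTimestamp <= watermark) LateUpdate
    else OnTime
  }
}
```

For a window [0, 60000) with no allowed lateness, a watermark of 59999 already makes new elements for it `TooLate`; with 10000 ms of allowed lateness the same element is a `LateUpdate` instead. A watermark that runs ahead of the data, as a processing-time one can after downtime, pushes replayed records into these last two cases.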

I hope this helps.

Regards,
Timo

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html#windows
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_timestamp_extractors.html

On 02.04.18 at 23:51, Chengzhi Zhao wrote:
Hello, Flink community,

I am using periodic watermarks and extract the event time from each record in files stored in S3. I am using the `TimeLagWatermarkGenerator` mentioned in the Flink documentation.

Currently, each new watermark is generated from the processing time minus a fixed amount:

override def getCurrentWatermark: Watermark = {
    new Watermark(System.currentTimeMillis() - maxTimeLag)
}

This works fine as long as the process is running. However, when a failure happens (for example, bad data or an out-of-memory error), I need to stop the process, and it takes time to bring it back. Say maxTimeLag = 3 hours and it takes me 12 hours to notice the failure and fix it.
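To make this scenario concrete, here is a minimal plain-Scala model (no Flink dependency) of the two strategies discussed in this thread: the processing-time lag generator above, with the wall clock passed in explicitly as `nowMillis`, and an event-time generator in the spirit of BoundedOutOfOrdernessTimestampExtractor (watermark = highest event timestamp seen minus a fixed bound). The object and class names are hypothetical, and the model assumes the event-time generator's state starts fresh after a restart rather than being restored from a checkpoint.

```scala
// Plain-Scala model of two watermark strategies across an outage.
// Names are hypothetical; this is not Flink's implementation.
object WatermarkOutageModel {
  val Hour: Long = 60L * 60L * 1000L

  // Processing-time strategy: depends only on the wall clock, not on the data.
  final class TimeLagGenerator(maxTimeLag: Long) {
    def currentWatermark(nowMillis: Long): Long = nowMillis - maxTimeLag
  }

  // Event-time strategy: advances only as records with higher timestamps arrive.
  final class BoundedOutOfOrderness(bound: Long) {
    // Start low enough that subtracting the bound cannot underflow.
    private var maxSeen: Long = Long.MinValue + bound
    def onEvent(eventTime: Long): Unit = { maxSeen = math.max(maxSeen, eventTime) }
    def currentWatermark: Long = maxSeen - bound
  }

  // A record is late if its timestamp is at or below the current watermark.
  def isLate(eventTime: Long, watermark: Long): Boolean = eventTime <= watermark
}
```

Taking the failure time as 0, with a 3-hour lag and a restart 12 hours later, the lag generator's first watermark is 9 hours, so every unprocessed record stamped between the failure and 9 hours after it is late. The event-time generator starts low and advances only as records are read, so replaying the same records in order produces no late ones.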

My question is: since I am using processing time as part of the watermark, when Flink resumes after a failure, might some records be ignored because they fall behind the watermark? And what's the best practice to catch up and continue without losing data? Thanks!

Best,
Chengzhi


