From: Juho Autio
Date: Fri, 3 May 2019 11:37:32 +0300
Subject: Re: Data loss when restoring from savepoint
To: Konstantin Knauf
Cc: Stefan Richter, user

Konstantin, thanks for providing the new code. Here are the latest results for jobs run with extended DEBUG logging.

20190427 (killed & restored), missing_rows.count(): 3470
20190428 (no kill / restore), missing_rows.count(): 0

I have shared the logs from the 27th (after restore) in private with Konstantin.

On Fri, Apr 26, 2019 at 5:05 PM Konstantin Knauf wrote:

Hi Juho,

sorry for not being more responsive the last two weeks, I was on vacation for a good part of it. The fact that this also happens with timers on RocksDB is again confusing. The code that we mainly had a look at so far is not used by the RocksDB configuration, so the inconsistencies that we saw in the logs don't apply to the RocksDB configuration.

Anyway, I agree to further track down the issue for the heap timers first, and then to move on to RocksDB. I have added more fine-grained logging to the branch [1]. The two additional classes, which you need to set the logging level to DEBUG for, are

org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl
org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy

Please run through the usual procedure of doing a savepoint and provide the logs during recovery.

Thank you for your perseverance,

Konstantin

[1] https://github.com/knaufk/flink/tree/logging-timers
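For reference, the full set of DEBUG loggers requested over the course of this thread can be enabled together. A log4j.properties sketch (the logger names are quoted verbatim from the messages in this thread; the grouping into one fragment and the comments are ours):

    # timer counts per window, logged on snapshot and restore (logging-timers branch)
    log4j.logger.org.apache.flink.streaming.api.operators.InternalTimerServiceImpl=DEBUG
    # state hand-off to the operators during recovery
    log4j.logger.org.apache.flink.runtime.state.TaskStateManagerImpl=DEBUG
    # finer-grained restore logging added later in the thread
    log4j.logger.org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl=DEBUG
    log4j.logger.org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy=DEBUG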
On Thu, Apr 18, 2019 at 4:06 PM Oytun Tez wrote:

Thanks for the update, Juho, and please do keep updating :) I've been watching the thread silently; I am sure your findings help many others who watch the thread.

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oytun@motaword.com – www.motaword.com

On Thu, Apr 18, 2019 at 8:26 AM Juho Autio wrote:

In the meanwhile, some additional results, continued with the ROCKSDB timer service:

20190416 (no cancellation), missing_rows.count(): 0
20190417 (cancel with savepoint & restore), missing_rows.count(): 54

On Tue, Apr 16, 2019 at 2:35 PM Juho Autio wrote:

Ouch, we have a data loss case now also with the ROCKSDB timer service factory. This time the job had failed for some reason & restored a checkpoint by itself (I mean I didn't cancel with savepoint this time; the previous restore from savepoint was at 14-04-2019 06:21:45 UTC).

In this case the number of lost ids was quite high:

20190415, missing_rows.count(): 706605

I don't know if the ROCKSDB timer service is a factor towards higher instability, but indeed I'd like to go back to testing with InternalTimerServiceImpl as well. Will switch back to that when the updated branch is available. Also I'm not sure if the cause of data loss is similar now with the ROCKSDB timer service factory (lost timers or maybe something else), because we didn't have corresponding DEBUG logging for this implementation.

On Mon, Apr 15, 2019 at 11:27 AM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

this is good news indeed! I have had a look at the _metadata files and logs on Friday, and it looks like a) the timer state is contained in the savepoint files and b) the timer state is also initially read by the TaskStateManagerImpl, but then it is somehow lost before it reaches the InternalTimerServiceImpl. I will provide an updated version of my branch with more logging output to find the reason for this today or tomorrow. It would be great if you could test this again then.

Best,

Konstantin

On Mon, Apr 15, 2019 at 9:49 AM Juho Autio wrote:

Hi,

Great news: there's no data loss (for the 3 days so far that were run) with state.backend.rocksdb.timer-service.factory: ROCKSDB.

Each day the job was once cancelled with savepoint & restored.

20190412, missing_rows.count(): 0
20190413, missing_rows.count(): 0
20190414, missing_rows.count(): 0

Btw, now we don't get the DEBUG logs of org.apache.flink.streaming.api.operators.InternalTimerServiceImpl any more, so I didn't know how to check from the logs how many timers are restored. But based on the results I'm assuming that all were successfully restored.

We'll keep testing this a bit more, but it seems really promising indeed. I thought of at least letting it run for some days without cancellations, and on the other hand cancelling many times within the same day, etc.

Can I provide some additional debug logs or such to help find the bug when 'heap' is used for timers? Did you already analyze the _metadata files that I sent?

On Thu, Apr 11, 2019 at 4:21 PM Juho Autio wrote:

Shared the _metadata files also, in private.

The job is now running with state.backend.rocksdb.timer-service.factory: ROCKSDB. I started it from empty state because I wasn't sure whether this change would be migrated automatically(?). I guess a clean setup like this is a good idea anyway. The first day that is fully processed with this conf will be tomorrow (Friday), and results can be compared on the next day.. I'll report back on that on Monday. I verified from the Flink UI that the property is found in Configuration, but I still feel a bit unsure about whether it's actually being used. I wonder if there's some INFO level logging that could be checked to confirm that?

Thanks.
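As a flink-conf.yaml sketch, the configuration under test looks roughly like this (the timer-service key is the one quoted in this thread; the two surrounding keys are assumptions based on the job description given further down, i.e. RocksDB state backend with incremental checkpointing):

    state.backend: rocksdb
    state.backend.incremental: true
    # the setting under test; the default in Flink 1.6 is the heap-based timer service
    state.backend.rocksdb.timer-service.factory: ROCKSDB

On the INFO-logging question: Flink logs each key/value pair it reads from flink-conf.yaml at INFO level during startup ("Loading configuration property: ..."), which is one place to confirm the setting was picked up.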
On Thu, Apr 11, 2019 at 4:01 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

thank you. I will have a look at your logs later today or tomorrow. Could you also provide the metadata file of the savepoints in question? It is located in the parent directory of that savepoint and should follow this naming pattern: "savepoints_.*_savepoint_.*__metadata".

Best,

Konstantin

On Thu, Apr 11, 2019 at 2:39 PM Stefan Richter <s.richter@ververica.com> wrote:

No, it also matters for savepoints. I think the doc here is misleading; it is currently synchronous for all cases of RocksDB keyed state and heap timers.

Best,
Stefan

On 11. Apr 2019, at 14:30, Juho Autio wrote:

Thanks Till. Anyway, that's irrelevant in case of a savepoint, right?

On Thu, Apr 11, 2019 at 2:54 PM Till Rohrmann <trohrmann@apache.org> wrote:

Hi Juho,

yes, it means that the snapshotting of the timer state does not happen asynchronously but synchronously within the Task executor thread. During this operation, your operator won't make any progress, potentially causing backpressure for upstream operators.

If you want to use fully asynchronous snapshots while also using timer state, you should use the RocksDB backed timers.

Cheers,
Till

On Thu, Apr 11, 2019 at 10:32 AM Juho Autio wrote:

Ok, I'm testing state.backend.rocksdb.timer-service.factory: ROCKSDB in the meanwhile.

Btw, what does this actually mean (from https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html):

> The combination RocksDB state backend / with incremental checkpoint / with heap-based timers currently does NOT support asynchronous snapshots for the timers state. Other state like keyed state is still snapshotted asynchronously. Please note that this is not a regression from previous versions and will be resolved with FLINK-10026.

Is it just that snapshots are not asynchronous, so they cause some pauses? Does "not supported" here mean just some performance impact, or also correctness?

Our job at hand is using the RocksDB state backend and incremental checkpointing. However, at least the restores that we've been testing here have been from a *savepoint*, not an incremental checkpoint.

On Wed, Apr 10, 2019 at 4:46 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

one more thing we could try in a separate experiment is to change the timer state backend to RocksDB as well by setting

state.backend.rocksdb.timer-service.factory: ROCKSDB

in the flink-conf.yaml and see if this also leads to the loss of records. That would narrow it down quite a bit.

Best,

Konstantin

On Wed, Apr 10, 2019 at 1:02 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

sorry for the late reply. Please continue to use the custom Flink build and add additional logging for TaskStateManagerImpl by adding the following line to your log4j configuration:

log4j.logger.org.apache.flink.runtime.state.TaskStateManagerImpl=DEBUG

Afterwards, do a couple of savepoint & restore cycles until you see a number of restores < 80 as before, and share the logs with me (at least for TaskStateManagerImpl & InternalTimerServiceImpl).

Best,

Konstantin
On Thu, Apr 4, 2019 at 9:03 AM Juho Autio <juho.autio@rovio.com> wrote:

Hi Konstantin,

Thanks for the follow-up.

> There are only 76 lines for restore in Job 3 instead of 80. It would be very useful to know if these lines were lost by the log aggregation or really did not exist.

I fetched the actual taskmanager.log files to verify (we store the original files on s3), then did a grep for "InternalTimerServiceImpl - Restored".

This is for "job 1. (start - end) first restore with debug logging":
Around 2019-03-26 09:08:43,352 - 78 hits

This is for "job 3. (start-middle) 3rd restore with debug logging (following day)":
Around 2019-03-27 07:39:06,414 - 76 hits

So yeah, we can rely on our log delivery to Kibana.

Note that as a new piece of information I found that the same job also did an automatic restore from a checkpoint around 2019-03-30 20:36, and there were 79 hits instead of 80. So it doesn't seem to be a problem only in the case of savepoints; it can happen with a checkpoint restore as well.

> Were there any missing records in the output for the day of the Job 1 -> Job 2 transition (26th of March)?

20190326: missing 2592
20190327: missing 4270

This even matches with the fact that on the 26th 2 timers were missed in restore, but on the 27th it was 4.

What's next? :)
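The restore-count check described above is easy to script; a sketch, assuming the taskmanager.log files have been fetched into the current directory and the log format is the one shown elsewhere in this thread:

    # one "Restored:" line is expected per parallel subtask (here: parallelism 80)
    grep -h 'InternalTimerServiceImpl' taskmanager*.log | grep -c 'Restored:'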
On Thu, Apr 4, 2019 at 12:32 AM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

one thing that makes the log output a little bit hard to analyze is the fact that the "Snapshot" lines include savepoints as well as checkpoints. To identify the savepoints, I looked at the last 80 lines per job, which seems plausible given the timestamps of the lines.

So, let's compare the number of timers before and after restore:

Job 1 -> Job 2

23.091.002 event time timers for both. All timers for the same window. So this looks good.

Job 2 -> Job 3

18.565.234 timers during snapshotting. All timers for the same window.
17.636.774 timers during restore. All timers for the same window.

There are only 76 lines for restore in Job 3 instead of 80. It would be very useful to know if these lines were lost by the log aggregation or really did not exist.

Were there any missing records in the output for the day of the Job 1 -> Job 2 transition (26th of March)?

Best,

Konstantin

On Fri, Mar 29, 2019 at 2:21 PM Juho Autio <juho.autio@rovio.com> wrote:

Thanks,

I created a zip with these files:

job 1. (start - end) first restore with debug logging
job 2. (start-middle) second restore with debug logging (same day)
job 2. (middle - end) before savepoint & cancel (following day)
job 3. (start-middle) 3rd restore with debug logging (following day)

It can be downloaded here:
https://www.dropbox.com/s/33z0jbolueokao6/flink_debug_logs.zip?dl=0

On Thu, Mar 28, 2019 at 7:08 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

Yes, the number is the last number in the line. Feel free to share all lines.

Best,

Konstantin

On Thu, Mar 28, 2019 at 5:00 PM Juho Autio <juho.autio@rovio.com> wrote:

Hi Konstantin!

> I would be interested in any changes in the number of timers, not only the number of logged messages.

Sorry for the delay. I see – the count of timers is the last number on the log line. For example, for this row it's 270409:

> March 26th 2019, 11:08:39.822 DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl Restored: TimeWindow{start=1553558400000, end=1553644800000} -> 270409

The log lines don't contain a task id – how should they be compared across different snapshots? Or should I share all of these logs (at least a couple of snapshots around the point of restore) and you'll compare them?

Thanks.

On Tue, Mar 26, 2019 at 9:55 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

I based the branch on top of the current 1.6.4 branch. I can rebase on 1.6.2 for any future iterations. I would be interested in any changes in the number of timers, not only the number of logged messages. The sum of all counts should be the same during snapshotting and restore. While a window is open, this number should always increase (when comparing multiple snapshots).

Best,

Konstantin
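That invariant – the summed timer counts should match between snapshot and restore – can be checked mechanically. A sketch against the log format shown elsewhere in this thread, where the count is the last field of each line:

    awk '/InternalTimerServiceImpl/ && /Snapshot:/ { snap += $NF }
         /InternalTimerServiceImpl/ && /Restored:/ { rest += $NF }
         END { print "snapshot total:", snap, "| restore total:", rest }' taskmanager*.log

(As pointed out above, the "Snapshot" lines mix savepoints and checkpoints, so the logs may first need to be narrowed down to a single snapshot/restore pair by timestamp.)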
On Tue, Mar 26, 2019 at 11:01 AM Juho Autio <juho.autio@rovio.com> wrote:

Hi Konstantin,

I got that debug logging working.

> You would now need to take a savepoint and restore sometime in the middle of the day and should be able to check
> a) if there are any timers for the very old windows, for which there is still some content lingering around

No timers for old windows were logged.

All timers are for the same time window, for example:

> March 26th 2019, 11:08:39.822 DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl Restored: TimeWindow{start=1553558400000, end=1553644800000} -> 270409

Those milliseconds correspond to Tue Mar 26 00:00:00 UTC 2019 – Wed Mar 27 00:00:00 UTC 2019, so this seems normal.

> b) if there are fewer timers after restore for the current window. The missing timers would be recreated as soon as any additional records for the same key arrive within the window. This means the number of missing records might be less than the number of missing timers.

Grepping for "Restored" gives 78 hits. That's suspicious, because this job's parallelism is 80. A corresponding grep for "Snapshot" gives the full 80 hits. Ok, actually that would match with what you wrote: "missing timers would be recreated, as soon as any additional records for the same key arrive within the window".

I tried killing & restoring once more. This time grepping for "Restored" gives 80 hits. Note that it's possible that some logs had been lost around the time of restoration, because I'm browsing the logs through Kibana (ELK stack).

I will try kill & restore again tomorrow around noon & collect the same info. Is there anything else that you'd like me to share?

By the way, it seems that your branch* is not based on the 1.6.2 release – why so? It probably doesn't matter, but in general it would be good to minimize the scope of changes. But let's roll with this for now; I don't want to build another package, because it seems like we're able to replicate the issue with this version :)

Thanks,
Juho

*) https://github.com/apache/flink/compare/release-1.6.2...knaufk:logging-timers
On Wed, Mar 20, 2019 at 2:20 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

I created a branch [1] which logs the number of event time timers per namespace during snapshot and restore. Please refer to [2] to build Flink from sources.

You need to set the logging level to DEBUG for org.apache.flink.streaming.api.operators.InternalTimerServiceImpl. If you use log4j this is a one-liner in your log4j.properties:

log4j.logger.org.apache.flink.streaming.api.operators.InternalTimerServiceImpl=DEBUG

The only additional logs will be the lines added in the branch. The lines are of the following format (<namespace> -> <number of timers>), e.g.

DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589256, end=1553083589258} -> 1
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589256, end=1553083589258} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589456, end=1553083589458} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589356, end=1553083589358} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589482, end=1553083589484} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589456, end=1553083589458} -> 1
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589256, end=1553083589258} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589356, end=1553083589358} -> 1
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589456, end=1553083589458} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589482, end=1553083589484} -> 1
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589356, end=1553083589358} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589482, end=1553083589484} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589256, end=1553083589258} -> 1
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589456, end=1553083589458} -> 1
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589356, end=1553083589358} -> 1
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589482, end=1553083589484} -> 1
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589256, end=1553083589258} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589456, end=1553083589458} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589356, end=1553083589358} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589482, end=1553083589484} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589256, end=1553083589258} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589456, end=1553083589458} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589356, end=1553083589358} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589482, end=1553083589484} -> 2

You would now need to take a savepoint and restore sometime in the middle of the day and should be able to check

a) if there are any timers for the very old windows, for which there is still some content lingering around
b) if there are fewer timers after restore for the current window. The missing timers would be recreated as soon as any additional records for the same key arrive within the window. This means the number of missing records might be less than the number of missing timers.

Looking forward to the results!

Cheers,

Konstantin

[1] https://github.com/knaufk/flink/tree/logging-timers
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#build-flink

On Tue, Mar 19, 2019 at 2:06 PM Juho Autio <juho.autio@rovio.com> wrote:

Thanks, answers below.

> * Which Flink version do you need this for?

1.6.2

> * You use RocksDBStatebackend, correct? If so, which value do you set for "state.backend.rocksdb.timer-service.factory" in the flink-conf.yaml?

Yes, RocksDBStatebackend. We don't set state.backend.rocksdb.timer-service.factory at all, so whatever is the default in Flink 1.6.2? Based on the docs it seems that it would be "heap":
https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/large_state_tuning.html

On Mon, Mar 18, 2019 at 6:26 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

I will prepare a Flink branch for you which logs the number of event time timers per window before snapshot and after restore. With this we should be able to check if timers are lost during savepoints.

Two questions:

* Which Flink version do you need this for? 1.6?
* You use RocksDBStatebackend, correct? If so, which value do you set for "state.backend.rocksdb.timer-service.factory" in the flink-conf.yaml?

Cheers,

Konstantin

On Thu, Mar 14, 2019 at 12:20 PM Juho Autio <juho.autio@rovio.com> wrote:

Hi Konstantin,

Reading timers from a snapshot doesn't seem straightforward. I wrote in private with Gyula; he gave more suggestions (thanks!), but still it seems that it may be a rather big effort for me to figure it out. Would you be able to help with that? If yes, there's this existing unit test that can be extended to test reading timers:
https://github.com/king/bravo/blob/master/bravo/src/test/java/com/king/bravo/ReducerStateReadingTest.java#L37-L38
The test already has a state with some values in reducer window state, so I'm assuming that it must also contain some window timers.

This is what Gyula wrote to me:

> Maybe I was wrong when I said the createOperatorStateBackendsFromSnapshot is the way to do it.
>
> On a second thought Timers are probably stored as raw keyed state in the operator. I don't remember building any utility to read that.
>
> At the moment I am quite busy with other work so won't have time to build it for you, so you might have to figure it out yourself.
>
> I would try to look at how keyed states are read:
>
> Look at the implementation of: createOperatorStateBackendsFromSnapshot()
>
> Instead of getManagedOperatorState you want to try getRawKeyedState and also look at how Flink restores it internally for Timers.
>
> I would start looking around here I guess:
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L238
> https://github.com/apache/flink/blob/e8daa49a593edc401cd44761b25b1324b11be4a6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java#L199
On Tue, Mar 12, 2019 at 5:41 PM Gyula Fóra <gyula.fora@gmail.com> wrote:

Should be possible to read timer states by:

OperatorStateReader#createOperatorStateBackendFromSnapshot

Then you have to get the timer state out of the OperatorStateBackend, but keep in mind that this will restore the operator states in memory.

Gyula

On Tue, Mar 12, 2019 at 4:29 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

okay, so it seems that although the watermark passed the end time of the event time windows, the window was not triggered for some of the keys.

The timers, which would trigger the firing of the window, are also part of the keyed state and are snapshotted/restored. I would like to check if timers (as opposed to the window content itself) are maybe lost during the savepoint & restore procedure. Using Bravo, are you also able to inspect the timer state of the savepoints? In particular, I would be interested in whether for two subsequent savepoints all timers (i.e. one timer per window and key, including the missing keys) are present in the savepoint.

@Gyula Fóra: Does Bravo support reading timer state as well?

Cheers,

Konstantin

On Thu, Mar 7, 2019 at 6:41 PM Juho Autio <juho.autio@rovio.com> wrote:

Right, the window operator is the one by the name "DistinctFunction".

http http://10.1.59.75:20888/proxy/application_1551956351667_0001/jobs/3e4ffaadbd84af3488286863f00d4f23/vertices/19ede2f818524a7f310857e537fa6808/metrics\?get\=0.currentInputWatermark,1.currentInputWatermark,2.currentInputWatermark,3.currentInputWatermark,4.currentInputWatermark,5.currentInputWatermark,6.currentInputWatermark,7.currentInputWatermark,8.currentInputWatermark,9.currentInputWatermark,10.currentInputWatermark,11.currentInputWatermark,12.currentInputWatermark,13.currentInputWatermark,14.currentInputWatermark,15.currentInputWatermark,16.currentInputWatermark,17.currentInputWatermark,18.currentInputWatermark,19.currentInputWatermark,20.currentInputWatermark,21.currentInputWatermark,22.currentInputWatermark,23.currentInputWatermark,24.currentInputWatermark,25.currentInputWatermark,26.currentInputWatermark,27.currentInputWatermark,28.currentInputWatermark,29.currentInputWatermark,30.currentInputWatermark,31.currentInputWatermark,32.currentInputWatermark,33.currentInputWatermark,34.currentInputWatermark,35.currentInputWatermark,36.currentInputWatermark,37.currentInputWatermark,38.currentInputWatermark,39.currentInputWatermark,40.currentInputWatermark,41.currentInputWatermark,42.currentInputWatermark,43.currentInputWatermark,44.currentInputWatermark,45.currentInputWatermark,46.currentInputWatermark,47.currentInputWatermark,48.currentInputWatermark,49.currentInputWatermark,50.currentInputWatermark,51.currentInputWatermark,52.currentInputWatermark,53.currentInputWatermark,54.currentInputWatermark,55.currentInputWatermark,56.currentInputWatermark,57.currentInputWatermark,58.currentInputWatermark,59.currentInputWatermark,60.currentInputWatermark,61.currentInputWatermark,62.currentInputWatermark,63.currentInputWatermark,64.currentInputWatermark,65.currentInputWatermark,66.currentInputWatermark,67.currentInputWatermark,68.currentInputWatermark,69.currentInputWatermark,70.currentInputWatermark,71.currentInputWatermark,72.currentInputWatermark,73.currentInputWatermark,74.currentInputWatermark,75.currentInputWatermark,76.currentInputWatermark,77.currentInputWatermark,78.currentInputWatermark,79.currentInputWatermark | jq '.[].value' --raw-output | uniq -c
  80 1551980102743

date -r "$((1551980102743/1000))"
Thu Mar 7 19:35:02 EET 2019

To me that makes sense – how would the window be triggered at all, if not all sub-tasks have a high enough watermark, so that the operator-level watermark can be advanced.
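The 80-entry parameter list above doesn't need to be typed out by hand; a sketch that builds the same query string in plain shell (the WM_PARAMS variable name is ours, not from the thread):

    WM_PARAMS=$(seq 0 79 | sed 's/$/.currentInputWatermark/' | paste -sd, -)
    http "http://10.1.59.75:20888/proxy/application_1551956351667_0001/jobs/3e4ffaadbd84af3488286863f00d4f23/vertices/19ede2f818524a7f310857e537fa6808/metrics?get=$WM_PARAMS" \
      | jq -r '.[].value' | uniq -c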
On Thu, Mar 7, 2019 at 5:33 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

great, we are getting closer :) Could you please check the "Watermarks" tab in the Flink UI of this job and check if the current watermark for all parallel subtasks of the WindowOperator is close to the current date/time?

Best,

Konstantin

On Thu, Mar 7, 2019 at 3:01 PM Juho Autio <juho.autio@rovio.com> wrote:

Wow, indeed the missing data from the previous date is still found in the savepoint!

Actually, what I now found is that there is still data from even older dates in the state:

%%spark
state_json_next_day.groupBy(state_json_next_day.ts.substr(1, 10).alias('day')).count().orderBy('day').show(n=1000)

+----------+--------+
|       day|   count|
+----------+--------+
|2018-08-22|    4206|
..
(manually truncated)
..
|2019-02-03|       4|
|2019-02-14|   12881|
|2019-02-15|    1393|
|2019-02-25|    8774|
|2019-03-06|    9293|
|2019-03-07|28113105|
+----------+--------+

Of course that's the expected situation, after we have learned that some window contents are left untriggered.

I don't have the logs any more, but I think on 2018-08-22 I reset the state, and since then it's always been kept/restored from a savepoint. I can also see some dates there on which I didn't cancel the stream.
But I can't be sure if it has gone through some automatic restart by Flink, so we can't rule out that some window contents wouldn't sometimes also be missed during normal operation. However, savepoint restoration at least makes the problem more prominent. I have previously mentioned that I would suspect this to be some kind of race condition that is affected by load on the cluster. The reason for my suspicion is that during savepoint restoration the cluster is also catching up kafka offsets at full speed, so it is considerably more loaded than usual. Otherwise this problem might not have much to do with savepoints, of course.

Are you able to investigate the problem in Flink code based on this information?

Many thanks,
Juho

On Wed, Mar 6, 2019 at 1:41 PM Juho Autio <juho.autio@rovio.com> wrote:

Thanks for the investigation & summary.

As you suggested, I will next take savepoints on two subsequent days & check the reducer state for both days.
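A sketch of that check in the same %%spark style as above. The names are assumptions: state_json_day1/state_json_day2 for the reducer state extracted from the two savepoints (via Bravo, as elsewhere in this thread), missing_ids for the IDs absent from the streaming output, and a hypothetical id column to join on:

    %%spark
    # 0 would mean every missing id is still sitting in the savepoint's window state
    missing_ids.join(state_json_day1, 'id', 'left_anti').count()
    missing_ids.join(state_json_day2, 'id', 'left_anti').count()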
On Wed, Mar 6, 2019 at 1:18 PM Konstantin Knauf wrote:

(Moving the discussion back to the ML)

Hi Juho,

after looking into your code, we are still pretty much in the dark with respect to what is going wrong.

Let me try to summarize what we know given your experiments so far:

1) the lost records were processed and put into state *before* the restart of the job, not afterwards
2) the lost records are part of the state after the restore (because they are contained in subsequent savepoints)
3) the sinks are not the problem (because the metrics of the WindowOperator showed that the missing records have not been sent to the sinks)
4) it is not the batch job used for reference which is wrong, because of 1)
5) records are only lost when restarting from a savepoint (not during normal operations)

One explanation would be that one of the WindowOperators did not fire (for whatever reason) and the missing records are still in the window's state when you run your test. Could you please check whether this is the case, by taking a savepoint on the next day and checking if the missing records are contained in it?

Best,

Konstantin

On Mon, Feb 18, 2019 at 8:32 PM Juho Autio <juho.autio@rovio.com> wrote:

Hi Konstantin, thanks.

I gathered the additional info as discussed. No surprises there.

> * do you know if all lost records are contained in the last savepoint you took before the window fired? This would mean that no records are lost after the last restore.

Indeed this is the case. I saved the list of all missing IDs, analyzed the savepoint with Bravo, and the savepoint state (already) contained all IDs that were eventually missed in the output.

> * could you please check the numRecordsOut metric for the WindowOperator (FlinkUI -> Task Metrics -> Select TaskChain containing WindowOperator -> find metric)? Is the count reported there correct (no missing data)?

The number matches with the output rows. The sum of numRecordsOut metrics was 45755630, and count(*) of the output on s3 resulted in the same number. Batch output has a bit more IDs of course (this time the difference was 1194). You wrote "Is the count reported there correct (no missing data)?" but I have a slightly different viewpoint: I agree that the reported count is correct (in Flink's scope, because the number is the same as what's in the output file). But I think "no missing data" doesn't belong here. Data is missing, but it's consistently missing from both the output files and the numRecordsOut metrics.

Next thing I'll work on is preparing the code to be shared..
Btw, I used this script to count the sum of numRecordsOut (I'm going to look into enabling the Slf4jReporter eventually):

JOB_URL=http://10.1.56.245:20888/proxy/application_1550217512987_0001/jobs/068813ab8e6cbebaf7d306a0f41993c2

DistinctFunctionID=`http $JOB_URL \
  | jq '.vertices[] | select(.name == "DistinctFunction") | .id' --raw-output`
echo "DistinctFunctionID=$DistinctFunctionID"

http $JOB_URL/vertices/$DistinctFunctionID/metrics | jq '.[] | .id' --raw-output \
  | grep "[0-9][0-9]*\.numRecordsOut$" \
  | xargs -I@ sh -c "http GET $JOB_URL/vertices/$DistinctFunctionID/metrics?get=@ | jq '.[0].value' --raw-output" > numRecordsOut.txt

# i.e. eval_math( '+'.join( file.readlines ) )
paste -sd+ numRecordsOut.txt | bc
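The per-metric requests can also be collapsed into a single call, since the REST API accepts a comma-separated list in ?get= (the same pattern as the watermark query earlier in this thread); a sketch reusing the variables above:

    IDS=$(http $JOB_URL/vertices/$DistinctFunctionID/metrics | jq -r '.[].id' | grep '\.numRecordsOut$' | paste -sd, -)
    http GET "$JOB_URL/vertices/$DistinctFunctionID/metrics?get=$IDS" | jq '[.[].value | tonumber] | add'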
On Thu, Feb 14, 2019 at 2:44 PM Konstantin Knauf wrote:

Hi Juho,

>> * does the output of the streaming job contain any data, which is not contained in the batch output?
>
> No.
>
>> * do you know if all lost records are contained in the last savepoint you took before the window fired? This would mean that no records are lost after the last restore.
>
> I haven't built the tooling required to check all IDs like that, but yes, that's my understanding currently. To check that I would need to:
> - kill the stream only once on a given day (so that there's only one savepoint creation & restore)
> - next day or later: save all missing ids from batch output comparison
> - next day or later: read the savepoint with bravo & check that it contains all of those missing IDs
>
> However I haven't built the tooling for that yet. Do you think it's necessary to verify that this assumption holds?

It would be another data point and might help us to track down the problem. Whether it is worth doing depends on the result, i.e. whether the current assumption would be falsified or not, but we only know that in retrospect ;)

>> * could you please check the numRecordsOut metric for the WindowOperator (FlinkUI -> Task Metrics -> Select TaskChain containing WindowOperator -> find metric)? Is the count reported there correct (no missing data)?
>
> Is that metric the result of the window trigger? If yes, you must mean that I check the value of that metric on the next day after restore, so that it only contains the count for the output of the previous day's window? The counter is reset to 0 when the job starts (even when state is restored), right?

Yes, this metric would be incremented when the window is triggered. Yes, please check this metric after the window, during which the restore happened, is fired.

If you don't have a MetricsReporter configured so far, I recommend to quickly register a Slf4jReporter to log out all metrics every X seconds (maybe even minutes for your use case): https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter. Then you don't need to go through the WebUI and can keep a history of the metrics.

> Otherwise, do you have any suggestions for how to instrument the code to narrow down further where the data gets lost? To me it would make sense to proceed with this, because the problem seems hard to reproduce outside of our environment.

Let's focus on checking this metric above – making sure that the WindowOperator is actually emitting fewer records than the overall number of keys in the state, as your experiments suggest – and on sharing the code.
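The reporter registration is a flink-conf.yaml change; a sketch based on the docs page linked above (the reporter name "slf4j" and the 60-second interval are our choices, not from the thread):

    metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
    metrics.reporter.slf4j.interval: 60 SECONDS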
On Thu, Feb 14, 2019 at 10:57 AM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

you are right, the problem has actually been narrowed down quite a bit over time. Nevertheless, sharing the code (incl. flink-conf.yaml) might be a good idea. Maybe something strikes the eye that we have not thought about so far. If you don't feel comfortable sharing the code on the ML, feel free to send me a PM.

Besides that, three more questions:

* does the output of the streaming job contain any data, which is not contained in the batch output?
* do you know if all lost records are contained in the last savepoint you took before the window fired? This would mean that no records are lost after the last restore.
* could you please check the numRecordsOut metric for the WindowOperator (FlinkUI -> TaskMetrics -> Select TaskChain containing WindowOperator -> find metric)? Is the count reported there correct (no missing data)?

Cheers,

Konstantin

On Wed, Feb 13, 2019 at 3:19 PM Gyula Fóra wrote:

Sorry, not posting on the mail list was my mistake :/

On Wed, 13 Feb 2019 at 15:01, Juho Autio wrote:

Thanks for stepping in, did you post outside of the mailing list on purpose btw?

This I did a long time ago:

> To rule out for good any questions about sink behaviour, the job was killed and started with an additional Kafka sink.
> The same number of ids were missed in both outputs: KafkaSink & BucketingSink.
(I wrote about that on Oct 1, 2018 in this email thread.)

After that I did the savepoint analysis with Bravo.

Currently I'm indeed trying to get suggestions how to debug further, for example, where to add additional kafka output, to catch where the data gets lost. That would probably be somewhere in Flink's internals.

I could try to share the full code also, but IMHO the problem has been quite well narrowed down, considering that the data can be found in the savepoint, the savepoint is successfully restored, and after restoring the data doesn't go to "user code" (like the reducer) any more.

On Wed, Feb 13, 2019 at 3:47 PM Gyula Fóra wrote:

Hi Juho!

I think the reason you are not getting many answers here is because it is very hard to debug this problem remotely. Seemingly you do very normal operations, the state contains all the required data, and nobody else has hit a similar problem for ages.

My best guess would be some bug with the deduplication or output writing logic, but without a complete code example it's very hard to say anything useful. Did you try writing it to Kafka to see if the output is there? (That way we could rule out the dedup problem.)

Cheers,
Gyula

On Wed, Feb 13, 2019 at 2:37 PM Juho Autio wrote:

Stefan (or anyone!), please, could I have some feedback on the findings that I reported on Dec 21, 2018? This is still a major blocker..

On Thu, Jan 31, 2019 at 11:46 AM Juho Autio wrote:

Hello, is there anyone that could help with this?
On Fri, Jan 11, 2019 at 8:14 AM Juho Autio wrote:

Stefan, would you have time to comment?

On Wednesday, January 2, 2019, Juho Autio wrote:

Bump – does anyone know if Stefan will be available to comment the latest findings? Thanks.

On Fri, Dec 21, 2018 at 2:33 PM Juho Autio wrote:

Stefan, I managed to analyze the savepoint with bravo. It seems that the data that's missing from the output *is* found in the savepoint.

I simplified my test case to the following:

- job 1 has been running for ~10 days
- savepoint X created & job 1 cancelled
- job 2 started with restore from savepoint X

Then I waited until the next day so that job 2 has triggered the 24 hour window.

Then I analyzed the output & savepoint:

- compare job 2 output with the output of a batch pyspark script => find 4223 missing rows
- pick one of the missing rows (say, id Z)
- read savepoint X with bravo, filter for id Z => Z was found in the savepoint!

How can it be possible that the value is in state but doesn't end up in the output after the state has been restored & the window is eventually triggered?
I also did a similar analysis on the previous case where I savepointed & restored the job multiple times (5) within the same 24-hour window. A missing id that I drilled down to was found in all of those savepoints, yet missing from the output that gets written at the end of the day. This is even more surprising: the missing ID was written to the new savepoints also after restoring. Is the reducer state somehow decoupled from the window contents?

Big thanks to bravo developer Gyula for guiding me through to be able to read the reducer state! https://github.com/king/bravo/pull/11

Gyula also had an idea for how to troubleshoot the missing data in a scalable way: I could add some "side effect kafka output" on individual operators. This should allow tracking more closely at which point the data gets lost. However, maybe this would have to be in some of Flink's internal components, and I'm not sure which those would be.

Cheers,
Juho

On Mon, Nov 19, 2018 at 11:52 AM Juho Autio wrote:

Hi Stefan,

Bravo doesn't currently support reading a reducer state. I gave it a try but couldn't get to a working implementation yet. If anyone can provide some insight on how to make this work, please share at github: https://github.com/king/bravo/pull/11

Thanks.

On Tue, Oct 23, 2018 at 3:32 PM Juho Autio <juho.autio@rovio.com> wrote:

I was glad to find that bravo had now been updated to support installing bravo to a local maven repo.
I was able to load a checkpoint created by my job, thanks to the example provided in the bravo README, but I'm still missing the essential piece.

My code was:

    OperatorStateReader reader = new OperatorStateReader(env2, savepoint, "DistinctFunction");
    DontKnowWhatTypeThisIs reducingState = reader.readKeyedStates(what should I put here?);

I don't know how to read the values collected from reduce() calls in the state. Is there a way to access the reducing state of the window with bravo? I'm a bit confused how this works, because when I check with a debugger, flink internally uses a ReducingStateDescriptor with name=window-contents, but still, reading operator state for "DistinctFunction" didn't at least throw an exception ("window-contents" threw – obviously there's no operator by that name).

Cheers,
Juho

On Mon, Oct 15, 2018 at 2:25 PM Juho Autio <juho.autio@rovio.com> wrote:

Hi Stefan,

Sorry, but it doesn't seem immediately clear to me what's a good way to use https://github.com/king/bravo .

How are people using it? Would you for example modify build.gradle somehow to publish bravo as a library locally/internally? Or add code directly in the bravo project (locally) and run it from there (using an IDE, for example)? Also it doesn't seem like the bravo gradle project supports building a flink job jar, but if it does, how do I do it?

Thanks.
On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <juho.autio@rovio.com> wrote:

Good then, I'll try to analyze the savepoints with Bravo. Thanks!

> How would you assume that backpressure would influence your updates? Updates to each local state still happen event-by-event, in a single reader/writing thread.

Sure, just an ignorant guess by me. I'm not familiar with most of Flink's internals. Anyway, high backpressure is not seen on this job after it has caught up the lag, so I thought it would be worth mentioning.

On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <s.richter@data-artisans.com> wrote:

Hi,

On 04.10.2018 at 16:08, Juho Autio <juho.autio@rovio.com> wrote:

> > you could take a look at Bravo [1] to query your savepoints and to check if the state in the savepoint is complete w.r.t your expectations
>
> Thanks. I'm not 100% sure if this is the case, but to me it seemed like the missed ids were being logged by the reducer soon after the job had started (after restoring a savepoint). But on the other hand, after that I also made another savepoint & restored that, so what I could check is: does that next savepoint have the missed ids that were logged (a couple of minutes before the savepoint was created, so there should've been more than enough time to add them to the state before the savepoint was triggered) or not.
> Anyway, if I would be able to verify with Bravo that the ids are missing from the savepoint (even though the reducer logged that it saw them), would that help in figuring out where they are lost? Is there some major difference compared to just looking at the final output after the window has been triggered?

I think that makes a difference. For example, you can investigate if there is a state loss or a problem with the windowing. In the savepoint you could see which keys exist and to which windows they are assigned. Also, just to make sure there is no misunderstanding: only elements that are in the state at the start of a savepoint are expected to be part of the savepoint; all elements between start and completion of the savepoint are not expected to be part of the savepoint.

> > I also doubt that the problem is about backpressure after restore, because the job will only continue running after the state restore is already completed.
>
> Yes, I'm not suspecting that the state restoring would be the problem either. My concern was about backpressure possibly messing with the updates of reducing state? I would tend to suspect that updating the state consistently is what fails, where heavy load / backpressure might be a factor.

How would you assume that backpressure would influence your updates? Updates to each local state still happen event-by-event, in a single reader/writing thread.
On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <s.richter@data-artisans.com> wrote:

Hi,

you could take a look at Bravo [1] to query your savepoints and to check if the state in the savepoint is complete w.r.t your expectations. I somewhat doubt that there is a general problem with the state/savepoints, because many users are successfully running it on a large state and I am not aware of any data loss problems, but nothing is impossible. What the savepoint does is also straightforward: iterate a db snapshot and write all key/value pairs to disk, so all data that was in the db at the time of the savepoint should show up. I also doubt that the problem is about backpressure after restore, because the job will only continue running after the state restore is already completed. Did you check if you are using exactly-once semantics or at-least-once semantics? Also, did you check that the kafka consumer start position is configured properly [2]? Are watermarks generated as expected after restore?

One more unrelated high-level comment that I have: for a granularity of 24h windows, I wonder if it would not make sense to use a batch job instead?

Best,
Stefan

[1] https://github.com/king/bravo
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration

On 04.10.2018 at 14:53, Juho Autio <juho.autio@rovio.com> wrote:

Thanks for the suggestions!
> In general, it would be tremendously helpful to have a minimal working example which allows to reproduce the problem.

Definitely. The problem with reproducing has been that this only seems to happen with the bigger production data volumes.

That's why I'm hoping to find a way to debug this with the production data. With that it seems to consistently cause some misses every time the job is killed/restored.

> check if it happens for shorter windows, like 1h etc

What would be the benefit of that compared to the 24h window?

> simplify the job to not use a reduce window but simply a time window which outputs the window events. Then counting the input and output events should allow you to verify the results. If you are not seeing missing events, then it could have something to do with the reducing state used in the reduce function.

Hm, maybe, but I'm not sure how useful that would be, because it wouldn't yet prove that it's related to reducing: not having a reduce function could also mean a smaller load on the job, which might alone be enough to make the problem not manifest.

Is there a way to debug what goes into the reducing state (including what gets removed or overwritten and what gets restored), if that makes sense..? Maybe some suitable logging could be used to prove that the lost data is written to the reducing state (or at least asked to be written), but not found any more when the window closes and the state is flushed?
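For example, something like this wrapper could make every state update visible (just a sketch on my part, assuming the job's Map<String, String> records; none of this is from the actual job code):

    import java.util.Map;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Logs every value that the window asks to be written back into its
    // reducing state. Note: reduce() is only called from the second element
    // per key onwards, so the very first insert per key is not seen here.
    public class LoggingReduceWrapper implements ReduceFunction<Map<String, String>> {
        private static final Logger LOG = LoggerFactory.getLogger(LoggingReduceWrapper.class);
        private final ReduceFunction<Map<String, String>> inner;

        public LoggingReduceWrapper(ReduceFunction<Map<String, String>> inner) {
            this.inner = inner;
        }

        @Override
        public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) throws Exception {
            Map<String, String> result = inner.reduce(value1, value2);
            LOG.debug("reducing state update: in1={} in2={} out={}",
                    value1.get("id"), value2.get("id"), result.get("id"));
            return result;
        }
    }

That still wouldn't show what gets removed or overwritten inside the state backend itself, though, only what the window operator feeds into it.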
On configuration once more: we're using the RocksDB state backend with asynchronous incremental checkpointing. The state is restored from savepoints though, we haven't been using those checkpoints in these tests (although they could be used in case of crashes – but we haven't had those now).

On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <trohrmann@apache.org> wrote:

Hi Juho,

another idea to further narrow down the problem could be to simplify the job to not use a reduce window but simply a time window which outputs the window events. Then counting the input and output events should allow you to verify the results. If you are not seeing missing events, then it could have something to do with the reducing state used in the reduce function.

In general, it would be tremendously helpful to have a minimal working example which allows to reproduce the problem.

Cheers,
Till

On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:

Hi Juho,

can you try to reduce the job to a minimal reproducible example and share the job and input?

For example:
- some simple records as input, e.g. tuples of primitive types saved as CSV
- minimal deduplication job which processes them and misses records
- check if it happens for shorter windows, like 1h etc
- setup which you use for the job, ideally locally reproducible or cloud
(a skeleton along these lines is sketched below)

Best,
Andrey

On 4 Oct 2018, at 11:13, Juho Autio <juho.autio@rovio.com> wrote:

Sorry to insist, but we seem to be blocked for any serious usage of state in Flink if we can't rely on it to not miss data in case of restore.

Would anyone have suggestions for how to troubleshoot this? So far I have verified with DEBUG logs that our reduce function gets to process also the data that is missing from the window output.

On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.autio@rovio.com> wrote:

Hi Andrey,

To rule out for good any questions about sink behaviour, the job was killed and started with an additional Kafka sink.

The same number of ids were missed in both outputs: KafkaSink & BucketingSink.

I wonder what would be the next steps in debugging?
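Picking up Andrey's minimal-repro checklist above, a skeleton along those lines could look like this (entirely illustrative: the file paths, the CSV shape and the 1h window are assumptions, not the actual job):

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class DedupRepro {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            env.readTextFile("file:///tmp/input.csv")  // lines like "1538640000000,AN12345"
                .map(line -> {
                    String[] parts = line.split(",");
                    return Tuple2.of(Long.parseLong(parts[0]), parts[1]); // (eventTime, id)
                })
                .returns(Types.TUPLE(Types.LONG, Types.STRING))
                .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, String>>(Time.minutes(1)) {
                        @Override
                        public long extractTimestamp(Tuple2<Long, String> element) {
                            return element.f0;
                        }
                    })
                .keyBy(1)                                            // key by the id field
                .window(TumblingEventTimeWindows.of(Time.hours(1)))  // shorter window, per the suggestion
                .reduce((a, b) -> a)                                 // "distinct": keep the first occurrence
                .writeAsText("file:///tmp/output");

            env.execute("dedup-repro");
        }
    }

If ids went missing from the output across a cancel-with-savepoint/restore cycle with something this small, it would be a self-contained reproduction; if not, that points back at something specific to the production setup.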
On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.autio@rovio.com> wrote:

Thanks, Andrey.

> so it means that the savepoint does not lose at least some dropped records.

I'm not sure what you mean by that? I mean, it was known from the beginning that not everything is lost before/after restoring a savepoint, just some records around the time of restoration. It's not 100% clear whether records are lost before making a savepoint or after restoring it. Although, based on the new DEBUG logs it seems more like losing some records that are seen ~soon after restoring. It seems like Flink would be somehow confused either about the restored state vs. new inserts to state. This could also be somehow linked to the high back pressure on the kafka source while the stream is catching up.

> If it is feasible for your setup, I suggest to insert one more map function after reduce and before sink.
> etc.

Isn't that the same thing that we discussed before? Nothing is sent to BucketingSink before the window closes, so I don't see how it would make any difference if we replace the BucketingSink with a map function or another sink type. We don't create or restore savepoints during the time when BucketingSink gets input or has open buckets – that happens at a much later time of day. I would focus on figuring out why the records are lost while the window is open. But I don't know how to do that. Would you have any additional suggestions?
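For reference, the probe that Andrey suggests below would amount to roughly this (a sketch; DistinctFunction and the Map<String, String> records are from this thread, everything else is assumed):

    import java.util.Map;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Pass-through map between the window reduce and the sink: its only job is
    // to log each record that the window actually emits when it fires.
    public class WindowOutputProbe implements MapFunction<Map<String, String>, Map<String, String>> {
        private static final Logger LOG = LoggerFactory.getLogger(WindowOutputProbe.class);

        @Override
        public Map<String, String> map(Map<String, String> record) {
            LOG.info("window emitted: {}={}", record.get("field"), record.get("id"));
            return record;
        }
    }

    // wiring: ... .window(...).reduce(new DistinctFunction()).map(new WindowOutputProbe()).addSink(sink);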
On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:

Hi Juho,

so it means that the savepoint does not lose at least some dropped records.

If it is feasible for your setup, I suggest to insert one more map function after reduce and before sink. The map function would be called right after the window is triggered but before flushing to s3. The result of reduce (the deduped record) could be logged there. This should allow checking whether the processed distinct records were buffered in the state after the restoration from the savepoint or not. If they were buffered, we should see that there was an attempt to write them to the sink from the state.

Another suggestion is to try to write records to some other sink, or to both. E.g. if you can access the file system of the workers, maybe just into local files, and check whether the records are also dropped there.

Best,
Andrey

On 20 Sep 2018, at 15:37, Juho Autio <juho.autio@rovio.com> wrote:

Hi Andrey!

I was finally able to gather the DEBUG logs that you suggested. In short, the reducer logged that it processed at least some of the ids that were missing from the output.
"At least some", because I didn't have the job running with DEBUG logs for the full 24-hour window period. So I was only able to look up if I can find *some* of the missing ids in the DEBUG logs. Which I did indeed.

I changed DistinctFunction.java to do this:

    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
        return value1;
    }

Then:

    vi flink-1.6.0/conf/log4j.properties
    log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
    log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG

Then I ran the following kind of test:

- Cancelled the on-going job with a savepoint created at ~Sep 18 08:35 UTC 2018
- Started a new cluster & job with DEBUG enabled at ~09:13, restored from that previous cluster's savepoint
- Ran until the offsets had caught up
- Cancelled the job with a new savepoint
- Started a new job _without_ DEBUG, which restored the new savepoint, and let it keep running so that it will eventually write the output
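In CLI terms, each kill/restore cycle above is roughly the following (the savepoint directory, job id, and jar name are placeholders):

    # cancel the job, writing a savepoint first (Flink 1.6 CLI)
    flink cancel -s s3://bucket/savepoints <job-id>

    # start the next job from that savepoint
    flink run -d -s s3://bucket/savepoints/savepoint-XXXX uniqueid-job.jar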
Then on the next day, after results had been flushed when the 24-hour window closed, I compared the results again with a batch version's output. And found some missing ids as usual.

I drilled down to one specific missing id (I'm replacing the actual value with AN12345 below), which was not found in the stream output, but was found in the batch output & flink DEBUG logs.

Related to that id, I gathered the following information:

2018-09-18 ~09:13:21,000 job started & savepoint is restored

2018-09-18 09:14:29,085 missing id is processed for the first time, proved by this log line:
2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction - DistinctFunction.reduce returns: s.aid1=AN12345

2018-09-18 09:15:14,264 first synchronous part of checkpoint
2018-09-18 09:15:16,544 first asynchronous part of checkpoint

(more occurrences of checkpoints (~1 min checkpointing time + ~1 min delay before the next) / more occurrences of DistinctFunction.reduce)

2018-09-18 09:23:45,053 missing id is processed for the last time

2018-09-18 ~10:20:00,000 savepoint created & job cancelled
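The drill-down above boils down to greps along these lines over the taskmanager logs (the id is the placeholder from above):

    # how many times did the reducer see this id after the restore?
    grep -c "DistinctFunction.reduce returns: s.aid1=AN12345" taskmanager.log

    # and when, first and last?
    grep "DistinctFunction.reduce returns: s.aid1=AN12345" taskmanager.log | head -n 1
    grep "DistinctFunction.reduce returns: s.aid1=AN12345" taskmanager.log | tail -n 1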
To be noted, there was high backpressure after restoring from the savepoint until the stream caught up with the kafka offsets. Although, our job assigns timestamps & watermarks on the flink kafka consumer itself, so the event time of all partitions is synchronized. As expected, we don't get any late data in the late data side output.

From this we can see that the missing ids are processed by the reducer, but they must get lost somewhere before the 24-hour window is triggered.

I think it's worth mentioning once more that the stream doesn't miss any ids if we let it run without interruptions / state restoring.

What's next?

On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:

Hi Juho,

> only when the 24-hour window triggers, BucketingSink gets a burst of input

This is of course totally true, my understanding is the same. We cannot exclude a problem there for sure, it's just that savepoints are used a lot w/o problem reports and BucketingSink is known to be problematic with s3. That is why I asked you:

> You also wrote that the timestamps of lost events are 'probably' around the time of the savepoint, if it is not yet for sure I would also check it.

Although, the bucketing sink might lose any data at the end of the day (also from the middle).
The fact that it is always around the time of taking a savepoint and not random is surely suspicious, and possible savepoint failures need to be investigated.

Regarding the s3 problem, the s3 doc says:

> The caveat is that if you make a HEAD or GET request to the key name (to find if the object exists) before creating the object, Amazon S3 provides 'eventual consistency' for read-after-write.

The algorithm you suggest is how it is roughly implemented now (BucketingSink.openNewPartFile). My understanding is that 'eventual consistency' means that even if you just created a file (its name is the key), it can be that you do not get it in the list, or exists (HEAD) returns false, and you risk rewriting the previous part.

The BucketingSink was designed for a standard file system. s3 is used over a file system wrapper atm but does not always provide normal file system guarantees. See also the last example in [1].

Cheers,
Andrey

[1] https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82

On 29 Aug 2018, at 12:11, Juho Autio <juho.autio@rovio.com> wrote:

Andrey, thank you very much for the debugging suggestions, I'll try them.
In the meanwhile, two more questions, please:

> Just to keep in mind this problem with s3 and exclude it for sure, I would also check whether the size of missing events is around the batch size of BucketingSink or not.

Fair enough, but I also want to focus on debugging the most probable subject first. So what do you think about this – true or false: only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either. Isn't this true, or have I totally missed how Flink works in triggering window results? I would not expect there to be any optimization that speculatively triggers early results of a regular time window to the downstream operators.

> The old BucketingSink has in general a problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine the index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite a previously written part and basically lose it.

I was wondering what S3's "read-after-write consistency" (mentioned on the page you linked) actually means. It seems that this might be possible:
- LIST keys, find current max index
- choose next index = max + 1
- HEAD next index: if it exists, keep adding +1 until the key doesn't exist on S3

But it definitely sounds easier if a sink keeps track of files in a way that's guaranteed to be consistent.
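To illustrate, here is a rough sketch in Java of the probing scheme being discussed. The class, method and parameter names are invented for the example; this is not the actual BucketingSink code:

    import java.util.function.IntPredicate;

    public final class PartIndexProbe {

        /**
         * Pick the next free part index by probing HEAD until a key is absent.
         * `exists` stands in for an S3 HEAD request, `maxListedIndex` for the
         * max index found by a LIST over the already written parts.
         */
        static int nextPartIndex(int maxListedIndex, IntPredicate exists) {
            int candidate = maxListedIndex + 1;
            while (exists.test(candidate)) { // one HEAD request per candidate key
                candidate++;
            }
            // Hazard under eventual consistency: a part that was just written may
            // not show up in the LIST yet, and HEAD on its key may still report
            // "absent", so the chosen index can collide with an existing part
            // and overwrite it.
            return candidate;
        }
    }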
Cheers,
Juho

On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:

Hi,

true, StreamingFileSink does not support s3 in 1.6.0; it is planned for the next 1.7 release, sorry for the confusion.

The old BucketingSink has in general a problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine the index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite a previously written part and basically lose it. It should be fixed for StreamingFileSink in 1.7, where Flink keeps its own track of written parts and does not rely on s3 as a file system. I also include Kostas, he might add more details.

Just to keep in mind this problem with s3 and exclude it for sure, I would also check whether the size of missing events is around the batch size of BucketingSink or not. You also wrote that the timestamps of lost events are 'probably' around the time of the savepoint; if it is not yet known for sure, I would also check it.

Have you already checked the log files of the job manager and task managers for the job running before and after the restore from the checkpoint? Is everything successful there: no errors, relevant warnings or exceptions?

As the next step, I would suggest logging all encountered events in DistinctFunction.reduce, if possible for production data, and checking whether the missed events are eventually processed before or after the savepoint.
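For example, roughly like this (a sketch based on the DistinctFunction shown further down in this thread, with just a debug log added; whether logging every event is feasible depends on the production volume):

    import java.util.Map;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class DistinctFunction implements ReduceFunction<Map<String, String>> {

        private static final Logger LOG = LoggerFactory.getLogger(DistinctFunction.class);

        @Override
        public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
            // Log both inputs so missed ids can later be correlated with the
            // savepoint timestamps found in the jobmanager/taskmanager logs.
            LOG.debug("reduce: {} / {}", value1, value2);
            return value1;
        }
    }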
The following log message indicates a border between the events that should be included into the savepoint (logged before) or not:

"{} ({}, synchronous part) in thread {} took {} ms" (template)

Also check if the savepoint has been overall completed:

"{} ({}, asynchronous part) in thread {} took {} ms."

Best,
Andrey

[1] https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html

On 24 Aug 2018, at 20:41, Juho Autio <juho.autio@rovio.com> wrote:

Hi,

Using StreamingFileSink is not a convenient option for production use for us as it doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I could also use a kafka sink instead, but I can't imagine how there could be any difference. It's very real that the sink doesn't get any input for a long time until the 24-hour window closes, and then it quickly writes out everything, because it's not that much data eventually for the distinct values.

Any ideas for debugging what's happening around the savepoint & restoration time?

*) I actually implemented StreamingFileSink as an alternative sink. This was before I came to realize that most likely the sink component has nothing to do with the data loss problem. I tried it with an s3n:// path just to see an exception being thrown. In the source code I indeed then found an explicit check for the target path scheme to be "hdfs://".
On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:

Ok, I think before further debugging the window reduced state, could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink'?

Cheers,
Andrey

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html

On 24 Aug 2018, at 18:03, Juho Autio <juho.autio@rovio.com> wrote:

Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main result of the job

Yes, and that's what I have already done. There seems to be always some data loss with the production data volumes, if the job has been restarted on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:

Hi Juho,

So it is a per-key deduplication job.

Yes, I would wait and check the actual output in s3 because it is the main result of the job, and

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

is not a bug, it is a possible behaviour.

The savepoint is a snapshot of the data in transit which has already been consumed from Kafka. Basically the full contents of the window result are split between the savepoint and what can come after the savepoint'ed offset in Kafka but before the window result is written into s3. Allowed lateness should not affect it, I am just saying that the final result in s3 should include all records after it.
This is what should be guaranteed, but not the contents of the intermediate savepoint.

Cheers,
Andrey

On 24 Aug 2018, at 16:52, Juho Autio <juho.autio@rovio.com> wrote:

Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0 and still get the data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in s3? Then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.

Here's the full code for the key selector:
public class MapKeySelector implements KeySelector<Map<String, String>, Object> {

    private final String[] fields;

    public MapKeySelector(String... fields) {
        this.fields = fields;
    }

    @Override
    public Object getKey(Map<String, String> event) throws Exception {
        Tuple key = Tuple.getTupleClass(fields.length).newInstance();
        for (int i = 0; i < fields.length; i++) {
            key.setField(event.getOrDefault(fields[i], ""), i);
        }
        return key;
    }
}

And a more exact example on how it's used:

.keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
.timeWindow(Time.days(1))
.reduce(new DistinctFunction())

On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:

Hi Juho,

Where exactly does the data miss? When do you notice that? Do you check it:
- debugging `DistinctFunction.reduce` right after resume in the middle of the day, or
- do some distinct records miss in the final output of BucketingSink in s3 after the window result is actually triggered and saved into s3 at the end of the day? Is this the main output?

The late data around the time of taking the savepoint might not be included into the savepoint, but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in s3? Then what is missing there?

Cheers,
Andrey

On 23 Aug 2018, at 15:42, Juho Autio <juho.autio@rovio.com> wrote:

I changed to allowedLateness=0; no change, still missing data when restoring from savepoint.

On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.autio@rovio.com> wrote:

I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input.
Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I will next try removing the allowedLateness entirely from the equation.

In the meanwhile, please let me know if you have any suggestions for debugging the lost data, for example what logs to enable.

We use FlinkKafkaConsumer010 btw. Are there any known issues with that which could contribute to lost data when restoring a savepoint?

On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.autio@rovio.com> wrote:

Some data is silently lost on my Flink stream job when state is restored from a savepoint.

Do you have any debugging hints to find out where exactly the data gets dropped?

My job gathers distinct values using a 24-hour window. It doesn't have any custom state management.

When I cancel the job with savepoint and restore from that savepoint, some data is missed. It seems to be losing just a small amount of data. The event time of lost data is probably around the time of the savepoint. In other words, the rest of the time window is not entirely missed – collection works correctly also for (most of the) events that come in after restoring.

When the job processes a full 24-hour window without interruptions it doesn't miss anything.

Usually the problem doesn't happen in test environments that have smaller parallelism and smaller data volumes. But in production volumes the job seems to be consistently missing at least something on every restore.

This issue has consistently happened since the job was initially created. It was at first run on an older version of Flink 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.

I'm wondering if this could be for example some synchronization issue between the kafka consumer offsets vs. what's been written by BucketingSink?
1. Job content, simplified

kafkaStream
        .flatMap(new ExtractFieldsFunction())
        .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
        .timeWindow(Time.days(1))
        .allowedLateness(allowedLateness)
        .sideOutputLateData(lateDataTag)
        .reduce(new DistinctFunction())
        .addSink(sink)
        // use a fixed number of output partitions
        .setParallelism(8);

/**
 * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
 */
public class DistinctFunction implements ReduceFunction<Map<String, String>> {
    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        return value1;
    }
}

2. State configuration

boolean enableIncrementalCheckpointing = true;
String statePath = "s3n://bucket/savepoints";
new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);

Checkpointing Mode: Exactly Once
Interval: 1m 0s
Timeout: 10m 0s
Minimum Pause Between Checkpoints: 1m 0s
Maximum Concurrent Checkpoints: 1
Persist Checkpoints Externally: Enabled (retain on cancellation)

3. BucketingSink configuration

We use BucketingSink, I don't think there's anything special here, if not the fact that we're writing to S3.

String outputPath = "s3://bucket/output";
BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
        .setBucketer(new ProcessdateBucketer())
        .setBatchSize(batchSize)
        .setInactiveBucketThreshold(inactiveBucketThreshold)
        .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
sink.setWriter(new IdJsonWriter());

4. Kafka & event time

My flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to synchronize watermarks across all kafka partitions. We also write late data to a side output, but nothing is written there – if it were, it could explain missed data in the main output (I'm also sure that our late data writing works, because we previously had some actual late data which ended up there).
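For reference, assigning the extractor on the consumer itself looks roughly like this (a sketch only: the topic name, deserialization schema, timestamp field and max out-of-orderness are placeholders, not taken from the actual job):

    import java.util.Map;
    import java.util.Properties;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

    Properties kafkaProperties = new Properties(); // broker/group config omitted

    // "events" and MapDeserializationSchema are placeholders for the real topic
    // and schema.
    FlinkKafkaConsumer010<Map<String, String>> consumer =
            new FlinkKafkaConsumer010<>("events", new MapDeserializationSchema(), kafkaProperties);

    // When the assigner is set on the consumer itself, watermarks are generated
    // per Kafka partition, which keeps event time synchronized across partitions.
    consumer.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.minutes(1)) {
                @Override
                public long extractTimestamp(Map<String, String> event) {
                    return Long.parseLong(event.get("timestamp")); // placeholder field
                }
            });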
5. allowedLateness

It may or may not be relevant that I have also enabled allowedLateness with 1 minute lateness on the 24-hour window. If that makes sense, I could try removing allowedLateness entirely? That would be just to rule out that Flink has a bug related to restoring state in combination with the allowedLateness feature. After all, all of our data should be in a good enough order to not be late, given the max out of orderness used on the kafka consumer timestamp extractor.

Thank you in advance!

--
Juho Autio
Senior Data Engineer

Data Engineering, Games
Rovio Entertainment Corporation
Mobile: + 358 (0)45 313 0122
juho.autio@rovio.com
www.rovio.com
On Tue, Apr 16, 2019 at 2:35 PM Juho Autio <juho.autio@rovio.com> wrote:

Ouch, we have a data loss case now also with ROCKSDB timer service factory. This time the job had failed for some reason & restored checkpoint by itself (I mean I didn't cancel with savepoint this time. Previous restore from savepoint was at 14-04-2019 06:21:45 UTC).

In this case the number of lost ids was quite high:

20190415, missing_rows.count(): 706605

I don't know if the ROCKSDB timer service is a factor towards higher instability, but indeed I'd like to go back to testing with InternalTimerServiceImpl as well. Will switch back to that when the updated branch is available. Also I'm not sure if the cause of data loss is similar now with the ROCKSDB timer service factory (lost timers or maybe something else), because we didn't have corresponding DEBUG logging for this implementation.
On Mon, Apr 15, 2019 at 11:27 AM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

this is good news indeed! I have had a look at the _metadata files and logs on Friday, and it looks like a) the timer state is contained in the savepoint files and b) the timer state is also initially read by the TaskStateManagerImpl, but it is somehow lost before it reaches the InternalTimerServiceImpl. I will provide an updated version of my branch with more logging output to find the reason for this today or tomorrow. It would be great if you could test this again then.

Best,

Konstantin

On Mon, Apr 15, 2019 at 9:49 AM Juho Autio <juho.autio@rovio.com> wrote:

Hi,

Great news: there's no data loss (for the 3 days so far that were run) with state.backend.rocksdb.timer-service.factory: ROCKSDB.

Each day the job was once cancelled with savepoint & restored.

20190412, missing_rows.count(): 0
20190413, missing_rows.count(): 0
20190414, missing_rows.count(): 0

Btw, now we don't get the DEBUG logs of org.apache.flink.streaming.api.operators.InternalTimerServiceImpl any more, so I didn't know how to check from the logs how many timers are restored. But based on the results I'm assuming that all were successfully restored.

We'll keep testing this a bit more, but it seems really promising indeed. I'm thinking of at least letting it run for some days without cancellations, and on the other hand cancelling many times within the same day, etc.

Can I provide some additional debug logs or such to help find the bug when 'heap' is used for timers? Did you already analyze the _metadata files that I sent?

On Thu, Apr 11, 2019 at 4:21 PM Juho Autio <juho.autio@rovio.com> wrote:

Shared the _metadata files also, in private.

The job is now running with state.backend.rocksdb.timer-service.factory: ROCKSDB. I started it from empty state because I wasn't sure whether this change would be migrated automatically(?). I guess a clean setup like this is a good idea anyway. The first day that is fully processed with this conf will be tomorrow (Friday), and results can be compared on the following day. I'll report back on that on Monday. I verified from the Flink UI that the property is found in Configuration, but I still feel a bit unsure about whether it's actually being used. I wonder if there's some INFO level logging that could be checked to confirm that?

Thanks.


No, it also matters for savepoints. I think the doc here is misleading; it is currently synchronous for all cases of RocksDB keyed state and heap timers.

Best,
Stefan

On 11 Apr 2019, at 14:30, Juho Autio <juho.autio@rovio.com> wrote:

Thanks Till. Anyway, that's irrelevant in case of a savepoint, right?

On Thu, Apr 11, 2019 at 2:54 PM Till Rohrmann <trohrmann@apache.org> wrote:

Hi Juho,

yes, it means that the snapshotting of the timer state does not happen asynchronously but synchronously within the Task executor thread. During this operation, your operator won't make any progress, potentially causing backpressure for upstream operators.

If you want to use fully asynchronous snapshots while also using timer state, you should use the RocksDB backed timers.

Cheers,
Till

On Thu, Apr 11, 2019 at 10:32 AM Juho Autio <juho.autio@rovio.com> wrote:
Ok, I'm testing that state.backend.rocksdb.timer-service.factory: ROCKSDB in the meanwhile.

Btw, what does this actually mean (from https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html):

> The combination RocksDB state backend / with incremental checkpoint / with heap-based timers currently does NOT support asynchronous snapshots for the timers state. Other state like keyed state is still snapshotted asynchronously. Please note that this is not a regression from previous versions and will be resolved with FLINK-10026.

Is it just that snapshots are not asynchronous, so they cause some pauses? Does "not supported" here mean just some performance impact, or also correctness?

Our job at hand is using the RocksDB state backend and incremental checkpointing. However, at least the restores that we've been testing here have been from a savepoint, not an incremental checkpoint.

On Wed, Apr 10, 2019 at 4:46 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

one more thing we could try in a separate experiment is to change the timer state backend to RocksDB as well by setting

state.backend.rocksdb.timer-service.factory: ROCKSDB

in the flink-conf.yaml and see if this also leads to the loss of records. That would narrow it down quite a bit.

Best,

Konstantin



On Wed, Apr 10, 2019 at 1:02 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

sorry for the late reply. Please continue to use the custom Flink build and add additional logging for TaskStateManagerImpl by adding the following line to your log4j configuration:

log4j.logger.org.apache.flink.runtime.state.TaskStateManagerImpl=DEBUG

Afterwards, do a couple of savepoints & restores until you see a number of restores < 80 as before, and share the logs with me (at least for TaskStateManagerImpl & InternalTimerServiceImpl).

Best,

Konstantin

On Thu, Apr 4, 2019 at 9:03 AM Juho Autio <juho.autio@rovio.com> wrote:

Hi Konstantin,

Thanks for the follow-up.

> There are only 76 lines for restore in Job 3 instead of 80. It would be very useful to know if these lines were lost by the log aggregation or really did not exist.

I fetched the actual taskmanager.log files to verify (we store the original files on s3). Then did a grep for "InternalTimerServiceImpl  - Restored".

This is for "job 1. (start - end) first restore with debug logging":
Around 2019-03-26 09:08:43,352 - 78 hits

This is for "job 3. (start-middle) 3rd restore with debug logging (following day)":
Around 2019-03-27 07:39:06,414 - 76 hits

So yeah, we can rely on our log delivery to Kibana.

Note that, as a new piece of information, I found that the same job also did an automatic restore from checkpoint around 2019-03-30 20:36, and there were 79 hits instead of 80. So it doesn't seem to be only a problem in case of savepoints; it can happen with a checkpoint restore as well.

> Were there any missing records in the output for the day of the Job 1 -> Job 2 transition (26th of March)?

20190326: missing 2592
20190327: missing 4270

This even matches with the fact that on the 26th 2 timers were missed in restore, but on the 27th it was 4.

What's next? :)

On Thu, Apr 4, 2019 at 12:32 AM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

one thing that makes the log output a little bit hard to analyze is the fact that the "Snapshot" lines include Savepoints as well as Checkpoints. To identify the savepoints, I looked at the last 80 lines per job, which seems plausible given the timestamps of the lines.

So, let's compare the number of timers before and after restore:

Job 1 -> Job 2

23.091.002 event time timers for both. All timers for the same window. So this looks good.

Job 2 -> Job 3

18.565.234 timers during snapshotting. All timers for the same window.
17.636.774 timers during restore. All timers for the same window.

There are only 76 lines for restore in Job 3 instead of 80. It would be very useful to know if these lines were lost by the log aggregation or really did not exist.

Were there any missing records in the output for the day of the Job 1 -> Job 2 transition (26th of March)?

Best,

Konstantin



On Fri, Mar 29, 2019 at 2:21 PM Juho Autio <juho.autio@rovio.com> wrote:

Thanks,

I created a zip with these files:

job 1. (start - end) first restore with debug logging
job 2. (start - middle) second restore with debug logging (same day)
job 2. (middle - end) before savepoint & cancel (following day)
job 3. (start - middle) 3rd restore with debug logging (following day)

It can be downloaded here:

On Thu, Mar 28, 2019 at 7:08 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

Yes, the number is the last number in the line. Feel free to share all lines.

Best,

Konstantin
On Thu, Mar 28, 2019 at 5:00 PM Juho Autio <juho.autio@rovio.com> wrote:

Hi Konstantin!

> I would be interested in any changes in the number of timers, not only the number of logged messages.

Sorry for the delay. I see, the count is the number of timers; it's that last number on the log line. For example for this row it's 270409:

> March 26th 2019, 11:08:39.822 DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl Restored: TimeWindow{start=1553558400000, end=1553644800000} -> 270409

The log lines don't contain a task id – how should they be compared across different snapshots? Or should I share all of these logs (at least a couple of snapshots around the point of restore) and you'll compare them?
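If it helps, the totals could be aggregated without task ids: since the sum of the counts over all subtasks should match between snapshot and restore, a rough sketch like the following could do the comparison (assuming the log format shown above, with the lines of one snapshot or restore concatenated into a single file given as the argument):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;
    import java.util.stream.Stream;

    public class TimerCounts {

        private static final Pattern LINE =
                Pattern.compile("(Snapshot|Restored): (TimeWindow\\{[^}]*\\}) -> (\\d+)");

        public static void main(String[] args) throws IOException {
            // Total timer count per (direction, window); task ids are not needed
            // because only the sums are compared.
            Map<String, Long> totals = new TreeMap<>();
            try (Stream<String> lines = Files.lines(Paths.get(args[0]))) {
                lines.forEach(line -> {
                    Matcher m = LINE.matcher(line);
                    if (m.find()) {
                        totals.merge(m.group(1) + " " + m.group(2),
                                Long.parseLong(m.group(3)), Long::sum);
                    }
                });
            }
            totals.forEach((k, v) -> System.out.println(k + " -> " + v));
        }
    }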

Thanks.

On Tue, Mar 26, 2019 at 9:55 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

I based the branch on top of the current 1.6.4 branch. I can rebase on 1.6.2 for any future iterations. I would be interested in any changes in the number of timers, not only the number of logged messages. The sum of all counts should be the same during snapshotting and restore. While a window is open, this number should always increase (when comparing multiple snapshots).

Best,

Konstantin






On Tue, Mar 26, 2019 at 11:01 AM Juho Autio <juho.autio@rovio.com> wrote:

Hi Konstantin,

I got that debug logging working.

> You would now need to take a savepoint and restore sometime in the middle of the day and should be able to check
> a) if there are any timers for the very old windows, for which there is still some content lingering around

No timers for old windows were logged.

All timers are for the same time window, for example:

> March 26th 2019, 11:08:39.822 DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl Restored: TimeWindow{start=1553558400000, end=1553644800000} -> 270409

Those milliseconds correspond to:
Tue Mar 26 00:00:00 UTC 2019 – Wed Mar 27 00:00:00 UTC 2019.
- So this seems normal
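For reference, such window bounds can be double-checked directly from the epoch millis, e.g.:

    import java.time.Instant;

    public class WindowBounds {
        public static void main(String[] args) {
            // Window bounds from the log line above, printed as UTC instants.
            System.out.println(Instant.ofEpochMilli(1553558400000L)); // 2019-03-26T00:00:00Z
            System.out.println(Instant.ofEpochMilli(1553644800000L)); // 2019-03-27T00:00:00Z
        }
    }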
> b) if there are less timers after restore for the current window. The missing timers would be recreated, as soon as any additional records for the same key arrive within the window. This means the number of missing records might be less than the number of missing timers.

Grepping for "Restored" gives 78 hits. That's suspicious, because this job's parallelism is 80. The corresponding grep for "Snapshot" already gives 80 hits. Ok, actually that would match with what you wrote: "missing timers would be recreated, as soon as any additional records for the same key arrive within the window".

I tried killing & restoring once more. This time grepping for "Restored" gives 80 hits. Note that it's possible that some logs had been lost around the time of restoration, because I'm browsing the logs through Kibana (ELK stack).

I will try kill & restore again tomorrow around noon & collect the same info. Is there anything else that you'd like me to share?

By the way, it seems that your branch* is not based on the 1.6.2 release, why so? It probably doesn't matter, but in general it would be good to minimize the scope of changes. But let's roll with this for now, I don't want to build another package, because it seems like we're able to replicate the issue with this version :)

Thanks,
Juho

On Wed, Mar 20, 2019 at 2:20 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

I created a branch [1] which logs the number of event time timers per namespace during snapshot and restore. Please refer to [2] to build Flink from sources.

You need to set the logging level to DEBUG for org.apache.flink.streaming.api.operators.InternalTimerServiceImpl. If you use log4j this is a one-liner in your log4j.properties:

log4j.logger.org.apache.flink.streaming.api.operators.InternalTimerServiceImpl=DEBUG

The only additional logs will be the lines added in the branch. The lines are of the following format (<Window> -> <Number of Timers>), e.g.

DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589256, end=1553083589258} -> 1
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589256, end=1553083589258} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589456, end=1553083589458} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589356, end=1553083589358} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589482, end=1553083589484} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589456, end=1553083589458} -> 1
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589256, end=1553083589258} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589356, end=1553083589358} -> 1
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589456, end=1553083589458} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589482, end=1553083589484} -> 1
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589356, end=1553083589358} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Snapshot: TimeWindow{start=1553083589482, end=1553083589484} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589256, end=1553083589258} -> 1
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589456, end=1553083589458} -> 1
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589356, end=1553083589358} -> 1
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589482, end=1553083589484} -> 1
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589256, end=1553083589258} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589456, end=1553083589458} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589356, end=1553083589358} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589482, end=1553083589484} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589256, end=1553083589258} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589456, end=1553083589458} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589356, end=1553083589358} -> 2
DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl - Restored: TimeWindow{start=1553083589482, end=1553083589484} -> 2

You would now need to take a savepoint and restore sometime in the middle of the day and should be able to check

a) if there are any timers for the very old windows, for which there is still some content lingering around
b) if there are fewer timers after restore for the current window. The missing timers would be recreated, as soon as any additional records for the same key arrive within the window. This means the number of missing records might be less than the number of missing timers.

Looking forward to the results!

On Tue, Mar 19, 2019 at 2:06 PM Juho Autio <juho.autio@rovio.com> wrote:

Thanks, answers below.

> * Which Flink version do you need this for?

1.6.2

> * You use RocksDBStateBackend, correct? If so, which value do you set for "state.backend.rocksdb.timer-service.factory" in the flink-conf.yaml?

Yes, RocksDBStateBackend. We don't set state.backend.rocksdb.timer-service.factory at all, so whatever is the default in Flink 1.6.2? Based on the docs it seems that it would be "heap".
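(For reference, pinning the timer service explicitly would be a one-liner in flink-conf.yaml; a sketch, with the key spelled as quoted above and "heap" being the 1.6.x default:)

# flink-conf.yaml: keep timers in RocksDB instead of the default heap-based timer service
state.backend.rocksdb.timer-service.factory: ROCKSDB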

On Mon, Mar 18, 2019 at 6:26 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

I will prepare a Flink branch for you, which logs the number of event time timers per window before snapshot and after restore. With this we should be able to check if timers are lost during savepoints.

Two questions:

* Which Flink version do you need this for? 1.6?
* You use RocksDBStateBackend, correct? If so, which value do you set for "state.backend.rocksdb.timer-service.factory" in the flink-conf.yaml?

Cheers,

Konstantin



On Thu, Mar 14, 2019 at 12:20 PM Juho Autio <juho.autio@rovio.com> wrote:

Hi Konstantin,

Reading timers from a snapshot doesn't seem straightforward. I wrote in private with Gyula, and he gave more suggestions (thanks!), but it still seems that it may be a rather big effort for me to figure it out. Would you be able to help with that? If yes, there's this existing unit test that can be extended to test reading timers: https://github.com/king/bravo/blob/master/bravo/src/test/java/com/king/bravo/ReducerStateReadingTest.java#L37-L38 . The test already has a state with some values in reducer window state, so I'm assuming that it must also contain some window timers.

This is what Gyula wrote to me:

> Maybe I was wrong when I said the createOperatorStateBackendsFromSnapshot is the way to do it.
>
> On second thought, timers are probably stored as raw keyed state in the operator. I don't remember building any utility to read that.
>
> At the moment I am quite busy with other work so won't have time to build it for you, so you might have to figure it out yourself.
>
> I would try to look at how keyed states are read:
>
> Look at the implementation of: createOperatorStateBackendsFromSnapshot()
>
> Instead of getManagedOperatorState you want to try getRawKeyedState, and also look at how Flink restores it internally for timers.
>
> I would start looking around here I guess: https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L238
>
> https://github.com/apache/flink/blob/e8daa49a593edc401cd44761b25b1324b11be4a6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java#L199


On Tue, Mar 12, 2019 at 5:41 PM Gyula Fóra <gyula.fora@gmail.com> wrote:

It should be possible to read timer states by:
OperatorStateReader#createOperatorStateBackendFromSnapshot

Then you have to get the timer state out of the OperatorStateBackend, but keep in mind that this will restore the operator states in memory.

Gyula


On Tue, Mar 12, 2019 at 4:29 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

okay, so it seems that although the watermark passed the end time of the event time windows, the window was not triggered for some of the keys.

The timers, which would trigger the firing of the window, are also part of the keyed state and are snapshotted/restored. I would like to check if timers (as opposed to the window content itself) are maybe lost during the savepoint & restore procedure. Using Bravo, are you also able to inspect the timer state of the savepoints? In particular, I would be interested in whether, for two subsequent savepoints, all timers (i.e. one timer per window and key, including the missing keys) are present in the savepoint.

@Gyula Fóra: Does Bravo support reading timer state as well?

Cheers,

Konstantin


On Thu, Mar 7, 2019 at 6:41 PM Juho Autio <juho.autio@rovio.com> wrote:

Right, the window operator is the one by the name "DistinctFunction".

http "http://10.1.59.75:20888/proxy/application_1551956351667_0001/jobs/3e4ffaadbd84af3488286863f00d4f23/vertices/19ede2f818524a7f310857e537fa6808/metrics?get=$(seq 0 79 | sed 's/$/.currentInputWatermark/' | paste -sd, -)" \
    | jq '.[].value' --raw-output | uniq -c
  80 1551980102743

date -r "$((1551980102743/1000))"
Thu Mar  7 19:35:02 EET 2019

To me that makes sense – how would the window be triggered at all, if not all sub-tasks have a high enough watermark, so that the operator-level watermark can be advanced.

On Thu, Mar 7, 2019 at 5:33 PM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

great, we are getting closer :) Could you please check the "Watermarks" tab in the Flink UI of this job, and check if the current watermark for all parallel subtasks of the WindowOperator is close to the current date/time?

Best,

Konstantin


On Thu, Mar 7, 2019 at 3:01 PM Juho Autio <juho.autio@rovio.com> wrote:

Wow, indeed the missing data from the previous date is still found in the savepoint!

Actually, what I now found is that there is still data from even older dates in the state:

%%spark
state_json_next_day.groupBy(state_json_next_day.ts.substr(1, 10).alias('day')).count().orderBy('day').show(n=1000)

+----------+--------+
|       day|   count|
+----------+--------+
|2018-08-22|    4206|
..
(manually truncated)
..
|2019-02-03|       4|
|2019-02-14|   12881|
|2019-02-15|    1393|
|2019-02-25|    8774|
|2019-03-06|    9293|
|2019-03-07|28113105|
+----------+--------+

Of course that's the expected situation after we have learned that some window contents are left untriggered.

I don't have the logs any more, but I think on 2018-08-22 I reset the state, and since then it has always been kept/restored from a savepoint. I can also see some dates there on which I didn't cancel the stream, but I can't be sure whether the job went through some automatic restart by Flink. So we can't rule out that some window contents might sometimes also be missed during normal operation; however, savepoint restoration at least makes the problem more prominent. I have previously mentioned that I suspect this to be some kind of race condition that is affected by load on the cluster. The reason for my suspicion is that during savepoint restoration the cluster is also catching up Kafka offsets at full speed, so it is considerably more loaded than usual. Otherwise this problem might not have much to do with savepoints, of course.

Are you able to investigate the problem in Flink code based on this information?

Many thanks,
Juho

On Wed, Mar 6, 2019 at 1:41 PM Juho Autio <juho.autio@rovio.com> wrote:

Thanks for the investigation & summary.

As you suggested, I will next take savepoints on two subsequent days & check the reducer state for both days.

On Wed, Mar 6, 2019 at 1:18 PM Konstantin Knauf <konstantin@ververica.com> wrote:

(Moving the discussion back to the ML)

Hi Juho,

after looking into your code, we are still pretty much in the dark with respect to what is going wrong.

Let me try to summarize what we know, given your experiments so far:

1) the lost records were processed and put into state *before* the restart of the job, not afterwards
2) the lost records are part of the state after the restore (because they are contained in subsequent savepoints)
3) the sinks are not the problem (because the metrics of the WindowOperator showed that the missing records have not been sent to the sinks)
4) it is not the batch job used for reference which is wrong, because of 1)
5) records are only lost when restarting from a savepoint (not during normal operations)

One explanation would be that one of the WindowOperators did not fire (for whatever reason) and the missing records are still in the window's state when you run your test. Could you please check whether this is the case, by taking a savepoint on the next day and checking if the missing records are contained in it?

Best,

Konstantin

On Mon, Feb 18, 2019 at 8:32 PM Juho Autio <juho.autio@rovio.com> wrote:

Hi Konstantin, thanks.

I gathered the additional info as discussed. No surprises there.

> * do you know if all lost records are contained in the last savepoint you took before the window fired? This would mean that no records are lost after the last restore.

Indeed this is the case. I saved the list of all missing IDs, analyzed the savepoint with Bravo, and the savepoint state (already) contained all IDs that were eventually missing from the output.

> * could you please check the numRecordsOut metric for the WindowOperator (FlinkUI -> TaskMetrics -> Select TaskChain containing WindowOperator -> find metric)? Is the count reported there correct (no missing data)?

The number matches the output rows. The sum of the numRecordsOut metrics was 45755630, and count(*) of the output on s3 resulted in the same number. Batch output has a bit more IDs, of course (this time it was 1194). You wrote "Is the count reported there correct (no missing data)?" but I have a slightly different viewpoint: I agree that the reported count is correct (in Flink's scope, because the number is the same as what's in the output file). But I think "no missing data" doesn't belong here. Data is missing, but it's consistently missing from both the output files and the numRecordsOut metrics.

The next thing I'll work on is preparing the code to be shared.

Btw, I used this script to count the sum of numRecordsOut (I'm going to look into enabling the Slf4jReporter eventually):


DistinctFunctionID=`http $JOB_URL | jq '.vertices[] | select(.name == "DistinctFunction") | .id' --raw-output`
echo "DistinctFunctionID=$DistinctFunctionID"

http $JOB_URL/vertices/19ede2f818524a7f310857e537fa6808/metrics | jq '.[] | .id' --raw-output | grep "[0-9][0-9]*\\.numRecordsOut$" \
    | xargs -I@ sh -c "http GET $JOB_URL/vertices/19ede2f818524a7f310857e537fa6808/metrics?get=@ | jq '.[0].value' --raw-output" > numRecordsOut.txt

# i.e. "eval_math('+'.join(file.readlines))" – sum all the per-subtask counts
paste -sd+ numRecordsOut.txt | bc

Hi Juho,

>> * does the output of the streaming job contain any data, which is not contained in the batch output?
>
> No.
>
>> * do you know if all lost records are contained in the last savepoint you took before the window fired? This would mean that no records are lost after the last restore.
>
> I haven't built the tooling required to check all IDs like that, but yes, that's my understanding currently. To check that I would need to:
> - kill the stream only once on a given day (so that there's only one savepoint creation & restore)
> - next day or later: save all missing ids from the batch output comparison
> - next day or later: read the savepoint with bravo & check that it contains all of those missing IDs
>
> However I haven't built the tooling for that yet. Do you think it's necessary to verify that this assumption holds?

It would be another data point and might help us to track down the problem. Whether it is worth doing depends on the result, i.e. whether the current assumption would be falsified or not, but we only know that in retrospect ;)

>> * could you please check the numRecordsOut metric for the WindowOperator (FlinkUI -> TaskMetrics -> Select TaskChain containing WindowOperator -> find metric)? Is the count reported there correct (no missing data)?
>
> Is that metric the result of the window trigger? If yes, you must mean that I should check the value of that metric on the next day after the restore, so that it only contains the count for the output of the previous day's window? The counter is reset to 0 when the job starts (even when state is restored), right?

Yes, this metric would be incremented when the window is triggered. And yes, please check this metric after the window, during which the restore happened, has fired.

If you don't have a MetricsReporter configured so far, I recommend quickly registering a Slf4jReporter to log out all metrics every X seconds (maybe even minutes for your use case): https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter. Then you don't need to go through the WebUI and can keep a history of the metrics.
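(A minimal sketch of that reporter setup in flink-conf.yaml, with names per the linked docs; the 60-second interval is just an example value:)

metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
metrics.reporter.slf4j.interval: 60 SECONDS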

> Otherwise, do you have any suggestions for how to instrument the code to narrow down further where the data gets lost? To me it would make sense to proceed with this, because the problem seems hard to reproduce outside of our environment.

Let's focus on checking this metric above, to make sure that the WindowOperator is actually emitting fewer records than the overall number of keys in the state, as your experiments suggest, and on sharing the code.

On Thu, Feb 14, 2019 at 10:57 AM Konstantin Knauf <konstantin@ververica.com> wrote:

Hi Juho,

you are right, the problem has actually been narrowed down quite a bit over time. Nevertheless, sharing the code (incl. flink-conf.yaml) might be a good idea. Maybe something strikes the eye that we have not thought about so far. If you don't feel comfortable sharing the code on the ML, feel free to send me a PM.

Besides that, three more questions:

* does the output of the streaming job contain any data which is not contained in the batch output?
* do you know if all lost records are contained in the last savepoint you took before the window fired? This would mean that no records are lost after the last restore.
* could you please check the numRecordsOut metric for the WindowOperator (FlinkUI -> TaskMetrics -> Select TaskChain containing WindowOperator -> find metric)? Is the count reported there correct (no missing data)?

Cheers,
Konstantin




On Wed, Feb 13, 2019 at 3:19 PM Gyula Fóra <gyula.fora@gmail.com> wrote:

Sorry, not posting on the mailing list was my mistake :/

On Wed, 13 Feb 2019 at 15:01, Juho Autio <juho.autio@rovio.com> wrote:

Thanks for stepping in. Did you post outside of the mailing list on purpose, btw?

This I did a long time ago:

> To rule out for good any questions about sink behaviour, the job was killed and started with an additional Kafka sink.
> The same number of ids were missed in both outputs: KafkaSink & BucketingSink.

(I wrote about that on Oct 1, 2018 in this email thread.)

After that I did the savepoint analysis with Bravo.

Currently I'm indeed trying to get suggestions on how to debug further, for example where to add additional Kafka output to catch where the data gets lost. That would probably be somewhere in Flink's internals.

I could try to share the full code too, but IMHO the problem has been quite well narrowed down, considering that the data can be found in the savepoint, the savepoint is successfully restored, and after restoring the data doesn't go to "user code" (like the reducer) any more.

On Wed, Feb 13, 2019 at 3:47 PM Gyula Fóra <gyula.fora@gmail.com> wrote:

Hi Juho!

I think the reason you are not getting many answers here is that it is very hard to debug this problem remotely.
Seemingly you do very normal operations, the state contains all the required data, and nobody else has hit a similar problem for ages.

My best guess would be some bug in the deduplication or output writing logic, but without a complete code example it's very hard to say anything useful.
Did you try writing the output to Kafka to see if it is there? (That way we could rule out the dedup problem.)

Cheers,
Gyula

On Wed, Feb 13, 2019 at 2:37 PM Juho Autio <juho.autio@rovio.com> wrote:

Stefan (or anyone!), please, could I have some feedback on the findings that I reported on Dec 21, 2018? This is still a major blocker.

On Thu, Jan 31, 2019 at 11:46 AM Juho Autio <juho.autio@rovio.com> wrote:

Hello, is there anyone that could help with this?

On Fri, Jan 11, 2019 at 8:14 AM Juho Autio <juho.autio@rovio.com> wrote:

Stefan, would you have time to comment?

On Wednesday, January 2, 2019, Juho Autio <juho.autio@rovio.com> wrote:

Bump – does anyone know if Stefan will be available to comment on the latest findings? Thanks.

On Fri, Dec 21, 2018 at 2:33 PM Juho Autio <juho.autio@rovio.com> wrote:

Stefan, I managed to analyze the savepoint with bravo. It seems that the data that's missing from the output is found in the savepoint.

I simplified my test case to the following:

- job 1 had been running for ~10 days
- savepoint X created & job 1 cancelled
- job 2 started with restore from savepoint X

Then I waited until the next day so that job 2 had triggered the 24-hour window.

Then I analyzed the output & savepoint:

- compare job 2 output with the output of a batch pyspark script => find 4223 missing rows
- pick one of the missing rows (say, id Z)
- read savepoint X with bravo, filter for id Z => Z was found in the savepoint!

How can it be possible that the value is in state but doesn't end up in the output after the state has been restored & the window is eventually triggered?

I also did a similar analysis on the previous case, where I savepointed & restored the job multiple times (5) within the same 24-hour window. A missing id that I drilled down to was found in all of those savepoints, yet missing from the output that gets written at the end of the day. This is even more surprising: the missing ID was written to the new savepoints also after restoring. Is the reducer state somehow decoupled from the window contents?

Big thanks to bravo developer Gyula for guiding me through to be able to read the reducer state! https://github.com/king/bravo/pull/11

Gyula also had an idea for how to troubleshoot the missing data in a scalable way: I could add some "side effect kafka output" on individual operators. This should allow tracking more closely at which point the data gets lost. However, maybe this would have to be in some of Flink's internal components, and I'm not sure which those would be.
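(A rough sketch of what such a side-effect output could look like at the user-code level; hypothetical names throughout, and FlinkKafkaProducer010 only because the job already uses the 0.10 Kafka connector. Tapping Flink's internals would of course look different.)

import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;

public class DebugTap {
    /** Mirrors the id of every record passing this point in the pipeline to a debug Kafka topic. */
    public static void tap(DataStream<Map<String, String>> stream, String stage, Properties kafkaProps) {
        stream
            // "id" is the record field the thread tracks; adjust to the actual schema
            .map(record -> stage + ": " + record.get("id"))
            .addSink(new FlinkKafkaProducer010<>("debug-trace", new SimpleStringSchema(), kafkaProps));
    }
}

(Calling DebugTap.tap(...) before and after the window operator would then show at which stage a given id disappears.)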

Cheers,
Juho

On Mon, Nov 19, 2018 at 11:52 AM Juho Autio <juho.autio@rovio.com> wrote:

Hi Stefan,

Bravo doesn't currently support reading a reducer state. I gave it a try but couldn't get to a working implementation yet. If anyone can provide some insight on how to make this work, please share at github:

Thanks.

On Tue, Oct 23, 2018 at 3:32 PM Juho Autio <juho.autio@rovio.com> wrote:

I was glad to find that bravo had now been updated to support installing bravo to a local maven repo.

I was able to load a checkpoint created by my job, thanks to the example provided in the bravo README, but I'm still missing the essential piece.

My code was:

        OperatorStateReader reader = new OperatorStateReader(env2, savepoint, "DistinctFunction");
        DontKnowWhatTypeThisIs reducingState = reader.readKeyedStates(what should I put here?);

I don't know how to read the values collected from reduce() calls in the state. Is there a way to access the reducing state of the window with bravo? I'm a bit confused about how this works, because when I check with a debugger, Flink internally uses a ReducingStateDescriptor with name=window-contents, but still, reading operator state for "DistinctFunction" didn't at least throw an exception ("window-contents" threw – obviously there's no operator by that name).

Cheers,
Juho

On Mon, Oct 15, 2018 at 2:25 PM Juho Autio <juho.autio@rovio.com> wrote:

Hi Stefan,

Sorry, but it doesn't seem immediately clear to me what's a good way to use https://github.com/king/bravo.

How are people using it? Would you for example modify build.gradle somehow to publish bravo as a library locally/internally? Or add code directly to the bravo project (locally) and run it from there (using an IDE, for example)? Also, it doesn't seem like the bravo gradle project supports building a Flink job jar, but if it does, how do I do it?

Thanks.

On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <juho.autio@rovio.com> wrote:

Good then, I'll try to analyze the savepoints with Bravo. Thanks!

> How would you assume that backpressure would influence your updates? Updates to each local state still happen event-by-event, in a single reader/writing thread.

Sure, just an ignorant guess by me; I'm not familiar with most of Flink's internals. Anyway, high backpressure is not seen on this job after it has caught up the lag, so that's why I thought it would be worth mentioning.

On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <s.richter@data-artisans.com> wrote:

Hi,

On 04.10.2018, at 16:08, Juho Autio <juho.autio@rovio.com> wrote:

>> you could take a look at Bravo [1] to query your savepoints and to check if the state in the savepoint is complete w.r.t your expectations
>
> Thanks. I'm not 100% sure if this is the case, but to me it seemed like the missed ids were being logged by the reducer soon after the job had started (after restoring a savepoint). But on the other hand, after that I also made another savepoint & restored that, so what I could check is: does that next savepoint have the missed ids that were logged (a couple of minutes before the savepoint was created, so there should've been more than enough time to add them to the state before the savepoint was triggered) or not. Anyway, if I were able to verify with Bravo that the ids are missing from the savepoint (even though the reducer logged that it saw them), would that help in figuring out where they are lost? Is there some major difference compared to just looking at the final output after the window has been triggered?

I think that makes a difference. For example, you can investigate if there is a state loss or a problem with the windowing. In the savepoint you could see which keys exist and to which windows they are assigned. Also, just to make sure there is no misunderstanding: only elements that are in the state at the start of a savepoint are expected to be part of the savepoint; all elements between start and completion of the savepoint are not expected to be part of the savepoint.

>> I also doubt that the problem is about backpressure after restore, because the job will only continue running after the state restore is already completed.
>
> Yes, I'm not suspecting that the state restoring would be the problem either. My concern was about backpressure possibly messing with the updates of the reducing state. I would tend to suspect that updating the state consistently is what fails, where heavy load / backpressure might be a factor.

How would you assume that backpressure would influence your updates? Updates to each local state still happen event-by-event, in a single reader/writing thread.


On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <s.richter@data-artisans.com> wrote:

Hi,

you could take a look at Bravo [1] to query your savepoints and to check if the state in the savepoint is complete w.r.t your expectations. I somewhat doubt that there is a general problem with the state/savepoints, because many users are successfully running it on large state and I am not aware of any data loss problems, but nothing is impossible. What the savepoint does is also straightforward: iterate a db snapshot and write all key/value pairs to disk, so all data that was in the db at the time of the savepoint should show up. I also doubt that the problem is about backpressure after restore, because the job will only continue running after the state restore is already completed. Did you check if you are using exactly-once semantics or at-least-once semantics? Also, did you check that the Kafka consumer start position is configured properly [2]? Are watermarks generated as expected after restore?

One more unrelated high-level comment that I have: for a granularity of 24h windows, I wonder if it would not make more sense to use a batch job instead?

Best,
Stefan

[1] https://github.com/king/bravo

On 04.10.2018, at 14:53, Juho Autio <juho.autio@rovio.com> wrote:

Thanks for the suggestions!

> In general, it would be tremendously helpful to have a minimal working example which allows to reproduce the problem.

Definitely. The problem with reproducing has been that this only seems to happen with the bigger production data volumes.

That's why I'm hoping to find a way to debug this with the production data. With that, it seems to consistently cause some misses every time the job is killed/restored.

> check if it happens for shorter windows, like 1h etc

What would be the benefit of that compared to the 24h window?

> simplify the job to not use a reduce window but simply a time window which outputs the window events. Then counting the input and output events should allow you to verify the results. If you are not seeing missing events, then it could have something to do with the reducing state used in the reduce function.

Hm, maybe, but I'm not sure how useful that would be, because it wouldn't yet prove that it's related to reducing: not having a reduce function could also mean a smaller load on the job, which might alone be enough to make the problem not manifest.

Is there a way to debug what goes into the reducing state (including what gets removed or overwritten and what gets restored), if that makes sense? Maybe some suitable logging could be used to prove that the lost data is written to the reducing state (or at least asked to be written), but not found any more when the window closes and the state is flushed?

On configuration once more: we're using the RocksDB state backend with asynchronous incremental checkpointing. The state is restored from savepoints though; we haven't been using those checkpoints in these tests (although they could be used in case of crashes – but we haven't had those now).

On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <trohrmann@apache.org> wrote:

Hi Juho,

another idea to further narrow down the problem could be to simplify the job to not use a reduce window but simply a time window which outputs the window events. Then counting the input and output events should allow you to verify the results. If you are not seeing missing events, then it could have something to do with the reducing state used in the reduce function.

In general, it would be tremendously helpful to have a minimal working example which allows to reproduce the problem.

Cheers,
Till

On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:

Hi Juho,

can you try to reduce the job to a minimal reproducible example and share the job and input?

For example:
- some simple records as input, e.g. tuples of primitive types saved as csv
- a minimal deduplication job which processes them and misses records
- check if it happens for shorter windows, like 1h etc
- the setup which you use for the job, ideally locally reproducible or cloud

Best,
Andrey

On 4 Oct 2018, at 11:13, Juho Autio <juho.autio@rovio.com> wrote:

Sorry to insist, but we seem to be blocked from any serious usage of state in Flink if we can't rely on it to not miss data in case of restore.

Would anyone have suggestions for how to troubleshoot this? So far I have verified with DEBUG logs that our reduce function gets to process also the data that is missing from the window output.

On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.autio@rovio.com> wrote:

Hi Andrey,

To rule out for good any questions about sink behaviour, the job was killed and started with an additional Kafka sink.

The same number of ids were missed in both outputs: KafkaSink & BucketingSink.

I wonder what would be the next steps in debugging?
On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <juho.autio@rovio.com> wrote:

Thanks, Andrey.

> so it means that the savepoint does not lose at least some dropped records.

I'm not sure what you mean by that? I mean, it was known from the beginning that not everything is lost before/after restoring a savepoint, just some records around the time of restoration. It's not 100% clear whether records are lost before making a savepoint or after restoring it. Although, based on the new DEBUG logs, it seems more like losing some records that are seen ~soon after restoring. It seems like Flink would be somehow confused either about the restored state vs. new inserts to state. This could also be somehow linked to the high back pressure on the Kafka source while the stream is catching up.

> If it is feasible for your setup, I suggest to insert one more map function after reduce and before sink.
> etc.

Isn't that the same thing that we discussed before? Nothing is sent to BucketingSink before the window closes, so I don't see how it would make any difference if we replace the BucketingSink with a map function or another sink type. We don't create or restore savepoints during the time when BucketingSink gets input or has open buckets – that happens at a much later time of day. I would focus on figuring out why the records are lost while the window is open. But I don't know how to do that. Would you have any additional suggestions?

On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:

Hi Juho,

so it means that the savepoint does not lose at least some dropped records.

If it is feasible for your setup, I suggest inserting one more map function after reduce and before the sink (a sketch follows below).
The map function should be called right after the window is triggered but before flushing to s3.
The result of reduce (the deduped record) could be logged there.
This should allow checking whether the processed distinct records were buffered in the state after the restoration from the savepoint or not. If they were buffered, we should see that there was an attempt to write them to the sink from the state.
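(A minimal sketch of such a map function, assuming the job's Map<String, String> records and the "field"/"id" keys mentioned elsewhere in this thread:)

import java.util.Map;

import org.apache.flink.api.common.functions.MapFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Identity map placed between reduce() and the sink; logs every record the window emits. */
public class TraceFunction implements MapFunction<Map<String, String>, Map<String, String>> {
    private static final Logger LOG = LoggerFactory.getLogger(TraceFunction.class);

    @Override
    public Map<String, String> map(Map<String, String> record) {
        LOG.info("window emitted: {}={}", record.get("field"), record.get("id"));
        return record;
    }
}

(It would be wired in as .reduce(new DistinctFunction()).map(new TraceFunction()).addSink(sink).)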

Another suggestion is to try to write records to some other sink, or to both.
E.g. if you can access the file system of the workers, maybe just into local files, and check whether the records are also dropped there.

Best,
Andrey

On 20 Sep 2018, at 15:37, Juho Autio <juho.autio@rovio.com> wrote:

Hi Andrey!

I was finally able to gather the DEBUG logs that you suggested. In short, the reducer logged that it processed at least some of the ids that were missing from the output.

"At least some", because I didn't have the job running with DEBUG logs for the full 24-hour window period. So I was only able to look up whether I could find some of the missing ids in the DEBUG logs. Which I did indeed.

I changed DistinctFunction.java to do this:

    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
        return value1;
    }

Then:

vi flink-1.6.0/conf/log4j.properties
log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG

Then I ran the following kind of test:

- Cancelled the on-going job with a savepoint created at ~Sep 18 08:35 UTC 2018
- Started a new cluster & job with DEBUG enabled at ~09:13, restored from that previous cluster's savepoint
- Ran until the offsets were caught up
- Cancelled the job with a new savepoint
- Started a new job _without_ DEBUG, which restored the new savepoint, and let it keep running so that it would eventually write the output

Then on the next day, after the results had been flushed when the 24-hour window closed, I compared the results again with a batch version's output. And found some missing ids, as usual.

I drilled down to one specific missing id (I'm replacing the actual value with AN12345 below), which was not found in the stream output, but was found in the batch output & Flink DEBUG logs.

Related to that id, I gathered the following information:

2018-09-18 ~09:13:21,000 job started & savepoint is restored

2018-09-18 09:14:29,085 missing id is processed for the first time, proved by this log line:
2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction - DistinctFunction.reduce returns: s.aid1=AN12345

2018-09-18 09:15:14,264 first synchronous part of checkpoint
2018-09-18 09:15:16,544 first asynchronous part of checkpoint

(
more occurrences of checkpoints (~1 min checkpointing time + ~1 min delay before next)
/
more occurrences of DistinctFunction.reduce
)

2018-09-18 09:23:45,053 missing id is processed for the last time

2018-09-18 ~10:20:00,000 savepoint created & job cancelled

To be noted: there was high backpressure after restoring from the savepoint until the stream caught up with the Kafka offsets. However, our job assigns timestamps & watermarks on the Flink Kafka consumer itself, so the event time of all partitions is synchronized. As expected, we don't get any late data in the late data side output.

From this we can see that the missing ids are processed by the reducer, but they must get lost somewhere before the 24-hour window is triggered.

I think it's worth mentioning once more that the stream doesn't miss any ids if we let it run without interruptions / state restoring.

What's next?








On 29 Aug 2018, at 12:11, Juho Autio <juho.autio@rovio.com> wrote:

Andrey, thank you very much for the debugging suggestions, I'll try them.

In the meanwhile, two more questions, please:

> Just to keep in mind this problem with s3 and exclude it for sure, I would also check whether the size of missing events is around the batch size of BucketingSink or not.

Fair enough, but I also want to focus on debugging the most probable subject first. So what do you think about this – true or false: only when the 24-hour window triggers does BucketingSink get a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either. Isn't this true, or have I totally missed how Flink works in triggering window results? I would not expect there to be any optimization that speculatively triggers early results of a regular time window to the downstream operators.

> The old BucketingSink has in general a problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine the index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite the previously written part and basically lose it.

I was wondering what S3's "read-after-write consistency" (mentioned on the page you linked) actually means. It seems that this might be possible:
- LIST keys, find current max index
- choose next index = max + 1
- HEAD next index: if it exists, keep adding +1 until the key doesn't exist on S3

But it definitely sounds easier if a sink keeps track of files in a way that's guaranteed to be consistent.
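(To illustrate the probing pattern being described, a hypothetical simplification, not BucketingSink's actual code:)

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class PartIndexProbe {
    /** Probe for the next free part index as sketched in the list above. */
    static int nextPartIndex(FileSystem fs, Path bucket, int maxListedIndex) throws IOException {
        int next = maxListedIndex + 1;
        // Under S3's eventual consistency for LIST/HEAD, a stale "not found"
        // here can hand out an index that is already in use, so the existing
        // part would be silently overwritten.
        while (fs.exists(new Path(bucket, "part-" + next))) {
            next++;
        }
        return next;
    }
}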
<= /div>
Cheers,
Juho

On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:

Hi,

true, StreamingFileSink does not support s3 in 1.6.0, it is planned for the next 1.7 release, sorry for the confusion.

The old BucketingSink has in general a problem with s3. Internally BucketingSink queries s3 as a file system to list already written file parts (batches) and determine the index of the next part to start. Due to eventual consistency of checking file existence in s3 [1], the BucketingSink can rewrite the previously written part and basically lose it. It should be fixed for StreamingFileSink in 1.7, where Flink keeps its own track of written parts and does not rely on s3 as a file system. I also include Kostas, he might add more details.

Just to keep in mind this problem with s3 and exclude it for sure, I would also check whether the size of missing events is around the batch size of BucketingSink or not. You also wrote that the timestamps of lost events are 'probably' around the time of the savepoint; if it is not yet for sure, I would also check it.

Have you already checked the log files of the job manager and task managers for the job running before and after the restore from the checkpoint? Is everything successful there, no errors, relevant warnings or exceptions?

As the next step, I would suggest to log all encountered events in DistinctFunction.reduce, if possible for production data, and check whether the missed events are eventually processed before or after the savepoint. The following log message indicates a border between the events that should be included into the savepoint (logged before) or not:

"{} ({}, synchronous part) in thread {} took {} ms" (template)

Also check if the savepoint has been overall completed:

"{} ({}, asynchronous part) in thread {} took {} ms."

Best,
Andrey

[1] https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html

On 24 Aug 2018, at 20:41, Juho Autio <juho.autio@rovio.com> wrote:

Hi,

Using StreamingFileSink is not a convenient option for production use for us, as it doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point in doing so. Please consider my previous comment:

> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I could also use a kafka sink instead, but I can't imagine how there could be any difference. It's very real that the sink doesn't get any input for a long time until the 24-hour window closes, and then it quickly writes out everything, because it's not that much data eventually for the distinct values.

Any ideas for debugging what's happening around the savepoint & restoration time?

*) I actually implemented StreamingFileSink as an alternative sink. This was before I came to realize that most likely the sink component has nothing to do with the data loss problem. I tried it with an s3n:// path just to see an exception being thrown. In the source code I indeed then found an explicit check for the target path scheme to be "hdfs://".

On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:

Ok, I think before further debugging the window reduced state, could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead of the previous 'BucketingSink'?

Cheers,
Andrey

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html

On 24 Aug 2018, at 18:03, Juho Autio <juho.autio@rovio.com> wrote:

Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere, now that the output is missing some data.

> I would wait and check the actual output in s3 because it is the main result of the job

Yes, and that's what I have already done. There seems to be always some data loss with the production data volumes, if the job has been restarted on that day.

Would you have any suggestions for how to debug this further?

Many thanks for stepping in.

On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:

Hi Juho,

So it is a per-key deduplication job.

Yes, I would wait and check the actual output in s3, because it is the main result of the job, and

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

is not a bug, it is a possible behaviour.

The savepoint is a snapshot of the data in transit which has already been consumed from Kafka. Basically the full contents of the window result are split between the savepoint and what can come after the savepoint'ed offset in Kafka but before the window result is written into s3.

Allowed lateness should not affect it, I am just saying that the final result in s3 should include all records after it. This is what should be guaranteed, but not the contents of the intermediate savepoint.

Cheers,
Andrey

On 24 Aug 2018, at 16:52, Juho Autio <juho.autio@rovio.com> wrote:

Thanks for your answer!

I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.

> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.

Yes, I would definitely expect that. It seems like there's a bug somewhere.

> Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.

> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in s3? then what is missing there?

Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction, so there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.

Here's the full code for the key selector:

public class MapKeySelector implements KeySelector<Map<String,String>, Object> {

    private final String[] fields;

    public MapKeySelector(String... fields) {
        this.fields = fields;
    }

    @Override
    public Object getKey(Map<String, String> event) throws Exception {
        Tuple key = Tuple.getTupleClass(fields.length).newInstance();
        for (int i = 0; i < fields.length; i++) {
            key.setField(event.getOrDefault(fields[i], ""), i);
        }
        return key;
    }
}

And a more exact example on how it's used:

                .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
                .timeWindow(Time.days(1))
                .reduce(new DistinctFunction())

On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:

Hi Juho,

Where exactly does the data miss? When do you notice that? Do you check it:
- debugging `DistinctFunction.reduce` right after resume in the middle of the day, or
- some distinct records miss in the final output of BucketingSink in s3 after the window result is actually triggered and saved into s3 at the end of the day? is this the main output?

The late data around the time of taking the savepoint might not be included into the savepoint, but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.

Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of the records inside the 24h window in s3? then what is missing there?

Cheers,
Andrey

On 23 Aug 2018, at 15:42, Juho Autio <juho.autio@rovio.com> wrote:

I changed to allowedLateness=0; no change, still missing data when restoring from the savepoint.

On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.autio@rovio.com> wrote:

I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).

I will next try removing the allowedLateness entirely from the equation.

In the meanwhile, please let me know if you have any suggestions for debugging the lost data, for example what logs to enable.

We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that could contribute to lost data when restoring a savepoint?

On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.autio@rovio.com> wrote:

Some data is silently lost on my Flink stream job when state is restored from a savepoint.

Do you have any debugging hints to find out where exactly the data gets dropped?

My job gathers distinct values using a 24-hour window. It doesn't have any custom state management. When I cancel the job with savepoint and restore from that savepoint, some data is missed. It seems to be losing just a small amount of data. The event time of lost data is probably around the time of the savepoint. In other words, the rest of the time window is not entirely missed – collection works correctly also for (most of the) events that come in after restoring.

When the job processes a full 24-hour window without interruptions it doesn't miss anything.

Usually the problem doesn't happen in test environments that have smaller parallelism and smaller data volumes. But in production volumes the job seems to be consistently missing at least something on every restore.

This issue has consistently happened since the job was initially created. It was at first run on an older version of Flink 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.

I'm wondering if this could be, for example, some synchronization issue between the kafka consumer offsets vs. what's been written by BucketingSink?

1. Job content, simplified

        kafkaStream
                .flatMap(new ExtractFieldsFunction())
                .keyBy(new MapKeySelector(1, 2, 3, 4))
                .timeWindow(Time.days(1))
                .allowedLateness(allowedLateness)
                .sideOutputLateData(lateDataTag)
                .reduce(new DistinctFunction())
                .addSink(sink)
                // use a fixed number of output partitions
                .setParallelism(8))

/**
 * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
 */
public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        return value1;
    }
}

2. State configuration

boolean enableIncrementalCheckpointing = true;
String statePath = "s3n://bucket/savepoints";
new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);

Checkpointing Mode: Exactly Once
Interval: 1m 0s
Timeout: 10m 0s
Minimum Pause Between Checkpoints: 1m 0s
Maximum Concurrent Checkpoints: 1
Persist Checkpoints Externally: Enabled (retain on cancellation)

3. BucketingSink configuration

We use BucketingSink, I don't think there's anything special here, if not the fact that we're writing to S3.

        String outputPath = "s3://bucket/output";
        BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
                .setBucketer(new ProcessdateBucketer())
                .setBatchSize(batchSize)
                .setInactiveBucketThreshold(inactiveBucketThreshold)
                .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
        sink.setWriter(new IdJsonWriter());

4. Kafka & event time

My flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to synchronize watermarks across all kafka partitions. We also write late data to a side output, but nothing is written there – if it were, it could explain missed data in the main output (I'm also sure that our late data writing works, because we previously had some actual late data which ended up there).

5. allowedLateness

It may or may not be relevant that I have also enabled allowedLateness with 1 minute lateness on the 24-hour window. If that makes sense, I could try removing allowedLateness entirely? That would be just to rule out that Flink has a bug related to restoring state in combination with the allowedLateness feature. After all, all of our data should be in a good enough order to not be late, given the max out-of-orderness used on the kafka consumer timestamp extractor.

Thank you in advance!
--
Juho Autio
Senior Data Engineer

Data Engineering, Games
Rovio Entertainment Corporation
Mobile: +358 (0)45 313 0122
juho.autio@rovio.com
www.rovio.com
--
Konstantin Knauf | Solutions Architect
+49 160 91394525

Follow us @VervericaData
--
Flink Forward - The Apache Flink Conference
Stream Processing | Event Driven | Real Time
--
Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
--0000000000001bca7a0587f7aa7c--