flink-user mailing list archives

From Juho Autio <juho.au...@rovio.com>
Subject Re: Data loss when restoring from savepoint
Date Fri, 03 May 2019 08:37:32 GMT
Konstantin, thanks for providing the new code.

Here are the latest results for jobs run with extended DEBUG logging.

20190427 (killed & restored), missing_rows.count(): 3470
20190428 (no kill / restore), missing_rows.count(): 0

I have shared the logs from 27th (after restore) in private with Konstantin.

On Fri, Apr 26, 2019 at 5:05 PM Konstantin Knauf <konstantin@ververica.com>
wrote:

> Hi Juho,
>
> sorry for not being more responsive the last two weeks, I was on vacation
> for a good part of it. The fact that this also happens with Timers on
> RocksDB is again confusing. The code that we have mainly looked at so far is
> not used by the RocksDB configuration, so the inconsistencies that we saw
> in the logs don't apply to the RocksDB configuration.
>
> Anyway, I agree to further track down the issue for the heap timers first,
> and then to move on to RocksDB. I have added more fine grained logging to
> the branch [1]. The two additional classes, which you need to set the
> logging level to DEBUG for, are
>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl
>
> org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy
>
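> In log4j.properties, that would be along these lines:
>
> log4j.logger.org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl=DEBUG
>
> log4j.logger.org.apache.flink.streaming.api.operators.InternalTimerServiceSerializationProxy=DEBUG
>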
> Please run through the usual procedure of doing a savepoint and provide
> the logs during recovery.
>
> Thank you for your perseverance,
>
> Konstantin
>
> [1] https://github.com/knaufk/flink/tree/logging-timers
>
>
> On Thu, Apr 18, 2019 at 4:06 PM Oytun Tez <oytun@motaword.com> wrote:
>
>> Thanks for the update, Juho, and please do keep updating :) I've been
>> watching the thread silently; I am sure your findings help many others who
>> watch the thread.
>>
>>
>>
>>
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oytun@motaword.com — www.motaword.com
>>
>>
>> On Thu, Apr 18, 2019 at 8:26 AM Juho Autio <juho.autio@rovio.com> wrote:
>>
>>> In the meanwhile, some additional results, continued with ROCKSDB timer
>>> service:
>>>
>>> 20190416 (no cancellation), missing_rows.count(): 0
>>> 20190417 (cancel with savepoint & restore), missing_rows.count(): 54
>>>
>>> On Tue, Apr 16, 2019 at 2:35 PM Juho Autio <juho.autio@rovio.com> wrote:
>>>
>>>> Ouch, we have a data loss case now also with ROCKSDB timer service
>>>> factory. This time the job had failed for some reason & restored from a
>>>> checkpoint by itself (I mean I didn’t cancel with savepoint this time. Previous
>>>> restore from savepoint was at 14-04-2019 06:21:45 UTC).
>>>>
>>>> In this case the number of lost ids was quite high:
>>>>
>>>> 20190415, missing_rows.count(): 706605
>>>>
>>>> I don't know if the ROCKSDB timer service is a factor towards higher
>>>> instability, but indeed I'd like to go back to testing with
>>>> InternalTimerServiceImpl as well. Will switch back to that when the updated
>>>> branch is available. Also I'm not sure if the cause of data loss is similar
>>>> now with ROCKSDB timer service factory (lost timers or maybe something
>>>> else), because we didn't have corresponding DEBUG logging for this
>>>> implementation.
>>>>
>>>> On Mon, Apr 15, 2019 at 11:27 AM Konstantin Knauf <
>>>> konstantin@ververica.com> wrote:
>>>>
>>>>> Hi Juho,
>>>>>
>>>>> this is good news indeed! I have had a look at the _metadata files and
>>>>> logs on Friday and it looks like a) the timer state is contained in the
>>>>> savepoint files and b) the timer state is also initially read by the
>>>>> TaskStateManagerImpl, but then it is somehow lost before it reaches the
>>>>> InternalTimerServiceImpl. I will provide an updated version of my branch
>>>>> with more logging output to find the reason for this today or tomorrow. It
>>>>> would be great, if you could test this again then.
>>>>>
>>>>> Best,
>>>>>
>>>>> Konstantin
>>>>>
>>>>> On Mon, Apr 15, 2019 at 9:49 AM Juho Autio <juho.autio@rovio.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Great news:
>>>>>> There's no data loss (for the 3 days so far that were run) with
>>>>>> state.backend.rocksdb.timer-service.factory: ROCKSDB.
>>>>>>
>>>>>> Each day the job was once cancelled with savepoint & restored.
>>>>>>
>>>>>> 20190412, missing_rows.count(): 0
>>>>>> 20190413, missing_rows.count(): 0
>>>>>> 20190414, missing_rows.count(): 0
>>>>>>
>>>>>> Btw, now we don't get the DEBUG logs of
>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl any more,
>>>>>> so I didn't know how to check from logs how many timers are restored. But
>>>>>> based on the results I'm assuming that all were successfully restored.
>>>>>>
>>>>>> We'll keep testing this a bit more, but it seems really promising
>>>>>> indeed. I'm thinking of at least letting it run for some days without
>>>>>> cancellations, and on the other hand cancelling many times within the
>>>>>> same day, etc.
>>>>>>
>>>>>> Can I provide some additional debug logs or such to help find the bug
>>>>>> when 'heap' is used for timers? Did you already analyze the _metadata files
>>>>>> that I sent?
>>>>>>
>>>>>> On Thu, Apr 11, 2019 at 4:21 PM Juho Autio <juho.autio@rovio.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Shared _metadata files also, in private.
>>>>>>>
>>>>>>> The job is now running with
>>>>>>> state.backend.rocksdb.timer-service.factory: ROCKSDB. I started it from
>>>>>>> empty state because I wasn't sure whether this change would be migrated
>>>>>>> automatically. I guess a clean setup like this is a good idea anyway.
>>>>>>> First day that is fully processed with this conf will be tomorrow=Friday,
>>>>>>> and results can be compared on the next day.. I'll report back on that on
>>>>>>> Monday. I verified from Flink UI that the property is found in
>>>>>>> Configuration, but I still feel a bit unsure about if it's actually being
>>>>>>> used. I wonder if there's some INFO level logging that could be checked to
>>>>>>> confirm that?
>>>>>>>
>>>>>>> Thanks.
>>>>>>>
>>>>>>> On Thu, Apr 11, 2019 at 4:01 PM Konstantin Knauf <
>>>>>>> konstantin@ververica.com> wrote:
>>>>>>>
>>>>>>>> Hi Juho,
>>>>>>>>
>>>>>>>> thank you. I will have a look at your logs later today or tomorrow.
>>>>>>>> Could you also provide the metadata file of the savepoints in question? It
>>>>>>>> is located in the parent directory of that savepoint and should follow this
>>>>>>>> naming pattern: "savepoints_.*_savepoint_.*__metadata".
>>>>>>>>
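>>>>>>>> For example, something along these lines should find it (the directory is
>>>>>>>> a placeholder):
>>>>>>>>
>>>>>>>> find <savepoint-parent-dir> -name 'savepoints_*_savepoint_*__metadata'
>>>>>>>>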
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Konstantin
>>>>>>>>
>>>>>>>> On Thu, Apr 11, 2019 at 2:39 PM Stefan Richter <
>>>>>>>> s.richter@ververica.com> wrote:
>>>>>>>>
>>>>>>>>> No, it also matters for savepoints. I think the doc here is
>>>>>>>>> misleading; it is currently synchronous for all cases of RocksDB keyed
>>>>>>>>> state and heap timers.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Stefan
>>>>>>>>>
>>>>>>>>> On 11. Apr 2019, at 14:30, Juho Autio <juho.autio@rovio.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Thanks Till. Anyway, that's irrelevant in case of a savepoint,
>>>>>>>>> right?
>>>>>>>>>
>>>>>>>>> On Thu, Apr 11, 2019 at 2:54 PM Till Rohrmann <
>>>>>>>>> trohrmann@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Juho,
>>>>>>>>>>
>>>>>>>>>> yes, it means that the snapshotting of the timer state does not
>>>>>>>>>> happen asynchronously but synchronously within the Task executor thread.
>>>>>>>>>> During this operation, your operator won't make any progress, potentially
>>>>>>>>>> causing backpressure for upstream operators.
>>>>>>>>>>
>>>>>>>>>> If you want to use fully asynchronous snapshots while also using
>>>>>>>>>> timer state, you should use the RocksDB backed timers.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 11, 2019 at 10:32 AM Juho Autio <juho.autio@rovio.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ok, I'm testing that
>>>>>>>>>>> state.backend.rocksdb.timer-service.factory: ROCKSDB in the meanwhile.
>>>>>>>>>>>
>>>>>>>>>>> Btw, what does this actually mean (from
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html
>>>>>>>>>>> ):
>>>>>>>>>>>
>>>>>>>>>>> > The combination RocksDB state backend / with incremental
>>>>>>>>>>> checkpoint / with heap-based timers currently does NOT support asynchronous
>>>>>>>>>>> snapshots for the timers state. Other state like keyed state is still
>>>>>>>>>>> snapshotted asynchronously. Please note that this is not a regression from
>>>>>>>>>>> previous versions and will be resolved with FLINK-10026.
>>>>>>>>>>>
>>>>>>>>>>> Is it just that snapshots are not asynchronous, so they cause
>>>>>>>>>>> some pauses? Does "not supported" here mean just some performance impact,
>>>>>>>>>>> or also correctness?
>>>>>>>>>>>
>>>>>>>>>>> Our job at hand is using RocksDB state backend and incremental
>>>>>>>>>>> checkpointing. However at least the restores that we've been testing here
>>>>>>>>>>> have been from a *savepoint*, not an incremental checkpoint.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Apr 10, 2019 at 4:46 PM Konstantin Knauf <
>>>>>>>>>>> konstantin@ververica.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>
>>>>>>>>>>>> one more thing we could try in a separate experiment is to
>>>>>>>>>>>> change the timer state backend to RocksDB as well by setting
>>>>>>>>>>>> state.backend.rocksdb.timer-service.factory: ROCKSDB
>>>>>>>>>>>> in the flink-conf.yaml and see if this also leads to the loss
>>>>>>>>>>>> of records. That would narrow it down quite a bit.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>>
>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Apr 10, 2019 at 1:02 PM Konstantin Knauf <
>>>>>>>>>>>> konstantin@ververica.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>
>>>>>>>>>>>>> sorry for the late reply. Please continue to use the custom
>>>>>>>>>>>>> Flink build and add additional logging for TaskStateManagerImpl by adding
>>>>>>>>>>>>> the following line to your log4j configuration.
>>>>>>>>>>>>>
>>>>>>>>>>>>> log4j.logger.org.apache.flink.runtime.state.TaskStateManagerImpl=DEBUG
>>>>>>>>>>>>>
>>>>>>>>>>>>> Afterwards, do a couple of savepoint & restore until you see a
>>>>>>>>>>>>> number of restores < 80 as before and share the logs with me (at least for
>>>>>>>>>>>>> TaskStateManagerImpl & InternalTimerServiceImpl).
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Apr 4, 2019 at 9:03 AM Juho Autio <
>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Konstantin,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the follow-up.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There are only 76 lines for restore in Job 3 instead of 80.
>>>>>>>>>>>>>>> It would be very useful to know if these lines were lost by the log
>>>>>>>>>>>>>>> aggregation or really did not exist.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I fetched the actual taskmanager.log files to verify (we
>>>>>>>>>>>>>> store the original files on s3). Then did grep for
>>>>>>>>>>>>>> "InternalTimerServiceImpl  - Restored".
>>>>>>>>>>>>>>
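>>>>>>>>>>>>>> Roughly like this, assuming one concatenated log file per job:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> grep -c "InternalTimerServiceImpl  - Restored" taskmanager.log
>>>>>>>>>>>>>>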
>>>>>>>>>>>>>> This is for "job 1. (start - end) first restore with debug
>>>>>>>>>>>>>> logging":
>>>>>>>>>>>>>> Around 2019-03-26 09:08:43,352 - 78 hits
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is for "job 3. (start-middle) 3rd restore with debug
>>>>>>>>>>>>>> logging (following day)":
>>>>>>>>>>>>>> Around 2019-03-27 07:39:06,414 - 76 hits
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So yeah, we can rely on our log delivery to Kibana.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Note that as a new piece of information I found that the same
>>>>>>>>>>>>>> job also did an automatic restore from checkpoint around 2019-03-30 20:36
>>>>>>>>>>>>>> and there were 79 hits instead of 80. So it doesn't seem to be only a
>>>>>>>>>>>>>> problem with savepoints; it can happen with a checkpoint restore as well.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Were there any missing records in the output for the day of
>>>>>>>>>>>>>>> the Job 1 -> Job 2 transition (26th of March)?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 20190326: missing 2592
>>>>>>>>>>>>>> 20190327: missing 4270
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This even matches with the fact that on 26th 2 timers were
>>>>>>>>>>>>>> missed in restore but on 27th it was 4.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What's next? :)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Thu, Apr 4, 2019 at 12:32 AM Konstantin Knauf <
>>>>>>>>>>>>>> konstantin@ververica.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> one thing that makes the log output a little bit hard to
>>>>>>>>>>>>>>> analyze is the fact that the "Snapshot" lines include Savepoints as well
>>>>>>>>>>>>>>> as Checkpoints. To identify the savepoints, I looked at the last 80 lines
>>>>>>>>>>>>>>> per job, which seems plausible given the timestamps of the lines.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So, let's compare the number of timers before and after
>>>>>>>>>>>>>>> restore:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Job 1 -> Job 2
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 23.091.002 event time timers for both. All timers for the
>>>>>>>>>>>>>>> same window. So this looks good.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Job 2 -> Job 3
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 18.565.234 timers during snapshotting. All timers for the
>>>>>>>>>>>>>>> same window.
>>>>>>>>>>>>>>> 17.636.774 timers during restore. All timers for the same
>>>>>>>>>>>>>>> window.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There are only 76 lines for restore in Job 3 instead of 80.
>>>>>>>>>>>>>>> It would be very useful to know if these lines were lost by the log
>>>>>>>>>>>>>>> aggregation or really did not exist.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Were there any missing records in the output for the day of
>>>>>>>>>>>>>>> the Job 1 -> Job 2 transition (26th of March)?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Mar 29, 2019 at 2:21 PM Juho Autio <
>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I created a zip with these files:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> job 1. (start - end) first restore with debug logging
>>>>>>>>>>>>>>>> job 2. (start-middle) second restore with debug logging
>>>>>>>>>>>>>>>> (same day)
>>>>>>>>>>>>>>>> job 2. (middle - end) before savepoint & cancel (following
>>>>>>>>>>>>>>>> day)
>>>>>>>>>>>>>>>> job 3. (start-middle) 3rd restore with debug logging
>>>>>>>>>>>>>>>> (following day)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It can be downloaded here:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://www.dropbox.com/s/33z0jbolueokao6/flink_debug_logs.zip?dl=0
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Thu, Mar 28, 2019 at 7:08 PM Konstantin Knauf <
>>>>>>>>>>>>>>>> konstantin@ververica.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Yes, the number is the last number in the line. Feel free
>>>>>>>>>>>>>>>>> to share all lines.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Mar 28, 2019 at 5:00 PM Juho Autio <
>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Konstantin!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I would be interested in any changes in the number of
>>>>>>>>>>>>>>>>>>> timers, not only the number of logged messages.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Sorry for the delay. I see: the count is the number of
>>>>>>>>>>>>>>>>>> timers, i.e. the last number on the log line. For example for this row it's 270409:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> March 26th 2019, 11:08:39.822 DEBUG
>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl Restored:
>>>>>>>>>>>>>>>>>>> TimeWindow{start=1553558400000, end=1553644800000} -> 270409
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The log lines don't contain task id – how should they be
>>>>>>>>>>>>>>>>>> compared across different snapshots? Or should I share all of these logs
>>>>>>>>>>>>>>>>>> (at least couple of snapshots around the point of restore) and you'll
>>>>>>>>>>>>>>>>>> compare them?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Tue, Mar 26, 2019 at 9:55 PM Konstantin Knauf <
>>>>>>>>>>>>>>>>>> konstantin@ververica.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I based the branch on top of the current 1.6.4 branch. I
>>>>>>>>>>>>>>>>>>> can rebase on 1.6.2 for any future iterations. I would be interested in any
>>>>>>>>>>>>>>>>>>> changes in the number of timers, not only the number of logged messages.
>>>>>>>>>>>>>>>>>>> The sum of all counts should be the same during snapshotting and restore.
>>>>>>>>>>>>>>>>>>> While a window is open, this number should always increase (when comparing
>>>>>>>>>>>>>>>>>>> multiple snapshots).
>>>>>>>>>>>>>>>>>>>
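>>>>>>>>>>>>>>>>>>> The sums could be computed with something like this (the count is the
>>>>>>>>>>>>>>>>>>> last field of each logged line):
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> grep 'InternalTimerServiceImpl  - Snapshot' taskmanager.log | awk '{ s += $NF } END { print s }'
>>>>>>>>>>>>>>>>>>> grep 'InternalTimerServiceImpl  - Restored' taskmanager.log | awk '{ s += $NF } END { print s }'
>>>>>>>>>>>>>>>>>>>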
>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Mar 26, 2019 at 11:01 AM Juho Autio <
>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi Konstantin,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I got that debug logging working.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> You would now need to take a savepoint and restore
>>>>>>>>>>>>>>>>>>>>> sometime in the middle of the day and should be able to check
>>>>>>>>>>>>>>>>>>>>> a) if there are any timers for the very old windows,
>>>>>>>>>>>>>>>>>>>>> for which there is still some content lingering around
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> No timers for old windows were logged.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> All timers are for the same time window, for example:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> March 26th 2019, 11:08:39.822 DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl Restored:
>>>>>>>>>>>>>>>>>>>>> TimeWindow{start=1553558400000, end=1553644800000} -> 270409
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Those milliseconds correspond to:
>>>>>>>>>>>>>>>>>>>> Tue Mar 26 00:00:00 UTC 2019 – Wed Mar 27 00:00:00 UTC
>>>>>>>>>>>>>>>>>>>> 2019.
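>>>>>>>>>>>>>>>>>>>> (converted with e.g. date -u -r "$((1553558400000/1000))")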
>>>>>>>>>>>>>>>>>>>> - So this seems normal
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> b) if there are fewer timers after restore for the current
>>>>>>>>>>>>>>>>>>>>> window. The missing timers would be recreated, as soon as any additional
>>>>>>>>>>>>>>>>>>>>> records for the same key arrive within the window. This means the number of
>>>>>>>>>>>>>>>>>>>>> missing records might be less than the number of missing timers.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Grepping for "Restored" gives 78 hits. That's
>>>>>>>>>>>>>>>>>>>> suspicious because this job's parallelism is 80. The corresponding grep for
>>>>>>>>>>>>>>>>>>>> "Snapshot" gives 80 hits. Ok actually that would match with
>>>>>>>>>>>>>>>>>>>> what you wrote: "missing timers would be recreated, as soon as any
>>>>>>>>>>>>>>>>>>>> additional records for the same key arrive within the window".
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I tried killing & restoring once more. This time
>>>>>>>>>>>>>>>>>>>> grepping for "Restored" gives 80 hits. Note that it's possible that some
>>>>>>>>>>>>>>>>>>>> logs had been lost around the time of restoration because I'm browsing the
>>>>>>>>>>>>>>>>>>>> logs through Kibana (ELK stack).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I will try kill & restore again tomorrow around noon &
>>>>>>>>>>>>>>>>>>>> collect the same info. Is there anything else that you'd like me to share?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> By the way, it seems that your branch* is not based on
>>>>>>>>>>>>>>>>>>>> 1.6.2 release, why so? It probably doesn't matter, but in general it would
>>>>>>>>>>>>>>>>>>>> be good to minimize the scope of changes. But let's roll with this for now, I
>>>>>>>>>>>>>>>>>>>> don't want to build another package because it seems like we're able to
>>>>>>>>>>>>>>>>>>>> replicate the issue with this version :)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Juho
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> *)
>>>>>>>>>>>>>>>>>>>> https://github.com/apache/flink/compare/release-1.6.2...knaufk:logging-timers
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Wed, Mar 20, 2019 at 2:20 PM Konstantin Knauf <
>>>>>>>>>>>>>>>>>>>> konstantin@ververica.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I created a branch [1] which logs the number of event
>>>>>>>>>>>>>>>>>>>>> time timers per namespace during snapshot and restore.  Please refer to [2]
>>>>>>>>>>>>>>>>>>>>> to build Flink from sources.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> You need to set the logging level to DEBUG for
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl. If you
>>>>>>>>>>>>>>>>>>>>> use log4j this is a one-liner in your log4j.properties:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> log4j.logger.org.apache.flink.streaming.api.operators.InternalTimerServiceImpl=DEBUG
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The only additional logs will be the lines added in
>>>>>>>>>>>>>>>>>>>>> the branch. The lines are of the following format (<Window> -> <Number of
>>>>>>>>>>>>>>>>>>>>> Timers>), e.g.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Snapshot: TimeWindow{start=1553083589256, end=1553083589258} -> 1
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Snapshot: TimeWindow{start=1553083589256, end=1553083589258} -> 2
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Snapshot: TimeWindow{start=1553083589456, end=1553083589458} -> 2
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Snapshot: TimeWindow{start=1553083589356, end=1553083589358} -> 2
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Snapshot: TimeWindow{start=1553083589482, end=1553083589484} -> 2
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Snapshot: TimeWindow{start=1553083589456, end=1553083589458} -> 1
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Snapshot: TimeWindow{start=1553083589256, end=1553083589258} -> 2
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Snapshot: TimeWindow{start=1553083589356, end=1553083589358} -> 1
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Snapshot: TimeWindow{start=1553083589456, end=1553083589458} -> 2
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Snapshot: TimeWindow{start=1553083589482, end=1553083589484} -> 1
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Snapshot: TimeWindow{start=1553083589356, end=1553083589358} -> 2
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Snapshot: TimeWindow{start=1553083589482, end=1553083589484} -> 2
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Restored: TimeWindow{start=1553083589256, end=1553083589258} -> 1
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Restored: TimeWindow{start=1553083589456, end=1553083589458} -> 1
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Restored: TimeWindow{start=1553083589356, end=1553083589358} -> 1
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Restored: TimeWindow{start=1553083589482, end=1553083589484} -> 1
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Restored: TimeWindow{start=1553083589256, end=1553083589258} -> 2
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Restored: TimeWindow{start=1553083589456, end=1553083589458} -> 2
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Restored: TimeWindow{start=1553083589356, end=1553083589358} -> 2
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Restored: TimeWindow{start=1553083589482, end=1553083589484} -> 2
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Restored: TimeWindow{start=1553083589256, end=1553083589258} -> 2
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Restored: TimeWindow{start=1553083589456, end=1553083589458} -> 2
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Restored: TimeWindow{start=1553083589356, end=1553083589358} -> 2
>>>>>>>>>>>>>>>>>>>>> DEBUG
>>>>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  -
>>>>>>>>>>>>>>>>>>>>> Restored: TimeWindow{start=1553083589482, end=1553083589484} -> 2
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> You would now need to take a savepoint and restore
>>>>>>>>>>>>>>>>>>>>> sometime in the middle of the day and should be able to check
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> a) if there are any timers for the very old windows,
>>>>>>>>>>>>>>>>>>>>> for which there is still some content lingering around
>>>>>>>>>>>>>>>>>>>>> b) if there are fewer timers after restore for the current
>>>>>>>>>>>>>>>>>>>>> window. The missing timers would be recreated, as soon as any additional
>>>>>>>>>>>>>>>>>>>>> records for the same key arrive within the window. This means the number of
>>>>>>>>>>>>>>>>>>>>> missing records might be less than the number of missing timers.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Looking forward to the results!
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>> https://github.com/knaufk/flink/tree/logging-timers
>>>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#build-flink
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Tue, Mar 19, 2019 at 2:06 PM Juho Autio <
>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks, answers below.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> * Which Flink version do you need this for?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 1.6.2
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> * You use RocksDBStatebackend, correct? If so, which
>>>>>>>>>>>>>>>>>>>>>> value do you set for "state.backend.rocksdb.timer-service.factory" in the
>>>>>>>>>>>>>>>>>>>>>> flink-conf.yaml.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Yes, RocksDBStatebackend. We don't
>>>>>>>>>>>>>>>>>>>>>> set state.backend.rocksdb.timer-service.factory at all, so whatever is the
>>>>>>>>>>>>>>>>>>>>>> default in Flink 1.6.2? Based on the docs it seems that it would be "heap"
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/large_state_tuning.html
>>>>>>>>>>>>>>>>>>>>>>
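>>>>>>>>>>>>>>>>>>>>>> So effectively our config should be equivalent to setting:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> state.backend.rocksdb.timer-service.factory: heap
>>>>>>>>>>>>>>>>>>>>>>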
>>>>>>>>>>>>>>>>>>>>>> On Mon, Mar 18, 2019 at 6:26 PM Konstantin Knauf <
>>>>>>>>>>>>>>>>>>>>>> konstantin@ververica.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I will prepare a Flink branch for you, which logs
>>>>>>>>>>>>>>>>>>>>>>> the number of event time timers per window before snapshot and after
>>>>>>>>>>>>>>>>>>>>>>> restore. With this we should be able to check, if timers are lost during
>>>>>>>>>>>>>>>>>>>>>>> savepoints.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Two questions:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> * Which Flink version do you need this for? 1.6?
>>>>>>>>>>>>>>>>>>>>>>> * You use RocksDBStatebackend, correct? If so, which
>>>>>>>>>>>>>>>>>>>>>>> value do you set for "state.backend.rocksdb.timer-service.factory" in the
>>>>>>>>>>>>>>>>>>>>>>> flink-conf.yaml.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 14, 2019 at 12:20 PM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi Konstantin,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Reading timers from snapshot doesn't seem
>>>>>>>>>>>>>>>>>>>>>>>> straightforward. I wrote in private with Gyula; he gave more suggestions
>>>>>>>>>>>>>>>>>>>>>>>> (thanks!) but still it seems that it may be a rather big effort for me to
>>>>>>>>>>>>>>>>>>>>>>>> figure it out. Would you be able to help with that? If yes, there's this
>>>>>>>>>>>>>>>>>>>>>>>> existing unit test that can be extended to test reading timers:
>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/king/bravo/blob/master/bravo/src/test/java/com/king/bravo/ReducerStateReadingTest.java#L37-L38
>>>>>>>>>>>>>>>>>>>>>>>> . The test already has a state with some values in reducer window state, so
>>>>>>>>>>>>>>>>>>>>>>>> I'm assuming that it must also contain some window timers.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> This is what Gyula wrote to me:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Maybe I was wrong when I said the createOperatorStateBackendsFromSnapshot
>>>>>>>>>>>>>>>>>>>>>>>> is the way to do it.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On a second thought Timers are probably stored as
>>>>>>>>>>>>>>>>>>>>>>>> raw keyed state in the operator. I don’t remember building any utility to
>>>>>>>>>>>>>>>>>>>>>>>> read that.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> At the moment I am quite busy with other work so
>>>>>>>>>>>>>>>>>>>>>>>> wont have time to build it for you, so you might have to figure it out
>>>>>>>>>>>>>>>>>>>>>>>> yourself.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I would try to look at how keyed states are read:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Look at the implementation of:
>>>>>>>>>>>>>>>>>>>>>>>> createOperatorStateBackendsFromSnapshot()
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Instead of getManagedOperatorState you want to try
>>>>>>>>>>>>>>>>>>>>>>>> getRawKeyedState and also look at how Flink restores it internally for
>>>>>>>>>>>>>>>>>>>>>>>> Timers
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I would start looking around here I guess:
>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L238
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/flink/blob/e8daa49a593edc401cd44761b25b1324b11be4a6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java#L199
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Mar 12, 2019 at 5:41 PM Gyula Fóra <
>>>>>>>>>>>>>>>>>>>>>>>> gyula.fora@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Should be possible to read timer states by:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> OperatorStateReader#createOperatorStateBackendFromSnapshot
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Then you have to get the timer state out of the
>>>>>>>>>>>>>>>>>>>>>>>>> OperatorStateBackend, but keep in mind that this will restore the operator
>>>>>>>>>>>>>>>>>>>>>>>>> states in memory.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Mar 12, 2019 at 4:29 PM Konstantin Knauf <
>>>>>>>>>>>>>>>>>>>>>>>>> konstantin@ververica.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> okay, so it seems that although the watermark
>>>>>>>>>>>>>>>>>>>>>>>>>> passed the end time of the event time windows, the window was not triggered
>>>>>>>>>>>>>>>>>>>>>>>>>> for some of the keys.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> The timers, which would trigger the firing of the
>>>>>>>>>>>>>>>>>>>>>>>>>> window, are also part of the keyed state and are snapshotted/restored. I
>>>>>>>>>>>>>>>>>>>>>>>>>> would like to check if timers (as opposed to the window content itself) are
>>>>>>>>>>>>>>>>>>>>>>>>>> maybe lost during the savepoint & restore procedure. Using Bravo, are you
>>>>>>>>>>>>>>>>>>>>>>>>>> also able to inspect the timer state of the savepoints? In particular, I
>>>>>>>>>>>>>>>>>>>>>>>>>> would be interested if for two subsequent savepoints all timers (i.e. one
>>>>>>>>>>>>>>>>>>>>>>>>>> timer per window and key including the missing keys) are present in the
>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> @Gyula Fóra <gyula.fora@gmail.com>: Does Bravo
>>>>>>>>>>>>>>>>>>>>>>>>>> support reading timer state as well?
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 7, 2019 at 6:41 PM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Right, the window operator is the one by name
>>>>>>>>>>>>>>>>>>>>>>>>>>> "DistinctFunction".
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> http
>>>>>>>>>>>>>>>>>>>>>>>>>>> http://10.1.59.75:20888/proxy/application_1551956351667_0001/jobs/3e4ffaadbd84af3488286863f00d4f23/vertices/19ede2f818524a7f310857e537fa6808/metrics\?get\=0.currentInputWatermark,1.currentInputWatermark,2.currentInputWatermark,3.currentInputWatermark,4.currentInputWatermark,5.currentInputWatermark,6.currentInputWatermark,7.currentInputWatermark,8.currentInputWatermark,9.currentInputWatermark,10.currentInputWatermark,11.currentInputWatermark,12.currentInputWatermark,13.currentInputWatermark,14.currentInputWatermark,15.currentInputWatermark,16.currentInputWatermark,17.currentInputWatermark,18.currentInputWatermark,19.currentInputWatermark,20.currentInputWatermark,21.currentInputWatermark,22.currentInputWatermark,23.currentInputWatermark,24.currentInputWatermark,25.currentInputWatermark,26.currentInputWatermark,27.currentInputWatermark,28.currentInputWatermark,29.currentInputWatermark,30.currentInputWatermark,31.currentInputWatermark,32.currentInputWatermark,33.currentInputWatermark,34.currentInputWatermark,35.currentInputWatermark,36.currentInputWatermark,37.currentInputWatermark,38.currentInputWatermark,39.currentInputWatermark,40.currentInputWatermark,41.currentInputWatermark,42.currentInputWatermark,43.currentInputWatermark,44.currentInputWatermark,45.currentInputWatermark,46.currentInputWatermark,47.currentInputWatermark,48.currentInputWatermark,49.currentInputWatermark,50.currentInputWatermark,51.currentInputWatermark,52.currentInputWatermark,53.currentInputWatermark,54.currentInputWatermark,55.currentInputWatermark,56.currentInputWatermark,57.currentInputWatermark,58.currentInputWatermark,59.currentInputWatermark,60.currentInputWatermark,61.currentInputWatermark,62.currentInputWatermark,63.currentInputWatermark,64.currentInputWatermark,65.currentInputWatermark,66.currentInputWatermark,67.currentInputWatermark,68.currentInputWatermark,69.currentInputWatermark,70.currentInputWatermark,71.currentInputWatermark,72.currentInputWatermark,73.currentInputWatermark,74.currentInputWatermark,75.currentInputWatermark,76.currentInputWatermark,77.currentInputWatermark,78.currentInputWatermark,79.currentInputWatermark
>>>>>>>>>>>>>>>>>>>>>>>>>>> | jq '.[].value' --raw-output | uniq -c
>>>>>>>>>>>>>>>>>>>>>>>>>>>   80 1551980102743
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> date -r "$((1551980102743/1000))"
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thu Mar  7 19:35:02 EET 2019
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> To me that makes sense – how would the window be
>>>>>>>>>>>>>>>>>>>>>>>>>>> triggered at all, if not all sub-tasks have a high enough watermark, so
>>>>>>>>>>>>>>>>>>>>>>>>>>> that the operator level watermark can be advanced.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 7, 2019 at 5:33 PM Konstantin Knauf <
>>>>>>>>>>>>>>>>>>>>>>>>>>> konstantin@ververica.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> great, we are getting closer :) Could you
>>>>>>>>>>>>>>>>>>>>>>>>>>>> please check the "Watermarks" tab in the Flink UI of this job and check if the
>>>>>>>>>>>>>>>>>>>>>>>>>>>> current watermark for all parallel subtasks of the WindowOperator is close
>>>>>>>>>>>>>>>>>>>>>>>>>>>> to the current date/time?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Mar 7, 2019 at 3:01 PM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Wow, indeed the missing data from previous
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> date is still found in the savepoint!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Actually what I now found is that there is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> still data from even older dates in the state:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> %%spark
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> state_json_next_day.groupBy(state_json_next_day.ts.substr(1,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 10).alias('day')).count().orderBy('day').show(n=1000)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> +----------+--------+
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> |       day|   count|
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> +----------+--------+
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> |2018-08-22|    4206|
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ..
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (manually truncated)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ..
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> |2019-02-03|       4|
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> |2019-02-14|   12881|
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> |2019-02-15|    1393|
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> |2019-02-25|    8774|
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> |2019-03-06|    9293|
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> |2019-03-07|28113105|
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> +----------+--------+
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Of course that's the expected situation after
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> we have learned that some window contents are left untriggered.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I don't have the logs any more, but I think
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on 2018-08-22 I reset the state, and since then it has always been
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> kept/restored from savepoint. I can also see some dates there on which I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> didn't cancel the stream. But I can't be sure if it has gone through some
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> automatic restart by Flink. So we can't rule out that some window contents
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wouldn't sometimes also be missed during normal operation. However,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint restoration at least makes the problem more prominent. I have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> previously mentioned that I would suspect this to be some kind of race
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> condition that is affected by load on the cluster. Reason for my suspicion
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is that during savepoint restoration the cluster is also catching up kafka
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> offsets on full speed, so it is considerably more loaded than usually.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Otherwise this problem might not have much to do with savepoints of course.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Are you able to investigate the problem in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Flink code based on this information?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Many thanks,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Juho
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 6, 2019 at 1:41 PM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the investigation & summary.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As you suggested, I will next take savepoints
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on two subsequent days & check the reducer state for both days.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 6, 2019 at 1:18 PM Konstantin
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Knauf <konstantin@ververica.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (Moving the discussion back to the ML)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> after looking into your code, we are still
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> pretty much in the dark with respect to what is going wrong.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Let me try to summarize, what we know given
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> your experiments so far:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1) the lost records were processed and put
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> into state *before* the restart of the job, not afterwards
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2) the lost records are part of the state
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> after the restore (because they are contained in subsequent savepoints)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3) the sinks are not the problem (because
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the metrics of the WindowOperator showed that the missing records have not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> been sent to the sinks)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 4) it is not the batch job used for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reference, which is wrong, because of 1)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 5) records are only lost when restarting
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> from a savepoint (not during normal operations)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> One explanation would be that one of the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> WindowOperators did not fire (for whatever reason) and the missing records
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> are still in the window's state when you run your test. Could you please
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> check, whether this is the case by taking a savepoint on the next day and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> check if the missing records are contained in it.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Feb 18, 2019 at 8:32 PM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Konstantin, thanks.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I gathered the additional info as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> discussed. No surprises there.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * do you know if all lost records are
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> contained in the last savepoint you took before the window fired? This
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would mean that no records are lost after the last restore.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Indeed this is the case. I saved the list
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of all missing IDs, analyzed the savepoint with Bravo, and the savepoint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> state (already) contained all IDs that were eventually missed in output.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * could you please check the numRecordsOut
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metric for the WindowOperator (FlinkUI -> TaskMetrics -> Select TaskChain
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> containing WindowOperator -> find metric)? Is the count reported there
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> correct (no missing data)?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The number matches the output rows. The
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sum of numRecordsOut metrics was 45755630, and count(*) of the output on s3
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> resulted in the same number. Batch output has a few more IDs of course
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (this time it was 1194). You wrote "Is the count reported there correct (no
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> missing data)?" but I have a slightly different viewpoint; I agree that the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reported count is correct (in flink's scope, because the number is the same
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> as what's in output file). But I think "no missing data" doesn't belong
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> here. Data is missing, but it's consistently missing from both output files
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and numRecordsOut metrics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Next thing I'll work on is preparing the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> code to be shared..
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Btw, I used this script to count the sum of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> numRecordsOut (I'm going to look into enabling Sl4jReporter eventually) :
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> JOB_URL=
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> http://10.1.56.245:20888/proxy/application_1550217512987_0001/jobs/068813ab8e6cbebaf7d306a0f41993c2
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> DistinctFunctionID=`http $JOB_URL \
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> | jq '.vertices[] | select(.name ==
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "DistinctFunction") | .id' --raw-output`
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> echo
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "DistinctFunctionID=$DistinctFunctionID"
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> http
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> $JOB_URL/vertices/19ede2f818524a7f310857e537fa6808/metrics | jq '.[] | .id'
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> --raw-output | grep "[0-9][0-9]*\\.numRecordsOut$" \
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> | xargs -I@ sh -c "http GET
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> $JOB_URL/vertices/19ede2f818524a7f310857e537fa6808/metrics?get=@ | jq
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> '.[0].value' --raw-output" > numRecordsOut.txt
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> # " eval_math( '+'.join( file.readlines ) )
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> paste -sd+ numRecordsOut.txt | bc
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Feb 14, 2019 at 2:44 PM Konstantin
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Knauf <konstantin@ververica.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * does the output of the streaming job
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> contain any data, which is not contained in the batch
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> No.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * do you know if all lost records are
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> contained in the last savepoint you took before the window fired? This
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would mean that no records are lost after the last restore.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I haven't built the tooling required to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> check all IDs like that, but yes, that's my understanding currently. To
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> check that I would need to:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - kill the stream only once on a given
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> day (so that there's only one savepoint creation & restore)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - next day or later: save all missing ids
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> from batch output comparison
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - next day or later: read the savepoint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with bravo & check that it contains all of those missing IDs
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> However I haven't built the tooling for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that yet. Do you think it's necessary to verify that this assumption holds?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> It would be another data point and might
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> help us to track down the problem. Whether it is worth doing depends on
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the result, i.e. whether the current assumption would be falsified or not,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> but we only know that in retrospect ;)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * could you please check the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> numRecordsOut metric for the WindowOperator (FlinkUI -> TaskMetrics ->
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Select TaskChain containing WindowOperator -> find metric)? Is the count
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reported there correct (no missing data)?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Is that metric the result of the window
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> trigger? If yes, you must mean that I check the value of that metric on the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> next day after restore, so that it only contains the count for the output
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of previous day's window? The counter is reset to 0 when job starts (even
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> when state is restored), right?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, this metric would be incremented when
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the window is triggered. Yes, please check this metric after the window,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> during which the restore happened, is fired.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If you don't have a MetricsReporter
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> configured so far, I recommend to quickly register a Sl4jReporter to log
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> out all metrics every X seconds (maybe even minutes for your use case):
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Then you don't need to go trough the WebUI and can keep a history of the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metrics.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
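>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> A minimal flink-conf.yaml sketch for that (the reporter name "slf4j" and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the interval are just examples):
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> # register the Slf4jReporter; interval controls how often metrics are logged
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> metrics.reporter.slf4j.interval: 60 SECONDS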
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Otherwise, do you have any suggestions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for how to instrument the code to narrow down further where the data gets
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> lost? To me it would make sense to proceed with this, because the problem
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> seems hard to reproduce outside of our environment.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Let's focus on checking this metric above,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to make sure that the WindowOperator is actually emitting fewer records than
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the overall number of keys in the state as your experiments suggest, and on
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sharing the code.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Feb 14, 2019 at 10:57 AM
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Konstantin Knauf <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> konstantin@ververica.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> you are right, the problem has actually
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> been narrowed down quite a bit over time. Nevertheless, sharing the code
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (incl. flink-conf.yaml) might be a good idea. Maybe something strikes the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> eye that we have not thought about so far. If you don't feel comfortable
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sharing the code on the ML, feel free to send me a PM.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Besides that, three more questions:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * does the output of the streaming job
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> contain any data, which is not contained in the batch output?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * do you know if all lost records are
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> contained in the last savepoint you took before the window fired? This
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would mean that no records are lost after the last restore.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> * could you please check the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> numRecordsOut metric for the WindowOperator (FlinkUI -> TaskMetrics ->
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Select TaskChain containing WindowOperator -> find metric)? Is the count
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reported there correct (no missing data)?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Feb 13, 2019 at 3:19 PM Gyula
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Fóra <gyula.fora@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sorry, not posting on the mailing list was
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> my mistake :/
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, 13 Feb 2019 at 15:01, Juho
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Autio <juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for stepping in, did you post
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> outside of the mailing list on purpose btw?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This I did a long time ago:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> To rule out for good any questions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> about sink behaviour, the job was killed and started with an additional
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Kafka sink.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The same number of ids were missed in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> both outputs: KafkaSink & BucketingSink.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (I wrote about that On Oct 1, 2018 in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this email thread)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> After that I did the savepoint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> analysis with Bravo.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Currently I'm indeed trying to get
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> suggestions how to debug further, for example, where to add additional
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> kafka output, to catch where the data gets lost. That would probably be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> somewhere in Flink's internals.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I could try to share the full code
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> also, but IMHO the problem has been quite well narrowed down, considering
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that data can be found in savepoint, savepoint is successfully restored,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and after restoring the data doesn't go to "user code" (like the reducer)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> any more.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Feb 13, 2019 at 3:47 PM Gyula
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Fóra <gyula.fora@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think the reason you are not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> getting many answers here is that it is very hard to debug this problem
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> remotely.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Seemingly you do very normal
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> operations, the state contains all the required data and nobody else has
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> hit a similar problem for ages.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My best guess would be some bug with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the deduplication or output writing logic, but without a complete code
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> example it's very hard to say anything useful.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Did you try writing it to Kafka to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> see if the output is there? (that way we could rule out the dedup problem)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Feb 13, 2019 at 2:37 PM Juho
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Autio <juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Stefan (or anyone!), please, could I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> have some feedback on the findings that I reported on Dec 21, 2018? This is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> still a major blocker..
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Jan 31, 2019 at 11:46 AM
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Juho Autio <juho.autio@rovio.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hello, is there anyone that could
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> help with this?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jan 11, 2019 at 8:14 AM
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Juho Autio <juho.autio@rovio.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Stefan, would you have time to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> comment?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wednesday, January 2, 2019,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Juho Autio <juho.autio@rovio.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bump – does anyone know if Stefan
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> will be available to comment the latest findings? Thanks.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Dec 21, 2018 at 2:33 PM
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Juho Autio <juho.autio@rovio.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Stefan, I managed to analyze
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint with bravo. It seems that the data that's missing from output
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> *is* found in savepoint.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I simplified my test case to the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> following:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - job 1 has been running for ~10
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> days
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - savepoint X created & job 1
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cancelled
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - job 2 started with restore
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> from savepoint X
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Then I waited until the next day
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> so that job 2 has triggered the 24 hour window.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Then I analyzed the output &
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - compare job 2 output with the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> output of a batch pyspark script => find 4223 missing rows
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - pick one of the missing rows
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (say, id Z)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - read savepoint X with bravo,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> filter for id Z => Z was found in the savepoint!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> How can it be possible that the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> value is in state but doesn't end up in output after state has been
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> restored & window is eventually triggered?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I also did similar analysis on
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the previous case where I savepointed & restored the job multiple times (5)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> within the same 24-hour window. A missing id that I drilled down to was
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> found in all of those savepoints, yet missing from the output that gets
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> written at the end of the day. This is even more surprising: that the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> missing ID was written to the new savepoints also after restoring. Is the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reducer state somehow decoupled from the window contents?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Big thanks to bravo-developer
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Gyula for guiding me through to be able read the reducer state!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/king/bravo/pull/11
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Gyula also had an idea for how
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to troubleshoot the missing data in a scalable way: I could add some "side
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> effect kafka output" on individual operators. This should allow tracking
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> more closely at which point the data gets lost. However, maybe this would
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> have to be in some Flink's internal components, and I'm not sure which
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> those would be.
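
As a sketch of that idea at the edge of user code (before going into
Flink's internals), the stream could be teed into a debug topic right
after the reducer; the broker, topic name, and the choice of the 0.11
producer are assumptions here:

    import java.util.Map;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

    // Tap the deduplicated stream into a debug Kafka topic without touching
    // the main pipeline; the original sink stays attached as before.
    dedupedStream
            .map(new MapFunction<Map<String, String>, String>() {
                @Override
                public String map(Map<String, String> record) {
                    return String.valueOf(record); // stringify for the debug topic
                }
            })
            .addSink(new FlinkKafkaProducer011<>(
                    "broker-1:9092",          // assumed broker list
                    "dedup-debug",            // assumed debug topic
                    new SimpleStringSchema()));
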
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Juho
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Nov 19, 2018 at 11:52 AM
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Juho Autio <juho.autio@rovio.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Stefan,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bravo doesn't currently support
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reading a reducer state. I gave it a try but couldn't get to a working
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation yet. If anyone can provide some insight on how to make this
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> work, please share at github:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/king/bravo/pull/11
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 3:32 PM
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I was glad to find that bravo
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> had now been updated to support installing bravo to a local maven repo.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I was able to load a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> checkpoint created by my job, thanks to the example provided in bravo
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> README, but I'm still missing the essential piece.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My code was:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         OperatorStateReader
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reader = new OperatorStateReader(env2, savepoint, "DistinctFunction");
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         DontKnowWhatTypeThisIs
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reducingState = reader.readKeyedStates(what should I put here?);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I don't know how to read the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> values collected from reduce() calls in the state. Is there a way to access
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the reducing state of the window with bravo? I'm a bit confused how this
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> works, because when I check with debugger, flink internally uses
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a ReducingStateDescriptor with name=window-contents, but still reading
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> operator state for "DistinctFunction" didn't at least throw an exception
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ("window-contents" threw – obviously there's no operator by that name).
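
For context, this is roughly what happens inside Flink when a reduce
window is built (paraphrased from WindowedStream#reduce, so treat the
details as approximate): the per-key window accumulator lives in keyed
state under the fixed descriptor name "window-contents", while readers
like bravo address state by the operator's name/uid, which is why
"DistinctFunction" resolves and "window-contents" doesn't:

    // Paraphrased from Flink's WindowedStream#reduce: every reduce window
    // keeps its accumulator in keyed state under this fixed name.
    ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>(
            "window-contents",
            reduceFunction,
            input.getType().createSerializer(env.getConfig()));
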
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Juho
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Oct 15, 2018 at 2:25
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> PM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Stefan,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sorry but it doesn't seem
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> immediately clear to me what's a good way to use
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/king/bravo
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> .
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> How are people using it?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would you for example modify build.gradle somehow to publish the bravo as a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> library locally/internally? Or add code directly in the bravo project
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (locally) and run it from there (using an IDE, for example)? Also it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> doesn't seem like the bravo gradle project supports building a flink job
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> jar, but if it does, how do I do it?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Oct 4, 2018 at 9:30
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> PM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Good then, I'll try to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> analyze the savepoints with Bravo. Thanks!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > How would you assume that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> backpressure would influence your updates? Updates to each local state
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> still happen event-by-event, in a single reader/writing thread.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sure, just an ignorant guess
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> by me. I'm not familiar with most of Flink's internals. Anyway, high
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> backpressure is not seen on this job after it has caught up the lag, so
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I thought it would be worth mentioning.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Oct 4, 2018 at 6:24
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> PM Stefan Richter <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> s.richter@data-artisans.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Am 04.10.2018 um 16:08
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> schrieb Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com>:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > you could take a look at
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bravo [1] to query your savepoints and to check if the state in the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint is complete w.r.t. your expectations
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks. I'm not 100% sure if
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this is the case, but to me it seemed like the missed ids were being logged
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> by the reducer soon after the job had started (after restoring a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint). But on the other hand, after that I also made another savepoint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> & restored that, so what I could check is: does that next savepoint have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the missed ids that were logged (a couple of minutes before the savepoint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> was created, so there should've been more than enough time to add them to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the state before the savepoint was triggered) or not. Any way, if I would
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be able to verify with Bravo that the ids are missing from the savepoint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (even though the reducer logged that it saw them), would that help in figuring
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> out where they are lost? Is there some major difference compared to just
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> looking at the final output after window has been triggered?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think that makes a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> difference. For example, you can investigate if there is a state loss or a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> problem with the windowing. In the savepoint you could see which keys
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> exists and to which windows they are assigned. Also just to make sure there
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is no misunderstanding: only elements that are in the state at the start of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a savepoint are expected to be part of the savepoint; all elements between
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> start and completion of the savepoint are not expected to be part of the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > I also doubt that the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> problem is about backpressure after restore, because the job will only
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> continue running after the state restore is already completed.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I'm not suspecting
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that the state restoring would be the problem either. My concern was about
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> backpressure possibly messing with the updates of reducing state? I would
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> tend to suspect that updating the state consistently is what fails, where
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> heavy load / backpressure might be a factor.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> How would you assume that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> backpressure would influence your updates? Updates to each local state
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> still happen event-by-event, in a single reader/writing thread.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Oct 4, 2018 at 4:18
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> PM Stefan Richter <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> s.richter@data-artisans.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> you could take a look at
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bravo [1] to query your savepoints and to check if the state in the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint is complete w.r.t. your expectations. I somewhat doubt that there is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a general problem with the state/savepoints because many users are
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> successfully running it on a large state and I am not aware of any data
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> loss problems, but nothing is impossible. What the savepoint does is also
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> straightforward: iterate a db snapshot and write all key/value pairs to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> disk, so all data that was in the db at the time of the savepoint should
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> show up. I also doubt that the problem is about backpressure after restore,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> because the job will only continue running after the state restore is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> already completed. Did you check if you are using exactly-once semantics or
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> at-least-once semantics? Also did you check that the kafka consumer start
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> position is configured properly [2]? Are watermarks generated as expected
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> after restore?
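
For reference, the start-position settings from [2] are configured on the
consumer itself; a sketch with assumed topic, group, and broker names. One
relevant detail: when the job is restored from a savepoint, the offsets
stored in the savepoint take precedence over these settings:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker-1:9092"); // assumed
    props.setProperty("group.id", "dedup-job");              // assumed

    FlinkKafkaConsumer011<String> consumer =
            new FlinkKafkaConsumer011<>("events", new SimpleStringSchema(), props);

    consumer.setStartFromGroupOffsets(); // the default
    // consumer.setStartFromEarliest();  // alternatives while testing
    // consumer.setStartFromLatest();
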
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> One more unrelated
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> high-level comment that I have: for a granularity of 24h windows, I wonder
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> if it would not make sense to use a batch job instead?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Stefan
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/king/bravo
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Am 04.10.2018 um 14:53
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> schrieb Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com>:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the suggestions!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > In general, it would be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> tremendously helpful to have a minimal working example which allows to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reproduce the problem.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Definitely. The problem
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with reproducing has been that this only seems to happen in the bigger
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> production data volumes.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> That's why I'm hoping to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> find a way to debug this with the production data. With that it seems to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> consistently cause some misses every time the job is killed/restored.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > check if it happens for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> shorter windows, like 1h etc
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> What would be the benefit
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of that compared to 24h window?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > simplify the job to not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> use a reduce window but simply a time window which outputs the window
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> events. Then counting the input and output events should allow you to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> verify the results. If you are not seeing missing events, then it could
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> have something to do with the reducing state used in the reduce function.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hm, maybe, but I'm not sure
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> how useful that would be, because it wouldn't yet prove that it's related
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to reducing: not having a reduce function could also mean a smaller
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> load on the job, which might alone be enough to keep the problem from
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> manifesting.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Is there a way to debug
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> what goes into the reducing state (including what gets removed or
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> overwritten and what restored), if that makes sense..? Maybe some suitable
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> logging could be used to prove that the lost data is written to the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reducing state (or at least asked to be written), but not found any more
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> when the window closes and state is flushed?
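
One hedged way to get part of that visibility without touching Flink
internals is to wrap the reduce function so both sides of every merge are
logged; the left-hand value is whatever accumulator the (possibly
restored) reducing state currently holds. A sketch, class name made up:

    import java.util.Map;

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // v1 is the current accumulator read back from reducing state (possibly
    // restored from a savepoint), v2 is the incoming event being merged in.
    public class LoggingDistinctFunction
            implements ReduceFunction<Map<String, String>> {

        private static final Logger LOG =
                LoggerFactory.getLogger(LoggingDistinctFunction.class);

        private final DistinctFunction delegate = new DistinctFunction();

        @Override
        public Map<String, String> reduce(Map<String, String> v1,
                                          Map<String, String> v2) throws Exception {
            LOG.debug("reduce: stateSide={} incoming={}", v1.get("id"), v2.get("id"));
            return delegate.reduce(v1, v2);
        }
    }

What this cannot show is a value silently disappearing from the backend
between merges; it only proves what the state held at each merge.
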
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On configuration once
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> more, we're using RocksDB state backend with asynchronous incremental
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> checkpointing. The state is restored from savepoints though, we haven't
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> been using those checkpoints in these tests (although they could be used in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> case of crashes – but we haven't had those now).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Oct 4, 2018 at
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3:25 PM Till Rohrmann <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> trohrmann@apache.org>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> another idea to further
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> narrow down the problem could be to simplify the job to not use a reduce
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> window but simply a time window which outputs the window events. Then
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> counting the input and output events should allow you to verify the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> results. If you are not seeing missing events, then it could have something
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to do with the reducing state used in the reduce function.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In general, it would be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> tremendously helpful to have a minimal working example which allows to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reproduce the problem.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Oct 4, 2018 at
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2:02 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> andrey@data-artisans.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> can you try to reduce
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the job to a minimal reproducible example and share the job and input?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For example (a minimal skeleton is sketched after this list):
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - some simple records as
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> input, e.g. tuples of primitive types saved as CSV
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - minimal deduplication
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> job which processes them and misses records
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - check if it happens
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for shorter windows, like 1h etc
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - setup which you use
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for the job, ideally locally reproducible or cloud
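
As a starting point, a skeleton along those lines might look like this
(paths, names, and the shorter window are placeholder choices):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;

    public class MinimalDedupRepro {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // CSV lines of the form "id,field"
            DataStream<Tuple2<String, String>> events = env
                    .readTextFile("file:///tmp/events.csv")
                    .map(new MapFunction<String, Tuple2<String, String>>() {
                        @Override
                        public Tuple2<String, String> map(String line) {
                            String[] parts = line.split(",", 2);
                            return Tuple2.of(parts[0], parts[1]);
                        }
                    });

            events
                    .keyBy(0)                  // key on the id
                    .timeWindow(Time.hours(1)) // shorter window, as suggested
                    .reduce(new ReduceFunction<Tuple2<String, String>>() {
                        @Override
                        public Tuple2<String, String> reduce(
                                Tuple2<String, String> v1,
                                Tuple2<String, String> v2) {
                            return v1; // keep first occurrence, like DistinctFunction
                        }
                    })
                    .writeAsCsv("file:///tmp/distinct-out");

            env.execute("minimal-dedup-repro");
        }
    }
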
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 4 Oct 2018, at 11:13,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sorry to insist, but we
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> seem to be blocked for any serious usage of state in Flink if we can't rely
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on it to not miss data in case of restore.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would anyone have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> suggestions for how to troubleshoot this? So far I have verified with DEBUG
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> logs that our reduce function also gets to process the data that is missing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> from the window output.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Oct 1, 2018 at
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 11:56 AM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Andrey,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> To rule out for good
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> any questions about sink behaviour, the job was killed and started with an
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> additional Kafka sink.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The same number of ids
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> were missed in both outputs: KafkaSink & BucketingSink.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I wonder what would be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the next steps in debugging?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Sep 21, 2018 at
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3:49 PM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks, Andrey.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > so it means that the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint does not lose at least some dropped records.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm not sure what you
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mean by that? I mean, it was known from the beginning that not everything
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is lost before/after restoring a savepoint, just some records around the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> time of restoration. It's not 100% clear whether records are lost before
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> making a savepoint or after restoring it. Although, based on the new DEBUG
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> logs, it seems more like losing some records that are seen ~soon after
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> restoring. It seems like Flink would be somehow confused about the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> restored state vs. new inserts to state. This could also be somehow linked
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to the high back pressure on the kafka source while the stream is catching
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> up.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > If it is feasible
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for your setup, I suggest inserting one more map function after reduce and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> before the sink.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > etc.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Isn't that the same
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> thing that we discussed before? Nothing is sent to BucketingSink before the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> window closes, so I don't see how it would make any difference if we
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> replace the BucketingSink with a map function or another sink type. We
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> don't create or restore savepoints during the time when BucketingSink gets
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> input or has open buckets – that happens at a much later time of day. I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would focus on figuring out why the records are lost while the window is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> open. But I don't know how to do that. Would you have any additional
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> suggestions?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Sep 21, 2018
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> at 3:30 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> andrey@data-artisans.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> so it means that the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint does not lose at least some dropped records.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If it is feasible for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> your setup, I suggest inserting one more map function after reduce and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> before the sink.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The map function
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should be called right after the window is triggered but before flushing to s3.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The result of reduce
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (deduped record) could be logged there.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This should allow us to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> check whether the processed distinct records were buffered in the state
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> after the restoration from the savepoint or not. If they were buffered, we
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should see that there was an attempt to write them to the sink from the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> state.
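
A sketch of that intermediate map: a pure pass-through that logs every
record the fired window hands onward, wired in as
.reduce(...).map(new WindowResultLogger()).addSink(...); the class name is
made up:

    import java.util.Map;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // Sits between the window's reduce and the sink: anything the window
    // emits on firing is logged here before it can reach (or be lost by)
    // the sink.
    public class WindowResultLogger
            implements MapFunction<Map<String, String>, Map<String, String>> {

        private static final Logger LOG =
                LoggerFactory.getLogger(WindowResultLogger.class);

        @Override
        public Map<String, String> map(Map<String, String> value) {
            LOG.info("window fired: {}={}", value.get("field"), value.get("id"));
            return value;
        }
    }
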
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Another suggestion is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to try to write records to some other sink or to both.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> E.g. if you can
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> access the file system of the workers, maybe write just into local files and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> check whether the records are also dropped there.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 20 Sep 2018, at
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 15:37, Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Andrey!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I was finally able to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> gather the DEBUG logs that you suggested. In short, the reducer logged that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it processed at least some of the ids that were missing from the output.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "At least some",
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> because I didn't have the job running with DEBUG logs for the full 24-hour
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> window period. So I was only able to look up if I can find
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> *some* of the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> missing ids in the DEBUG logs. Which I did indeed.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I changed the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> DistinctFunction.java to do this:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     public Map<String, String> reduce(Map<String, String> value1,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                                       Map<String, String> value2) {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         LOG.debug("DistinctFunction.reduce returns: {}={}",
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 value1.get("field"), value1.get("id"));
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         return value1;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Then:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> vi
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> flink-1.6.0/conf/log4j.properties
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Then I ran the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> following kind of test:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Cancelled the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on-going job with savepoint created at ~Sep 18 08:35 UTC 2018
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Started a new
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cluster & job with DEBUG enabled at ~09:13, restored from that previous
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cluster's savepoint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Ran until caught up
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> offsets
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Cancelled the job
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with a new savepoint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Started a new job
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> _without_ DEBUG, which restored the new savepoint, let it keep running so
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that it will eventually write the output
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Then on the next day,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> after results had been flushed when the 24-hour window closed, I compared
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the results again with a batch version's output. And found some missing ids
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> as usual.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I drilled down to one
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> specific missing id (I'm replacing the actual value with AN12345 below),
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> which was not found in the stream output, but was found in batch output &
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> flink DEBUG logs.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Related to that id, I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> gathered the following information:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2018-09-18~09:13:21,000
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> job started & savepoint is restored
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2018-09-18
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 09:14:29,085 missing id is processed for the first time, proved by this log
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> line:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2018-09-18
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>       - DistinctFunction.reduce returns: s.aid1=AN12345
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2018-09-18
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 09:15:14,264 first synchronous part of checkpoint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2018-09-18
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 09:15:16,544 first asynchronous part of checkpoint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (more occurrences of checkpoints (~1 min checkpointing time + ~1 min delay
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> before next) / more occurrences of DistinctFunction.reduce)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2018-09-18
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 09:23:45,053 missing id is processed for the last time
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2018-09-18~10:20:00,000
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint created & job cancelled
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> To be noted, there
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> was high backpressure after restoring from savepoint until the stream
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> caught up with the kafka offsets. Although, our job assigns timestamps
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> & watermarks on the flink kafka consumer itself, so event time of all
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> partitions is synchronized. As expected, we don't get any late data in the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> late data side output.
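
For reference, assigning the timestamps & watermarks on the consumer (so
event time advances per Kafka partition) looks roughly like this; the
Event type, its accessor, and the one-minute bound are assumptions:

    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Per-partition watermarking: each Kafka partition tracks its own event
    // time, and the consumer emits the minimum across partitions.
    consumer.assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.minutes(1)) {
                @Override
                public long extractTimestamp(Event event) {
                    return event.getTimestamp(); // assumed accessor
                }
            });
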
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> From this we can see
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that the missing ids are processed by the reducer, but they must get lost
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> somewhere before the 24-hour window is triggered.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think it's worth
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mentioning once more that the stream doesn't miss any ids if we let it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> keep running without interruptions / state restoring.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> What's next?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Aug 29, 2018
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> at 3:49 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> andrey@data-artisans.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > only when the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 24-hour window triggers, BucketingSink gets a burst of input
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is of course
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> totally true, my understanding is the same. We cannot exclude a problem
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> there for sure; it's just that savepoints are used a lot w/o problem reports
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and BucketingSink is known to be problematic with s3. That is why I asked
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> you:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > You also wrote
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that the timestamps of lost events are 'probably' around the time of the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint; if it is not yet for sure, I would also check it.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Although, the bucketing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sink might lose data at the end of the day (also from the middle). The
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> fact that it is always around the time of taking a savepoint and not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> random is surely suspicious, and possible savepoint failures need to be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> investigated.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding the s3
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> problem, the s3 doc says:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > The caveat is that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> if you make a HEAD or GET request to the key name (to find if the object
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> exists) before creating the object, Amazon S3 provides
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 'eventual consistency' for read-after-write.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The algorithm you
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> suggest is how it is roughly implemented now
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (BucketingSink.openNewPartFile). My understanding is that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 'eventual consistency' means that even if you just created a file (its name
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is the key), it can be that you do not get it in the listing, or exists (HEAD)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> returns false, and you risk rewriting the previous part.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The BucketingSink was
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> designed for a standard file system. s3 is used over a file system wrapper
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> atm but does not always provide normal file system guarantees. See also
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> last example in [1].
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 29 Aug 2018, at
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 12:11, Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey, thank you
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> very much for the debugging suggestions, I'll try them.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In the meanwhile two
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> more questions, please:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > Just to keep in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > mind this problem with s3 and exclude it for sure. I would also check
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > whether the size of missing events is around the batch size of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > BucketingSink or not.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Fair enough, but I also want to focus on debugging the most probable
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> subject first. So what do you think about this – true or false: only when
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the 24-hour window triggers, BucketingSink gets a burst of input. Around
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the state restoring point (middle of the day) it doesn't get any input, so
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it can't lose anything either. Isn't this true, or have I totally missed
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> how Flink works in triggering window results? I would not expect there to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be any optimization that speculatively triggers early results of a regular
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> time window to the downstream operators.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > The old BucketingSink has in general a problem with s3. Internally
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > BucketingSink queries s3 as a file system to list already written file
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > parts (batches) and determine the index of the next part to start. Due to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > eventual consistency of checking file existence in s3 [1], the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > BucketingSink can rewrite the previously written part and basically lose
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > it.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I was wondering,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> what does S3's "read-after-write consistency" (mentioned on the page you
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> linked) actually mean. It seems that this might be possible (sketched below):
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - LIST keys, find
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> current max index
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - choose next index
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> = max + 1
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - HEAD next index:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> if it exists, keep adding + 1 until key doesn't exist on S3
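
Sketched out with made-up helpers (listPartIndices doing the LIST, exists
the HEAD probe), the risk is that an eventually consistent LIST returns a
stale maximum, so the probe starts at an index that collides with a
just-written part:

    // Hypothetical helpers: listPartIndices() = LIST, exists() = HEAD.
    int next = listPartIndices(bucketPath).stream()
            .max(Integer::compare)
            .orElse(-1) + 1;   // may be stale under eventual LIST consistency

    while (exists(partPath(bucketPath, next))) { // HEAD probe
        next++;                                  // skip keys already taken
    }
    // If both LIST and HEAD missed a freshly written part, 'next' points at
    // an existing key and the previous part gets overwritten.
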
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> But definitely
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sounds easier if a sink keeps track of files in a way that's guaranteed to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be consistent.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Juho
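
To make the suspected race concrete, here is a hypothetical sketch of the LIST-based part-index probing discussed above; the listing set, key prefix, and part naming are illustrative, not BucketingSink's actual code:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class PartIndexProbe {

        // Probes part-0, part-1, ... until a key is absent from the listing.
        static int nextPartIndex(Set<String> listing, String prefix) {
            int index = 0;
            while (listing.contains(prefix + "part-" + index)) {
                index++;
            }
            return index;
        }

        public static void main(String[] args) {
            // Stale LIST result: part-2 was already written to S3 but is
            // not visible yet, so the probe returns 2 and a sink relying
            // on it would overwrite part-2, silently losing that batch.
            Set<String> staleListing =
                    new HashSet<>(Arrays.asList("out/part-0", "out/part-1"));
            System.out.println(nextPartIndex(staleListing, "out/"));
        }
    }

The HEAD probing described above might narrow the window for this race but not close it entirely, which is why a sink that tracks written parts in its own state is the safer design.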
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> True, StreamingFileSink does not support s3 in 1.6.0; it is planned
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for the next 1.7 release, sorry for the confusion.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The old BucketingSink has in general a problem with s3. Internally,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> BucketingSink queries s3 as a file system to list already written file
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> parts (batches) and determine the index of the next part to start. Due
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to the eventual consistency of checking file existence in s3 [1], the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> BucketingSink can rewrite a previously written part and basically lose
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it. It should be fixed for StreamingFileSink in 1.7, where Flink keeps
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> its own track of written parts and does not rely on s3 as a file
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> system. I also include Kostas, he might add more details.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Just to keep this problem with s3 in mind and exclude it for sure, I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would also check whether the size of missing events is around the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> batch size of BucketingSink or not. You also wrote that the timestamps
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of lost events are 'probably' around the time of the savepoint; if
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that is not yet certain, I would also check it.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Have you already checked the log files of the job manager and task
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> managers for the job running before and after the restore from the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> checkpoint? Is everything successful there, no errors, relevant
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> warnings or exceptions?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As the next step, I would suggest logging all encountered events in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> DistinctFunction.reduce, if possible for production data, and checking
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> whether the missed events are eventually processed before or after the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint. The following log message indicates the border between the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> events that should be included into the savepoint (logged before) and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> those that should not:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "{} ({}, synchronous part) in thread {} took {} ms" (template)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Also check whether the savepoint has been completed overall:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms."
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1] https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
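
A minimal sketch of the reduce-side logging suggested here, assuming the DistinctFunction shown later in this thread; the logger usage is illustrative, not the project's actual code:

    import java.util.Map;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    // DistinctFunction with DEBUG logging of every reduced pair, so that
    // individual events can later be correlated with the savepoint's
    // synchronous/asynchronous log lines quoted above.
    public class LoggingDistinctFunction
            implements ReduceFunction<Map<String, String>> {

        private static final Logger LOG =
                LoggerFactory.getLogger(LoggingDistinctFunction.class);

        @Override
        public Map<String, String> reduce(Map<String, String> value1,
                                          Map<String, String> value2) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("reduce: value1={}, value2={}", value1, value2);
            }
            return value1; // keep the first record per key, as before
        }
    }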
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Using StreamingFileSink is not a convenient option for production use
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> for us, as it doesn't support s3*. I could use StreamingFileSink just
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to verify, but I don't see much point in doing so. Please consider my
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> previous comment:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > I realized that BucketingSink must not play any role in this
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > problem. This is because only when the 24-hour window triggers does
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > BucketingSink get a burst of input. Around the state restoring point
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > (middle of the day) it doesn't get any input, so it can't lose
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > anything either (right?).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I could also use a kafka sink instead, but I can't imagine how there
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> could be any difference. It's very real that the sink doesn't get any
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> input for a long time until the 24-hour window closes, and then it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> quickly writes out everything, because it's not that much data
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> eventually for the distinct values.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Any ideas for debugging what's happening around the savepoint &
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> restoration time?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> *) I actually implemented StreamingFileSink as an alternative sink.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This was before I came to realize that most likely the sink component
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> has nothing to do with the data loss problem. I tried it with an
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> s3n:// path just to see an exception being thrown. In the source code
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I indeed then found an explicit check for the target path scheme to be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "hdfs://".
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Ok, I think before further debugging the window reduced state, could
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> you try the new 'StreamingFileSink' [1], introduced in Flink 1.6.0,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> instead of the previous 'BucketingSink'?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
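
For reference, a minimal sketch of the suggested replacement, written against the row-format builder API that StreamingFileSink ships with from Flink 1.6.0 onwards; the path and the String element type are placeholders:

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

    public class StreamingFileSinkExample {
        // Unlike BucketingSink, StreamingFileSink tracks written part files
        // in checkpointed state instead of listing the file system, which
        // sidesteps the S3 LIST consistency problem discussed above.
        static StreamingFileSink<String> buildSink() {
            return StreamingFileSink
                    .forRowFormat(new Path("hdfs:///tmp/output"),
                                  new SimpleStringEncoder<String>())
                    .build();
        }
    }

(As the thread notes, S3 support for this sink only lands in 1.7.)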
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that it seems like
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> there's a bug somewhere, now that the output is missing some data.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > I would wait and check the actual output in s3 because it is the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > main result of the job
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, and that's what I have already done. There seems to always be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> some data loss with the production data volumes, if the job has been
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> restarted on that day.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would you have any suggestions for how to debug this further?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Many thanks for stepping in.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So it is a per-key deduplication job.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I would wait and check the actual output in s3 because it is the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> main result of the job, and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint might be not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > included into the savepoint but it should be behind the snapshotted
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > offset in Kafka.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is not a bug, it is a possible behaviour. The savepoint is a snapshot
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of the data in transit which has already been consumed from Kafka.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Basically, the full contents of the window result are split between
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the savepoint and what can come after the savepoint'ed offset in Kafka
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> but before the window result is written into s3. Allowed lateness
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should not affect it; I am just saying that the final result in s3
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should include all records after it. This is what should be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> guaranteed, but not the contents of the intermediate savepoint.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your answer!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I check for the missed data from the final output on s3. So I wait
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> until the next day, then run the same thing re-implemented in batch,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and compare the output.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint might be not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > included into the savepoint but it should be behind the snapshotted
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > offset in Kafka.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I would definitely expect that. It seems like there's a bug
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> somewhere.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > Then it should just come later after the restore and should be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > reduced within the allowed lateness into the final result which is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > saved into s3.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play any role here,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> because I started running the job with allowedLateness=0 and still get
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the data loss, while my late data output doesn't receive anything.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an example or the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > actual implementation, basically saving just one of the records
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > inside the 24h window in s3? Then what is missing there?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, it's the actual implementation. Note that there's a keyBy before
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the DistinctFunction. So there's one record for each key (which is the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> combination of a couple of fields). In practice I've seen that we're
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> missing ~2000-4000 elements on each restore, and the total output is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> obviously much more than that.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Here's the full code for the key selector:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public class MapKeySelector implements KeySelector<Map<String, String>, Object> {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     private final String[] fields;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     public MapKeySelector(String... fields) {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         this.fields = fields;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     public Object getKey(Map<String, String> event) throws Exception {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         for (int i = 0; i < fields.length; i++) {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         return key;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> And a more exact example of how it's used:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     .timeWindow(Time.days(1))
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     .reduce(new DistinctFunction())
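
As a quick illustration of what this selector produces (assuming the MapKeySelector above is on the classpath; the event values are made up), five fields yield a Tuple5 key, with absent fields defaulting to empty strings:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flink.api.java.tuple.Tuple;

    public class MapKeySelectorDemo {
        public static void main(String[] args) throws Exception {
            Map<String, String> event = new HashMap<>();
            event.put("ID", "1");
            event.put("PLAYER_ID", "p7");
            // The three remaining fields are absent and default to "".
            Tuple key = (Tuple) new MapKeySelector(
                    "ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE")
                    .getKey(event);
            System.out.println(key); // prints (1,p7,,,)
        }
    }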
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Where exactly does the data go missing? When do you notice that? Do
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> you check it:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - by debugging `DistinctFunction.reduce` right after resume in the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> middle of the day, or
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - because some distinct records are missing in the final output of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> BucketingSink in s3 after the window result is actually triggered and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> saved into s3 at the end of the day? Is this the main output?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The late data around the time of taking the savepoint might be not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> included into the savepoint, but it should be behind the snapshotted
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> offset in Kafka. Then it should just come later after the restore and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should be reduced within the allowed lateness into the final result
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> which is saved into s3.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an example or the actual
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> implementation, basically saving just one of the records inside the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 24h window in s3? Then what is missing there?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I changed to allowedLateness=0; no change, still missing data when
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> restoring from a savepoint.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I realized that BucketingSink must not play any role in this problem.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is because only when the 24-hour window triggers does
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> BucketingSink get a burst of input. Around the state restoring point
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> anything either (right?).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I will next try removing the allowedLateness entirely from the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> equation.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In the meanwhile, please let me know if you have any suggestions for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> debugging the lost data, for example which logs to enable.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> We use FlinkKafkaConsumer010, btw. Are there any known issues with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that which could contribute to lost data when restoring a savepoint?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job when state is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> restored from a savepoint.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Do you have any debugging hints to find out where exactly the data
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> gets dropped?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour window. It doesn't have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> any custom state management.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> When I cancel the job with a savepoint and restore from that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint, some data is missed. It seems to be losing just a small
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> amount of data. The event time of the lost data is probably around the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> time of the savepoint. In other words, the rest of the time window is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not entirely missed – collection works correctly also for (most of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the) events that come in after restoring.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> When the job processes a full 24-hour window without interruptions it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> doesn't miss anything.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Usually the problem doesn't happen in test environments that have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> smaller parallelism and smaller data volumes. But with production
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> volumes the job seems to be consistently missing at least something on
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> every restore.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This issue has happened consistently since the job was initially
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> created. It first ran on an older version of Flink 1.5-SNAPSHOT, and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it still happens on both Flink 1.5.2 & 1.6.0.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm wondering if this could be, for example, some synchronization
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> issue between the kafka consumer offsets vs. what's been written by
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> BucketingSink?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. Job content, simplified
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     kafkaStream
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             .flatMap(new ExtractFieldsFunction())
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             .timeWindow(Time.days(1))
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             .allowedLateness(allowedLateness)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             .sideOutputLateData(lateDataTag)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             .reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             .addSink(sink)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             // use a fixed number of output partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             .setParallelism(8);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>  */
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public class DistinctFunction
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         implements ReduceFunction<java.util.Map<String, String>> {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     public Map<String, String> reduce(Map<String, String> value1,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                                       Map<String, String> value2) {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         return value1;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. State configuration
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints";
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Checkpointing Mode: Exactly Once
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Interval: 1m 0s
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Timeout: 10m 0s
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Minimum Pause Between Checkpoints: 1m 0s
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Maximum Concurrent Checkpoints: 1
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Persist Checkpoints Externally: Enabled (retain on cancellation)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. BucketingSink configuration
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> We use BucketingSink; I don't think there's anything special here,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> apart from the fact that we're writing to S3.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     String outputPath = "s3://bucket/output";
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     BucketingSink<Map<String, String>> sink =
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             new BucketingSink<Map<String, String>>(outputPath)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                     .setBucketer(new ProcessdateBucketer())
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                     .setBatchSize(batchSize)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                     .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                     .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     sink.setWriter(new IdJsonWriter());
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 4. Kafka & event time
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> synchronize watermarks across all kafka partitions. We also write late
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> data to a side output, but nothing is written there – if anything
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> were, it could explain missed data in the main output (I'm also sure
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that our late data writing works, because we previously had some
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> actual late data which ended up there).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 5. allowedLateness
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> It may or may not be relevant that I have also enabled allowedLateness
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with 1 minute of lateness on the 24-hour window.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If it makes sense, I could try removing allowedLateness entirely? That
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would be just to rule out a Flink bug related to restoring state in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> combination with the allowedLateness feature. After all, all of our
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> data should be in a good enough order not to be late, given the max
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> out-of-orderness used on the kafka consumer timestamp extractor.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thank you in advance!
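
A sketch of the watermarking setup described in section 4 above, assuming the events carry an epoch-millis "timestamp" field; the field name and the 10-second bound are illustrative, not the actual job's values:

    import java.util.Map;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;

    // Event time comes from a field of the event map; watermarks trail the
    // maximum seen timestamp by the configured out-of-orderness bound.
    public class EventTimestampExtractor
            extends BoundedOutOfOrdernessTimestampExtractor<Map<String, String>> {

        public EventTimestampExtractor() {
            super(Time.seconds(10)); // illustrative bound
        }

        @Override
        public long extractTimestamp(Map<String, String> event) {
            return Long.parseLong(event.getOrDefault("timestamp", "0"));
        }
    }

Attaching it on the consumer itself, e.g. kafkaConsumer.assignTimestampsAndWatermarks(new EventTimestampExtractor()), is what generates watermarks per Kafka partition before they are merged downstream, as described above.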
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> *Juho Autio*
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Senior Data Engineer
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Data Engineering, Games
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Rovio Entertainment Corporation
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Mobile: + 358 (0)45 313 0122
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> www.rovio.com
>
> --
>
> Konstantin Knauf | Solutions Architect
>
> +49 160 91394525
>
>
> Planned Absences: 17.04.2019 - 26.04.2019
>
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Data Artisans GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen