flink-user mailing list archives

From Konstantin Knauf <konstan...@ververica.com>
Subject Re: Data loss when restoring from savepoint
Date Thu, 28 Mar 2019 17:08:44 GMT
Hi Juho,

Yes, the number is the last number in the line. Feel free to share all
lines.
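
For example, a rough sketch for totalling the counts (assuming the DEBUG lines
around one snapshot/restore have been exported from Kibana into a local file,
here called timers.log; the count is the last field of each line):

grep 'Snapshot: TimeWindow' timers.log | awk '{sum += $NF} END {print sum}'
grep 'Restored: TimeWindow' timers.log | awk '{sum += $NF} END {print sum}'

The "Snapshot" total and the corresponding "Restored" total should be the same.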

Best,

Konstantin

On Thu, Mar 28, 2019 at 5:00 PM Juho Autio <juho.autio@rovio.com> wrote:

> Hi Konstantin!
>
> I would be interested in any changes in the number of timers, not only the
>> number of logged messages.
>
>
> Sorry for the delay. I see, so the count of timers is that last number on the
> log line. For example, for this row it's 270409:
>
> March 26th 2019, 11:08:39.822 DEBUG
>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl Restored:
>> TimeWindow{start=1553558400000, end=1553644800000} -> 270409
>
>
> The log lines don't contain a task id – how should they be compared across
> different snapshots? Or should I share all of these logs (at least a couple
> of snapshots around the point of restore) and you'll compare them?
>
> Thanks.
>
> On Tue, Mar 26, 2019 at 9:55 PM Konstantin Knauf <konstantin@ververica.com>
> wrote:
>
>> Hi Juho,
>>
>> I based the branch on top of the current 1.6.4 branch. I can rebase on
>> 1.6.2 for any future iterations. I would be interested in any changes in
>> the number of timers, not only the number of logged messages. The sum of
>> all counts should be the same during snapshotting and restore. While a
>> window is open, this number should always increase (when comparing multiple
>> snapshots).
>>
>> Best,
>>
>> Konstantin
>>
>>
>>
>>
>>
>>
>> On Tue, Mar 26, 2019 at 11:01 AM Juho Autio <juho.autio@rovio.com> wrote:
>>
>>> Hi Konstantin,
>>>
>>> I got that debug logging working.
>>>
>>> You would now need to take a savepoint and restore sometime in the
>>>> middle of the day and should be able to check
>>>> a) if there are any timers for the very old windows, for which there is
>>>> still some content lingering around
>>>>
>>>
>>> No timers for old windows were logged.
>>>
>>> All timers are for the same time window, for example:
>>>
>>> March 26th 2019, 11:08:39.822 DEBUG
>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl Restored:
>>>> TimeWindow{start=1553558400000, end=1553644800000} -> 270409
>>>
>>>
>>> Those milliseconds correspond to:
>>> Tue Mar 26 00:00:00 UTC 2019 – Wed Mar 27 00:00:00 UTC 2019.
>>> - So this seems normal
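>>>
>>> For reference, the conversion (using BSD date, as further down in this
>>> thread):
>>>
>>> date -u -r "$((1553558400000/1000))"
>>> date -u -r "$((1553644800000/1000))"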
>>>
>>>
>>>> b) if there are fewer timers after restore for the current window. The
>>>> missing timers would be recreated as soon as any additional records for
>>>> the same key arrive within the window. This means the number of missing
>>>> records might be less than the number of missing timers.
>>>
>>>
>>> Grepping for "Restored" gives 78 hits. That's suspicious because this
>>> job's parallelism is 80. The corresponding grep for "Snapshot", on the other
>>> hand, gives 80 hits. OK, actually that would match what you wrote: "missing
>>> timers would be recreated as soon as any additional records for the same
>>> key arrive within the window".
>>>
>>> I tried killing & restoring once more. This time grepping for "Restored"
>>> gives 80 hits. Note that it's possible that some logs had been lost around
>>> the time of restoration because I'm browsing the logs through Kibana (ELK
>>> stack).
>>>
>>> I will try killing & restoring again tomorrow around noon & collect the
>>> same info. Is there anything else that you'd like me to share?
>>>
>>> By the way, it seems that your branch* is not based on the 1.6.2 release.
>>> Why so? It probably doesn't matter, but in general it would be good to
>>> minimize the scope of changes. But let's roll with this for now; I don't
>>> want to build another package because it seems like we're able to replicate
>>> the issue with this version :)
>>>
>>> Thanks,
>>> Juho
>>>
>>> *)
>>> https://github.com/apache/flink/compare/release-1.6.2...knaufk:logging-timers
>>>
>>> On Wed, Mar 20, 2019 at 2:20 PM Konstantin Knauf <
>>> konstantin@ververica.com> wrote:
>>>
>>>> Hi Juho,
>>>>
>>>> I created a branch [1] which logs the number of event time timers per
>>>> namespace during snapshot and restore.  Please refer to [2] to build Flink
>>>> from sources.
>>>>
>>>> You need to set the logging level to DEBUG for
>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl. If you
>>>> use log4j this is a one-liner in your log4j.properties:
>>>>
>>>> log4j.logger.org.apache.flink.streaming.api.operators.InternalTimerServiceImpl=DEBUG
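>>>>
>>>> (If you happen to use logback instead, the equivalent logger entry in
>>>> logback.xml should be along these lines:
>>>>
>>>> <logger name="org.apache.flink.streaming.api.operators.InternalTimerServiceImpl" level="DEBUG"/>
>>>> )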
>>>>
>>>> The only additional logs will be the lines added in the branch. The
>>>> lines are of the following format (<Window> -> <Number of Timers>), e.g.
>>>>
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Snapshot: TimeWindow{start=1553083589256, end=1553083589258} -> 1
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Snapshot: TimeWindow{start=1553083589256, end=1553083589258} -> 2
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Snapshot: TimeWindow{start=1553083589456, end=1553083589458} -> 2
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Snapshot: TimeWindow{start=1553083589356, end=1553083589358} -> 2
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Snapshot: TimeWindow{start=1553083589482, end=1553083589484} -> 2
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Snapshot: TimeWindow{start=1553083589456, end=1553083589458} -> 1
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Snapshot: TimeWindow{start=1553083589256, end=1553083589258} -> 2
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Snapshot: TimeWindow{start=1553083589356, end=1553083589358} -> 1
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Snapshot: TimeWindow{start=1553083589456, end=1553083589458} -> 2
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Snapshot: TimeWindow{start=1553083589482, end=1553083589484} -> 1
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Snapshot: TimeWindow{start=1553083589356, end=1553083589358} -> 2
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Snapshot: TimeWindow{start=1553083589482, end=1553083589484} -> 2
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Restored: TimeWindow{start=1553083589256, end=1553083589258} -> 1
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Restored: TimeWindow{start=1553083589456, end=1553083589458} -> 1
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Restored: TimeWindow{start=1553083589356, end=1553083589358} -> 1
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Restored: TimeWindow{start=1553083589482, end=1553083589484} -> 1
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Restored: TimeWindow{start=1553083589256, end=1553083589258} -> 2
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Restored: TimeWindow{start=1553083589456, end=1553083589458} -> 2
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Restored: TimeWindow{start=1553083589356, end=1553083589358} -> 2
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Restored: TimeWindow{start=1553083589482, end=1553083589484} -> 2
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Restored: TimeWindow{start=1553083589256, end=1553083589258} -> 2
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Restored: TimeWindow{start=1553083589456, end=1553083589458} -> 2
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Restored: TimeWindow{start=1553083589356, end=1553083589358} -> 2
>>>> DEBUG org.apache.flink.streaming.api.operators.InternalTimerServiceImpl  - Restored: TimeWindow{start=1553083589482, end=1553083589484} -> 2
>>>>
>>>> You would now need to take a savepoint and restore sometime in the
>>>> middle of the day and should be able to check
>>>>
>>>> a) if there are any timers for the very old windows, for which there is
>>>> still some content lingering around
>>>> b) if there are fewer timers after restore for the current window. The
>>>> missing timers would be recreated as soon as any additional records for
>>>> the same key arrive within the window. This means the number of missing
>>>> records might be less than the number of missing timers.
>>>>
>>>> Looking forward to the results!
>>>>
>>>> Cheers,
>>>>
>>>> Konstantin
>>>>
>>>> [1] https://github.com/knaufk/flink/tree/logging-timers
>>>> [2]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/start/building.html#build-flink
>>>>
>>>> On Tue, Mar 19, 2019 at 2:06 PM Juho Autio <juho.autio@rovio.com>
>>>> wrote:
>>>>
>>>>> Thanks, answers below.
>>>>>
>>>>> * Which Flink version do you need this for?
>>>>>
>>>>> 1.6.2
>>>>>
>>>>> * You use RocksDBStateBackend, correct? If so, which value do you set
>>>>> for "state.backend.rocksdb.timer-service.factory" in the flink-conf.yaml?
>>>>>
>>>>> Yes, RocksDBStateBackend. We don't
>>>>> set state.backend.rocksdb.timer-service.factory at all, so whatever the
>>>>> default is in Flink 1.6.2? Based on the docs it seems that it would be "heap":
>>>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/large_state_tuning.html
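>>>>>
>>>>> For reference, a minimal flink-conf.yaml sketch of the two values the docs
>>>>> describe (we currently just rely on the default):
>>>>>
>>>>> # default in 1.6: timers are kept on the JVM heap
>>>>> state.backend.rocksdb.timer-service.factory: heap
>>>>> # alternative: timers are stored in RocksDB along with the keyed state
>>>>> # state.backend.rocksdb.timer-service.factory: rocksdb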
>>>>>
>>>>> On Mon, Mar 18, 2019 at 6:26 PM Konstantin Knauf <
>>>>> konstantin@ververica.com> wrote:
>>>>>
>>>>>> Hi Juho,
>>>>>>
>>>>>> I will prepare a Flink branch for you, which logs the number of event
>>>>>> time timers per window before snapshot and after restore. With this we
>>>>>> should be able to check if timers are lost during savepoints.
>>>>>>
>>>>>> Two questions:
>>>>>>
>>>>>> * Which Flink version do you need this for? 1.6?
>>>>>> * You use RocksDBStateBackend, correct? If so, which value do you
>>>>>> set for "state.backend.rocksdb.timer-service.factory" in the
>>>>>> flink-conf.yaml?
>>>>>>
>>>>>> Cheers,
>>>>>>
>>>>>> Konstantin
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 14, 2019 at 12:20 PM Juho Autio <juho.autio@rovio.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Konstantin,
>>>>>>>
>>>>>>> Reading timers from a snapshot doesn't seem straightforward. I wrote
>>>>>>> to Gyula in private and he gave more suggestions (thanks!), but it still
>>>>>>> seems that it may be a rather big effort for me to figure it out. Would you
>>>>>>> be able to help with that? If yes, there's this existing unit test that can
>>>>>>> be extended to test reading timers:
>>>>>>> https://github.com/king/bravo/blob/master/bravo/src/test/java/com/king/bravo/ReducerStateReadingTest.java#L37-L38
>>>>>>> The test already has some values in the reducer window state, so
>>>>>>> I'm assuming that it must also contain some window timers.
>>>>>>>
>>>>>>> This is what Gyula wrote to me:
>>>>>>>
>>>>>>> Maybe I was wrong when I said the createOperatorStateBackendsFromSnapshot
>>>>>>> is the way to do it.
>>>>>>>
>>>>>>> On second thought, timers are probably stored as raw keyed state in
>>>>>>> the operator. I don't remember building any utility to read that.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> At the moment I am quite busy with other work so I won't have time to
>>>>>>> build it for you, so you might have to figure it out yourself.
>>>>>>>
>>>>>>> I would try to look at how keyed states are read:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Look at the implementation of:
>>>>>>> createOperatorStateBackendsFromSnapshot()
>>>>>>>
>>>>>>> Instead of getManagedOperatorState you want to try getRawKeyedState
>>>>>>> and also look at how Flink restores it internally for Timers
>>>>>>>
>>>>>>> I would start looking around here I guess:
>>>>>>> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L238
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/e8daa49a593edc401cd44761b25b1324b11be4a6/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java#L199
>>>>>>>
>>>>>>> On Tue, Mar 12, 2019 at 5:41 PM Gyula Fóra <gyula.fora@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Should be possible to read timer states by:
>>>>>>>> OperatorStateReader#createOperatorStateBackendFromSnapshot
>>>>>>>>
>>>>>>>> Then you have to get the timer state out of the
>>>>>>>> OperatorStateBackend, but keep in mind that this will restore the operator
>>>>>>>> states in memory.
>>>>>>>>
>>>>>>>> Gyula
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Mar 12, 2019 at 4:29 PM Konstantin Knauf <
>>>>>>>> konstantin@ververica.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Juho,
>>>>>>>>>
>>>>>>>>> okay, so it seems that although the watermark passed the end time
>>>>>>>>> of the event-time windows, the window was not triggered for some of the
>>>>>>>>> keys.
>>>>>>>>>
>>>>>>>>> The timers, which would trigger the firing of the window, are also
>>>>>>>>> part of the keyed state and are snapshotted/restored. I would like to check
>>>>>>>>> if timers (as opposed to the window content itself) are maybe lost during
>>>>>>>>> the savepoint & restore procedure. Using Bravo, are you also able to
>>>>>>>>> inspect the timer state of the savepoints? In particular, I would be
>>>>>>>>> interested if for two subsequent savepoints all timers (i.e. one timer per
>>>>>>>>> window and key including the missing keys) are present in the savepoint.
>>>>>>>>>
>>>>>>>>> @Gyula Fóra <gyula.fora@gmail.com>: Does Bravo support reading
>>>>>>>>> timer state as well?
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>>
>>>>>>>>> Konstantin
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Mar 7, 2019 at 6:41 PM Juho Autio <juho.autio@rovio.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Right, the window operator is the one named "DistinctFunction".
>>>>>>>>>>
>>>>>>>>>> http
>>>>>>>>>> http://10.1.59.75:20888/proxy/application_1551956351667_0001/jobs/3e4ffaadbd84af3488286863f00d4f23/vertices/19ede2f818524a7f310857e537fa6808/metrics\?get\=0.currentInputWatermark,1.currentInputWatermark,2.currentInputWatermark,3.currentInputWatermark,4.currentInputWatermark,5.currentInputWatermark,6.currentInputWatermark,7.currentInputWatermark,8.currentInputWatermark,9.currentInputWatermark,10.currentInputWatermark,11.currentInputWatermark,12.currentInputWatermark,13.currentInputWatermark,14.currentInputWatermark,15.currentInputWatermark,16.currentInputWatermark,17.currentInputWatermark,18.currentInputWatermark,19.currentInputWatermark,20.currentInputWatermark,21.currentInputWatermark,22.currentInputWatermark,23.currentInputWatermark,24.currentInputWatermark,25.currentInputWatermark,26.currentInputWatermark,27.currentInputWatermark,28.currentInputWatermark,29.currentInputWatermark,30.currentInputWatermark,31.currentInputWatermark,32.currentInputWatermark,33.currentInputWatermark,34.currentInputWatermark,35.currentInputWatermark,36.currentInputWatermark,37.currentInputWatermark,38.currentInputWatermark,39.currentInputWatermark,40.currentInputWatermark,41.currentInputWatermark,42.currentInputWatermark,43.currentInputWatermark,44.currentInputWatermark,45.currentInputWatermark,46.currentInputWatermark,47.currentInputWatermark,48.currentInputWatermark,49.currentInputWatermark,50.currentInputWatermark,51.currentInputWatermark,52.currentInputWatermark,53.currentInputWatermark,54.currentInputWatermark,55.currentInputWatermark,56.currentInputWatermark,57.currentInputWatermark,58.currentInputWatermark,59.currentInputWatermark,60.currentInputWatermark,61.currentInputWatermark,62.currentInputWatermark,63.currentInputWatermark,64.currentInputWatermark,65.currentInputWatermark,66.currentInputWatermark,67.currentInputWatermark,68.currentInputWatermark,69.currentInputWatermark,70.currentInputWatermark,71.currentInputWatermark,72.currentInputWatermark,73.currentInputWatermark,74.currentInputWatermark,75.currentInputWatermark,76.currentInputWatermark,77.currentInputWatermark,78.currentInputWatermark,79.currentInputWatermark
>>>>>>>>>> | jq '.[].value' --raw-output | uniq -c
>>>>>>>>>>   80 1551980102743
>>>>>>>>>>
>>>>>>>>>> date -r "$((1551980102743/1000))"
>>>>>>>>>> Thu Mar  7 19:35:02 EET 2019
>>>>>>>>>>
>>>>>>>>>> To me that makes sense – how would the window be triggered at
>>>>>>>>>> all, if not all sub-tasks have a high enough watermark, so that the
>>>>>>>>>> operator-level watermark can be advanced?
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 7, 2019 at 5:33 PM Konstantin Knauf <
>>>>>>>>>> konstantin@ververica.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>
>>>>>>>>>>> great, we are getting closer :) Could you please check the
>>>>>>>>>>> "Watermarks" tab in the Flink UI of this job and check if the current
>>>>>>>>>>> watermark for all parallel subtasks of the WindowOperator is close to the
>>>>>>>>>>> current date/time?
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>>
>>>>>>>>>>> Konstantin
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Mar 7, 2019 at 3:01 PM Juho Autio <juho.autio@rovio.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Wow, indeed the missing data from previous date is still found
>>>>>>>>>>>> in the savepoint!
>>>>>>>>>>>>
>>>>>>>>>>>> Actually what I now found is that there is still data from even
>>>>>>>>>>>> older dates in the state:
>>>>>>>>>>>>
>>>>>>>>>>>> %%spark
>>>>>>>>>>>> state_json_next_day.groupBy(state_json_next_day.ts.substr(1,
>>>>>>>>>>>> 10).alias('day')).count().orderBy('day').show(n=1000)
>>>>>>>>>>>>
>>>>>>>>>>>> +----------+--------+
>>>>>>>>>>>> |       day|   count|
>>>>>>>>>>>> +----------+--------+
>>>>>>>>>>>> |2018-08-22|    4206|
>>>>>>>>>>>> ..
>>>>>>>>>>>> (manually truncated)
>>>>>>>>>>>> ..
>>>>>>>>>>>> |2019-02-03|       4|
>>>>>>>>>>>> |2019-02-14|   12881|
>>>>>>>>>>>> |2019-02-15|    1393|
>>>>>>>>>>>> |2019-02-25|    8774|
>>>>>>>>>>>> |2019-03-06|    9293|
>>>>>>>>>>>> |2019-03-07|28113105|
>>>>>>>>>>>> +----------+--------+
>>>>>>>>>>>>
>>>>>>>>>>>> Of course that's the expected situation after we have learned
>>>>>>>>>>>> that some window contents are left untriggered.
>>>>>>>>>>>>
>>>>>>>>>>>> I don't have the logs any more, but I think I reset the state on
>>>>>>>>>>>> 2018-08-22, and since then it has always been kept/restored from a
>>>>>>>>>>>> savepoint. I can also see some dates there on which I didn't cancel the
>>>>>>>>>>>> stream, but I can't be sure if it has gone through some automatic restart
>>>>>>>>>>>> by Flink. So we can't rule out that some window contents are sometimes
>>>>>>>>>>>> also missed during normal operation. However, savepoint restoration at
>>>>>>>>>>>> least makes the problem more prominent. I have previously mentioned that I
>>>>>>>>>>>> would suspect this to be some kind of race condition that is affected by
>>>>>>>>>>>> load on the cluster. The reason for my suspicion is that during savepoint
>>>>>>>>>>>> restoration the cluster is also catching up Kafka offsets at full speed, so
>>>>>>>>>>>> it is considerably more loaded than usual. Otherwise this problem might
>>>>>>>>>>>> not have much to do with savepoints, of course.
>>>>>>>>>>>>
>>>>>>>>>>>> Are you able to investigate the problem in Flink code based on
>>>>>>>>>>>> this information?
>>>>>>>>>>>>
>>>>>>>>>>>> Many thanks,
>>>>>>>>>>>> Juho
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 6, 2019 at 1:41 PM Juho Autio <juho.autio@rovio.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the investigation & summary.
>>>>>>>>>>>>>
>>>>>>>>>>>>> As you suggested, I will next take savepoints on two
>>>>>>>>>>>>> subsequent days & check the reducer state for both days.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 6, 2019 at 1:18 PM Konstantin Knauf <
>>>>>>>>>>>>> konstantin@ververica.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> (Moving the discussion back to the ML)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> after looking into your code, we are still pretty much in the
>>>>>>>>>>>>>> dark with respect to what is going wrong.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me try to summarize what we know given your experiments
>>>>>>>>>>>>>> so far:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1) the lost records were processed and put into state
>>>>>>>>>>>>>> *before* the restart of the job, not afterwards
>>>>>>>>>>>>>> 2) the lost records are part of the state after the restore
>>>>>>>>>>>>>> (because they are contained in subsequent savepoints)
>>>>>>>>>>>>>> 3) the sinks are not the problem (because the metrics of the
>>>>>>>>>>>>>> WindowOperator showed that the missing records have not been sent to the
>>>>>>>>>>>>>> sinks)
>>>>>>>>>>>>>> 4) it is not the batch job used for reference that is
>>>>>>>>>>>>>> wrong, because of 1)
>>>>>>>>>>>>>> 5) records are only lost when restarting from a savepoint
>>>>>>>>>>>>>> (not during normal operations)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> One explanation would be that one of the WindowOperators did
>>>>>>>>>>>>>> not fire (for whatever reason) and the missing records are still in the
>>>>>>>>>>>>>> window's state when you run your test. Could you please check whether this
>>>>>>>>>>>>>> is the case by taking a savepoint on the next day and checking if the
>>>>>>>>>>>>>> missing records are contained in it?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Feb 18, 2019 at 8:32 PM Juho Autio <
>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Konstantin, thanks.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I gathered the additional info as discussed. No surprises
>>>>>>>>>>>>>>> there.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> * do you know if all lost records are contained in the last
>>>>>>>>>>>>>>>> savepoint you took before the window fired? This would mean that no records
>>>>>>>>>>>>>>>> are lost after the last restore.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Indeed this is the case. I saved the list of all missing
>>>>>>>>>>>>>>> IDs, analyzed the savepoint with Bravo, and the savepoint state (already)
>>>>>>>>>>>>>>> contained all IDs that were eventually missed in output.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> * could you please check the numRecordsOut metric for the
>>>>>>>>>>>>>>>> WindowOperator (FlinkUI -> TaskMetrics -> Select TaskChain containing
>>>>>>>>>>>>>>>> WindowOperator -> find metric)? Is the count reported there correct (no
>>>>>>>>>>>>>>>> missing data)?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The number matches the output rows. The sum of the numRecordsOut
>>>>>>>>>>>>>>> metrics was 45755630, and count(*) of the output on S3 resulted in the same
>>>>>>>>>>>>>>> number. The batch output has a bit more IDs of course (this time it was
>>>>>>>>>>>>>>> 1194). You wrote "Is the count reported there correct (no missing data)?"
>>>>>>>>>>>>>>> but I have a slightly different viewpoint; I agree that the reported count
>>>>>>>>>>>>>>> is correct (in Flink's scope, because the number is the same as what's in
>>>>>>>>>>>>>>> the output file). But I think "no missing data" doesn't belong here. Data
>>>>>>>>>>>>>>> is missing, but it's consistently missing from both the output files and
>>>>>>>>>>>>>>> the numRecordsOut metrics.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The next thing I'll work on is preparing the code to be shared.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Btw, I used this script to count the sum of numRecordsOut
>>>>>>>>>>>>>>> (I'm going to look into enabling the Slf4jReporter eventually):
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> JOB_URL=http://10.1.56.245:20888/proxy/application_1550217512987_0001/jobs/068813ab8e6cbebaf7d306a0f41993c2
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> DistinctFunctionID=`http $JOB_URL \
>>>>>>>>>>>>>>> | jq '.vertices[] | select(.name == "DistinctFunction") | .id' --raw-output`
>>>>>>>>>>>>>>> echo "DistinctFunctionID=$DistinctFunctionID"
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> http $JOB_URL/vertices/$DistinctFunctionID/metrics \
>>>>>>>>>>>>>>> | jq '.[] | .id' --raw-output | grep "[0-9][0-9]*\\.numRecordsOut$" \
>>>>>>>>>>>>>>> | xargs -I@ sh -c "http GET $JOB_URL/vertices/$DistinctFunctionID/metrics?get=@ | jq '.[0].value' --raw-output" > numRecordsOut.txt
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> # " eval_math( '+'.join( file.readlines ) ) "
>>>>>>>>>>>>>>> paste -sd+ numRecordsOut.txt | bc
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Thu, Feb 14, 2019 at 2:44 PM Konstantin Knauf <
>>>>>>>>>>>>>>> konstantin@ververica.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> * does the output of the streaming job contain any data,
>>>>>>>>>>>>>>>>>> which is not contained in the batch
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> No.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> * do you know if all lost records are contained in the
>>>>>>>>>>>>>>>>>> last savepoint you took before the window fired? This would mean that no
>>>>>>>>>>>>>>>>>> records are lost after the last restore.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I haven't built the tooling required to check all IDs like
>>>>>>>>>>>>>>>>> that, but yes, that's my understanding currently. To check that I would
>>>>>>>>>>>>>>>>> need to:
>>>>>>>>>>>>>>>>> - kill the stream only once on a given day (so that
>>>>>>>>>>>>>>>>> there's only one savepoint creation & restore)
>>>>>>>>>>>>>>>>> - next day or later: save all missing ids from batch
>>>>>>>>>>>>>>>>> output comparison
>>>>>>>>>>>>>>>>> - next day or later: read the savepoint with bravo & check
>>>>>>>>>>>>>>>>> that it contains all of those missing IDs
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> However I haven't built the tooling for that yet. Do you
>>>>>>>>>>>>>>>>> think it's necessary to verify that this assumption holds?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> It would be another data point and might help us to track
>>>>>>>>>>>>>>>> down the problem. Whether it is worth doing depends on the result, i.e.
>>>>>>>>>>>>>>>> whether the current assumption would be falsified or not, but we only know
>>>>>>>>>>>>>>>> that in retrospect ;)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> * could you please check the numRecordsOut metric for the
>>>>>>>>>>>>>>>>>> WindowOperator (FlinkUI -> TaskMetrics -> Select TaskChain containing
>>>>>>>>>>>>>>>>>> WindowOperator -> find metric)? Is the count reported there correct (no
>>>>>>>>>>>>>>>>>> missing data)?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is that metric the result of window trigger? If yes, you
>>>>>>>>>>>>>>>>> must mean that I check the value of that metric on the next day after
>>>>>>>>>>>>>>>>> restore, so that it only contains the count for the output of previous
>>>>>>>>>>>>>>>>> day's window? The counter is reset to 0 when job starts (even when state is
>>>>>>>>>>>>>>>>> restored), right?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Yes, this metric would be incremented when the window is
>>>>>>>>>>>>>>>> triggered. Yes, please check this metric after the window, during which the
>>>>>>>>>>>>>>>> restore happened, is fired.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If you don't have a MetricsReporter configured so far, I
>>>>>>>>>>>>>>>> recommend quickly registering a Slf4jReporter to log out all metrics every X
>>>>>>>>>>>>>>>> seconds (maybe even minutes for your use case):
>>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/monitoring/metrics.html#slf4j-orgapacheflinkmetricsslf4jslf4jreporter.
>>>>>>>>>>>>>>>> Then you don't need to go through the WebUI and can keep a history of the
>>>>>>>>>>>>>>>> metrics.
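>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> A minimal flink-conf.yaml sketch for that (the reporter name "slf4j" and
>>>>>>>>>>>>>>>> the interval are just example values):
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> metrics.reporters: slf4j
>>>>>>>>>>>>>>>> metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter
>>>>>>>>>>>>>>>> metrics.reporter.slf4j.interval: 60 SECONDS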
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Otherwise, do you have any suggestions for how to
>>>>>>>>>>>>>>>>> instrument the code to narrow down further where the data gets lost? To me
>>>>>>>>>>>>>>>>> it would make sense to proceed with this, because the problem seems hard to
>>>>>>>>>>>>>>>>> reproduce outside of our environment.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Let's focus on checking this metric above, to make sure
>>>>>>>>>>>>>>>> that the WindowOperator is actually emitting fewer records than the overall
>>>>>>>>>>>>>>>> number of keys in the state as your experiments suggest, and on sharing the
>>>>>>>>>>>>>>>> code.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Thu, Feb 14, 2019 at 10:57 AM Konstantin Knauf <
>>>>>>>>>>>>>>>>> konstantin@ververica.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> you are right the problem has actually been narrowed down
>>>>>>>>>>>>>>>>>> quite a bit over time. Nevertheless, sharing the code (incl.
>>>>>>>>>>>>>>>>>> flink-conf.yaml) might be a good idea. Maybe something strikes the eye,
>>>>>>>>>>>>>>>>>> that we have not thought about so far. If you don't feel comfortable
>>>>>>>>>>>>>>>>>> sharing the code on the ML, feel free to send me a PM.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Besides that, three more questions:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> * does the output of the streaming job contain any data,
>>>>>>>>>>>>>>>>>> which is not contained in the batch output?
>>>>>>>>>>>>>>>>>> * do you know if all lost records are contained in the
>>>>>>>>>>>>>>>>>> last savepoint you took before the window fired? This would mean that no
>>>>>>>>>>>>>>>>>> records are lost after the last restore.
>>>>>>>>>>>>>>>>>> * could you please check the numRecordsOut metric for the
>>>>>>>>>>>>>>>>>> WindowOperator (FlinkUI -> TaskMetrics -> Select TaskChain containing
>>>>>>>>>>>>>>>>>> WindowOperator -> find metric)? Is the count reported there correct (no
>>>>>>>>>>>>>>>>>> missing data)?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Konstantin
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Feb 13, 2019 at 3:19 PM Gyula Fóra <
>>>>>>>>>>>>>>>>>> gyula.fora@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Sorry not posting on the mail list was my mistake :/
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, 13 Feb 2019 at 15:01, Juho Autio <
>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks for stepping in, did you post outside of the
>>>>>>>>>>>>>>>>>>>> mailing list on purpose btw?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> This I did long time ago:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> To rule out for good any questions about sink
>>>>>>>>>>>>>>>>>>>>> behaviour, the job was killed and started with an additional Kafka sink.
>>>>>>>>>>>>>>>>>>>>> The same number of ids were missed in both outputs:
>>>>>>>>>>>>>>>>>>>>> KafkaSink & BucketingSink.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> (I wrote about that On Oct 1, 2018 in this email thread)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> After that I did the savepoint analysis with Bravo.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Currently I'm indeed trying to get suggestions how to
>>>>>>>>>>>>>>>>>>>> debug further, for example, where to add additional kafka output, to catch
>>>>>>>>>>>>>>>>>>>> where the data gets lost. That would probably be somewhere in Flink's
>>>>>>>>>>>>>>>>>>>> internals.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I could try to share the full code also, but IMHO the
>>>>>>>>>>>>>>>>>>>> problem has been quite well narrowed down, considering that data can be
>>>>>>>>>>>>>>>>>>>> found in savepoint, savepoint is successfully restored, and after restoring
>>>>>>>>>>>>>>>>>>>> the data doesn't go to "user code" (like the reducer) any more.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Wed, Feb 13, 2019 at 3:47 PM Gyula Fóra <
>>>>>>>>>>>>>>>>>>>> gyula.fora@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hi Juho!
>>>>>>>>>>>>>>>>>>>>> I think the reason you are not getting many answers
>>>>>>>>>>>>>>>>>>>>> here is because it is very hard to debug this problem remotely.
>>>>>>>>>>>>>>>>>>>>> Seemingly you do very normal operations, the state
>>>>>>>>>>>>>>>>>>>>> contains all the required data and nobody else has hit a similar problem
>>>>>>>>>>>>>>>>>>>>> for ages.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> My best guess would be some bug with the deduplication
>>>>>>>>>>>>>>>>>>>>> or output writing logic, but without a complete code example it's very hard
>>>>>>>>>>>>>>>>>>>>> to say anything useful.
>>>>>>>>>>>>>>>>>>>>> Did you try writing it to Kafka to see if the output
>>>>>>>>>>>>>>>>>>>>> is there? (that way we could rule out the dedup problem)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>> Gyula
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Wed, Feb 13, 2019 at 2:37 PM Juho Autio <
>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Stefan (or anyone!), please, could I have some
>>>>>>>>>>>>>>>>>>>>>> feedback on the findings that I reported on Dec 21, 2018? This is still a
>>>>>>>>>>>>>>>>>>>>>> major blocker..
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Thu, Jan 31, 2019 at 11:46 AM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hello, is there anyone that could help with this?
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Fri, Jan 11, 2019 at 8:14 AM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Stefan, would you have time to comment?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Wednesday, January 2, 2019, Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Bump – does anyone know if Stefan will be
>>>>>>>>>>>>>>>>>>>>>>>>> available to comment the latest findings? Thanks.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Dec 21, 2018 at 2:33 PM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Stefan, I managed to analyze savepoint with
>>>>>>>>>>>>>>>>>>>>>>>>>> bravo. It seems that the data that's missing from output
>>>>>>>>>>>>>>>>>>>>>>>>>> *is* found in savepoint.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> I simplified my test case to the following:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> - job 1 has been running for ~10 days
>>>>>>>>>>>>>>>>>>>>>>>>>> - savepoint X created & job 1 cancelled
>>>>>>>>>>>>>>>>>>>>>>>>>> - job 2 started with restore from savepoint X
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Then I waited until the next day so that job 2
>>>>>>>>>>>>>>>>>>>>>>>>>> has triggered the 24 hour window.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Then I analyzed the output & savepoint:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> - compare job 2 output with the output of a batch
>>>>>>>>>>>>>>>>>>>>>>>>>> pyspark script => find 4223 missing rows
>>>>>>>>>>>>>>>>>>>>>>>>>> - pick one of the missing rows (say, id Z)
>>>>>>>>>>>>>>>>>>>>>>>>>> - read savepoint X with bravo, filter for id Z =>
>>>>>>>>>>>>>>>>>>>>>>>>>> Z was found in the savepoint!
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> How can it be possible that the value is in state
>>>>>>>>>>>>>>>>>>>>>>>>>> but doesn't end up in output after state has been restored & window is
>>>>>>>>>>>>>>>>>>>>>>>>>> eventually triggered?
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> I also did similar analysis on the previous case
>>>>>>>>>>>>>>>>>>>>>>>>>> where I savepointed & restored the job multiple times (5) within the same
>>>>>>>>>>>>>>>>>>>>>>>>>> 24-hour window. A missing id that I drilled down to, was found in all of
>>>>>>>>>>>>>>>>>>>>>>>>>> those savepoints, yet missing from the output that gets written at the end
>>>>>>>>>>>>>>>>>>>>>>>>>> of the day. This is even more surprising: that the missing ID was written
>>>>>>>>>>>>>>>>>>>>>>>>>> to the new savepoints also after restoring. Is the reducer state somehow
>>>>>>>>>>>>>>>>>>>>>>>>>> decoupled from the window contents?
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Big thanks to bravo-developer Gyula for guiding
>>>>>>>>>>>>>>>>>>>>>>>>>> me through to be able read the reducer state!
>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/king/bravo/pull/11
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Gyula also had an idea for how to troubleshoot
>>>>>>>>>>>>>>>>>>>>>>>>>> the missing data in a scalable way: I could add some "side effect kafka
>>>>>>>>>>>>>>>>>>>>>>>>>> output" on individual operators. This should allow tracking more closely at
>>>>>>>>>>>>>>>>>>>>>>>>>> which point the data gets lost. However, maybe this would have to be in
>>>>>>>>>>>>>>>>>>>>>>>>>> some Flink's internal components, and I'm not sure which those would be.
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>> Juho
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Nov 19, 2018 at 11:52 AM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Stefan,
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Bravo doesn't currently support reading a
>>>>>>>>>>>>>>>>>>>>>>>>>>> reducer state. I gave it a try but couldn't get to a working implementation
>>>>>>>>>>>>>>>>>>>>>>>>>>> yet. If anyone can provide some insight on how to make this work, please
>>>>>>>>>>>>>>>>>>>>>>>>>>> share at github:
>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/king/bravo/pull/11
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 3:32 PM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I was glad to find that bravo had now been
>>>>>>>>>>>>>>>>>>>>>>>>>>>> updated to support installing bravo to a local maven repo.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I was able to load a checkpoint created by my
>>>>>>>>>>>>>>>>>>>>>>>>>>>> job, thanks to the example provided in bravo README, but I'm still missing
>>>>>>>>>>>>>>>>>>>>>>>>>>>> the essential piece.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> My code was:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>         OperatorStateReader reader = new
>>>>>>>>>>>>>>>>>>>>>>>>>>>> OperatorStateReader(env2, savepoint, "DistinctFunction");
>>>>>>>>>>>>>>>>>>>>>>>>>>>>         DontKnowWhatTypeThisIs reducingState =
>>>>>>>>>>>>>>>>>>>>>>>>>>>> reader.readKeyedStates(what should I put here?);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> I don't know how to read the values collected
>>>>>>>>>>>>>>>>>>>>>>>>>>>> from reduce() calls in the state. Is there a way to access the reducing
>>>>>>>>>>>>>>>>>>>>>>>>>>>> state of the window with bravo? I'm a bit confused how this works, because
>>>>>>>>>>>>>>>>>>>>>>>>>>>> when I check with debugger, flink internally uses a ReducingStateDescriptor
>>>>>>>>>>>>>>>>>>>>>>>>>>>> with name=window-contents, but still reading operator state for
>>>>>>>>>>>>>>>>>>>>>>>>>>>> "DistinctFunction" didn't at least throw an exception ("window-contents"
>>>>>>>>>>>>>>>>>>>>>>>>>>>> threw – obviously there's no operator by that name).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>> Juho
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Oct 15, 2018 at 2:25 PM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Stefan,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sorry but it doesn't seem immediately clear to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> me what's a good way to use
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://github.com/king/bravo.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> How are people using it? Would you for example
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> modify build.gradle somehow to publish the bravo as a library
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> locally/internally? Or add code directly in the bravo project (locally) and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> run it from there (using an IDE, for example)? Also it doesn't seem like
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the bravo gradle project supports building a flink job jar, but if it does,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> how do I do it?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Good then, I'll try to analyze the savepoints
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with Bravo. Thanks!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > How would you assume that backpressure
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would influence your updates? Updates to each local state still happen
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> event-by-event, in a single reader/writing thread.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sure, just an ignorant guess by me. I'm not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> familiar with most of Flink's internals. Anyway, high backpressure is not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> seen on this job after it has caught up the lag, so I thought it would
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be worth mentioning.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <s.richter@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Am 04.10.2018 um 16:08 schrieb Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com>:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > you could take a look at Bravo [1] to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> query your savepoints and to check if the state in the savepoint complete
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> w.r.t your expectations
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks. I'm not 100% if this is the case,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> but to me it seemed like the missed ids were being logged by the reducer
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> soon after the job had started (after restoring a savepoint). But on the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> other hand, after that I also made another savepoint & restored that, so
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> what I could check is: does that next savepoint have the missed ids that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> were logged (a couple of minutes before the savepoint was created, so there
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> should've been more than enough time to add them to the state before the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint was triggered) or not. Any way, if I would be able to verify with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Bravo that the ids are missing from the savepoint (even though reduced
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> logged that it saw them), would that help in figuring out where they are
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> lost? Is there some major difference compared to just looking at the final
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> output after window has been triggered?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think that makes a difference. For
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> example, you can investigate if there is a state loss or a problem with the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> windowing. In the savepoint you could see which keys exists and to which
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> windows they are assigned. Also just to make sure there is no
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> misunderstanding: only elements that are in the state at the start of a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint are expected to be part of the savepoint; all elements between
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> start and completion of the savepoint are not expected to be part of the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > I also doubt that the problem is about
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> backpressure after restore, because the job will only continue running
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> after the state restore is already completed.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I'm not suspecting that the state
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> restoring would be the problem either. My concern was about backpressure
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> possibly messing with the updates of reducing state? I would tend to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> suspect that updating the state consistently is what fails, where heavy
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> load / backpressure might be a factor.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> How would you assume that backpressure would
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> influence your updates? Updates to each local state still happen
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> event-by-event, in a single reader/writing thread.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Oct 4, 2018 at 4:18 PM Stefan
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Richter <s.richter@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> you could take a look at Bravo [1] to query
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> your savepoints and to check if the state in the savepoint complete w.r.t
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> your expectations. I somewhat doubt that there is a general problem with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the state/savepoints because many users are successfully running it on a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> large state and I am not aware of any data loss problems, but nothing is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> impossible. What the savepoint does is also straight forward: iterate a db
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> snapshot and write all key/value pairs to disk, so all data that was in the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> db at the time of the savepoint, should show up. I also doubt that the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> problem is about backpressure after restore, because the job will only
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> continue running after the state restore is already completed. Did you
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> check if you are using exactly-one-semantics or at-least-once semantics?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Also did you check that the kafka consumer start position is configured
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> properly [2]? Are watermarks generated as expected after restore?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> One more unrelated high-level comment that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I have: for a granularity of 24h windows, I wonder if it would not make
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sense to use a batch job instead?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Stefan
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1] https://github.com/king/bravo
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Am 04.10.2018 um 14:53 schrieb Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com>:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for the suggestions!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > In general, it would be tremendously
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> helpful to have a minimal working example which allows to reproduce the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> problem.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Definitely. The problem with reproducing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> has been that this only seems to happen in the bigger production data
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> volumes.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> That's why I'm hoping to find a way to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> debug this with the production data. With that it seems to consistently
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> cause some misses every time the job is killed/restored.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > check if it happens for shorter windows,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> like 1h etc
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> What would be the benefit of that compared
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to 24h window?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > simplify the job to not use a reduce
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> window but simply a time window which outputs the window events. Then
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> counting the input and output events should allow you to verify the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> results. If you are not seeing missing events, then it could have something
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to do with the reducing state used in the reduce function.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hm, maybe, but not sure how useful that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> would be, because it wouldn't yet prove that it's related to reducing,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> because not having a reduce function could also mean smaller load on the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> job, which might alone be enough to make the problem not manifest.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Is there a way to debug what goes into the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reducing state (including what gets removed or overwritten and what
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> restored), if that makes sense..? Maybe some suitable logging could be used
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to prove that the lost data is written to the reducing state (or at least
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> asked to be written), but not found any more when the window closes and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> state is flushed?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On configuration once more, we're using
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> RocksDB state backend with asynchronous incremental checkpointing. The
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> state is restored from savepoints though, we haven't been using those
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> checkpoints in these tests (although they could be used in case of crashes
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> – but we haven't had those now).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Oct 4, 2018 at 3:25 PM Till
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Rohrmann <trohrmann@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> another idea to further narrow down the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> problem could be to simplify the job to not use a reduce window but simply
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> a time window which outputs the window events. Then counting the input and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> output events should allow you to verify the results. If you are not seeing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> missing events, then it could have something to do with the reducing state
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> used in the reduce function.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In general, it would be tremendously
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> helpful to have a minimal working example which allows to reproduce the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> problem.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Till
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Thu, Oct 4, 2018 at 2:02 PM Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Zagrebin <andrey@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> can you try to reduce the job to minimal
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> reproducible example and share the job and input?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For example:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - some simple records as input, e.g.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> tuples of primitive types saved as cvs
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - minimal deduplication job which
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> processes them and misses records
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - check if it happens for shorter
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> windows, like 1h etc
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - setup which you use for the job,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ideally locally reproducible or cloud
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 4 Oct 2018, at 11:13, Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Sorry to insist, but we seem to be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> blocked for any serious usage of state in Flink if we can't rely on it to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not miss data in case of restore.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would anyone have suggestions for how to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> troubleshoot this? So far I have verified with DEBUG logs that our reduce
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> function gets to process also the data that is missing from window output.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Oct 1, 2018 at 11:56 AM Juho
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Autio <juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Andrey,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> To rule out for good any questions about
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sink behaviour, the job was killed and started with an additional Kafka
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sink.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The same number of ids were missed in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> both outputs: KafkaSink & BucketingSink.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I wonder what would be the next steps in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> debugging?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Autio <juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks, Andrey.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > so it means that the savepoint does
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not loose at least some dropped records.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm not sure what you mean by that? I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mean, it was known from the beginning, that not everything is lost
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> before/after restoring a savepoint, just some records around the time of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> restoration. It's not 100% clear whether records are lost before making a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint or after restoring it. Although, based on the new DEBUG logs it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> seems more like losing some records that are seen ~soon after restoring. It
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> seems like Flink gets somehow confused between the restored state
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> and new inserts to state. This could also be somehow linked to the high
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> back pressure on the kafka source while the stream is catching up.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > If it is feasible for your setup, I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> suggest to insert one more map function after reduce and before sink.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > etc.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Isn't that the same thing that we
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> discussed before? Nothing is sent to BucketingSink before the window
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> closes, so I don't see how it would make any difference if we replace the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> BucketingSink with a map function or another sink type. We don't create or
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> restore savepoints during the time when BucketingSink gets input or has
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> open buckets – that happens at a much later time of day. I would focus on
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> figuring out why the records are lost while the window is open. But I don't
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> know how to do that. Would you have any additional suggestions?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Zagrebin <andrey@data-artisans.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> so it means that the savepoint does
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not lose at least some dropped records.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If it is feasible for your setup, I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> suggest to insert one more map function after reduce and before sink.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The map function should be called
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> right after window is triggered but before flushing to s3.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The result of reduce (deduped record)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> could be logged there.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This should allow to check whether the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> processed distinct records were buffered in the state after the restoration
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> from the savepoint or not. If they were buffered we should see that there
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> was an attempt to write them to the sink from the state.
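>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For example, a minimal sketch of such a pass-through logging map (the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> class name and the logged field names are only illustrative):
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> import java.util.Map;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> import org.apache.flink.api.common.functions.MapFunction;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> import org.slf4j.Logger;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> import org.slf4j.LoggerFactory;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> // Placed between .reduce(new DistinctFunction()) and .addSink(...):
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> // logs every record the window actually emits, without changing the stream.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public class LogWindowResultFunction implements MapFunction<Map<String, String>, Map<String, String>> {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     private static final Logger LOG = LoggerFactory.getLogger(LogWindowResultFunction.class);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     public Map<String, String> map(Map<String, String> record) {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         LOG.debug("Window emitted: {}={}", record.get("field"), record.get("id"));
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         return record;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> i.e. .reduce(new DistinctFunction()).map(new LogWindowResultFunction()).addSink(sink).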
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Another suggestion is to try to write
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> records to some other sink or to both.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> E.g. if you can access file system of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> workers, maybe just into local files and check whether the records are also
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> dropped there.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 20 Sep 2018, at 15:37, Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Andrey!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I was finally able to gather the DEBUG
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> logs that you suggested. In short, the reducer logged that it processed at
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> least some of the ids that were missing from the output.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "At least some", because I didn't have
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the job running with DEBUG logs for the full 24-hour window period. So I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> was only able to look up if I can find
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> *some* of the missing ids in the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> DEBUG logs. Which I did indeed.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I changed the DistinctFunction.java to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> do this:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         return value1;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Then:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> vi flink-1.6.0/conf/log4j.properties
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Then I ran the following kind of test:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Cancelled the on-going job with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint created at ~Sep 18 08:35 UTC 2018
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Started a new cluster & job with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> DEBUG enabled at ~09:13, restored from that previous cluster's savepoint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Ran until caught up offsets
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Cancelled the job with a new
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - Started a new job _without_ DEBUG,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> which restored the new savepoint, let it keep running so that it will
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> eventually write the output
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Then on the next day, after results
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> had been flushed when the 24-hour window closed, I compared the results
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> again with a batch version's output. And found some missing ids as usual.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I drilled down to one specific missing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> id (I'm replacing the actual value with AN12345 below), which was not found
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> in the stream output, but was found in batch output & flink DEBUG logs.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Related to that id, I gathered the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> following information:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2018-09-18~09:13:21,000 job started &
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint is restored
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2018-09-18 09:14:29,085 missing id is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> processed for the first time, proved by this log line:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2018-09-18 09:14:29,085 DEBUG
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> com.rovio.ds.flink.uniqueid.DistinctFunction                  -
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> DistinctFunction.reduce returns: s.aid1=AN12345
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2018-09-18 09:15:14,264 first
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> synchronous part of checkpoint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2018-09-18 09:15:16,544 first
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> asynchronous part of checkpoint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> more occurrences of checkpoints (~1
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> min checkpointing time + ~1 min delay before next)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> /
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> more occurrences of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> DistinctFunction.reduce
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2018-09-18 09:23:45,053 missing id is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> processed for the last time
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2018-09-18~10:20:00,000 savepoint
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> created & job cancelled
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> To be noted, there was high
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> backpressure after restoring from savepoint until the stream caught up with
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the kafka offsets. Although, our job assigns timestamps & watermarks on
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the flink kafka consumer itself, so event time of all partitions is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> synchronized. As expected, we don't get any late data in the late data side
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> output.
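>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For reference, a self-contained sketch of that consumer-side assignment
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (topic name, properties and the timestamp parsing are made-up placeholders):
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> import java.util.Properties;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> import org.apache.flink.api.common.serialization.SimpleStringSchema;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public class KafkaEventTimeSketch {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     public static void main(String[] args) throws Exception {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         Properties props = new Properties();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         props.setProperty("bootstrap.servers", "localhost:9092");
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         props.setProperty("group.id", "dedup-debug");
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         FlinkKafkaConsumer010<String> consumer =
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 new FlinkKafkaConsumer010<>("events", new SimpleStringSchema(), props);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         // Assigning the extractor on the consumer itself makes watermarks be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         // tracked per Kafka partition inside the source before partitions merge.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         consumer.assignTimestampsAndWatermarks(
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 new BoundedOutOfOrdernessTimestampExtractor<String>(Time.minutes(1)) {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                     @Override
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                     public long extractTimestamp(String record) {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                         // assume "timestampMillis,payload" records in this sketch
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                         return Long.parseLong(record.substring(0, record.indexOf(',')));
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                     }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 });
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         env.addSource(consumer).print();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         env.execute("kafka-event-time-sketch");
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }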
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> From this we can see that the missing
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ids are processed by the reducer, but they must get lost somewhere before
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the 24-hour window is triggered.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I think it's worth mentioning once
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> more that the stream doesn't miss any ids if we let it run without
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> interruptions / state restoring.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> What's next?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Zagrebin <andrey@data-artisans.com>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > only when the 24-hour window
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> triggers, BucketingSink gets a burst of input
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This is of course totally true, my
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> understanding is the same. We cannot exclude a problem there for sure; it's just that
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoints are used a lot w/o problem reports and BucketingSink is known to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> be problematic with s3. That is why I asked you:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > You also wrote that the timestamps
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> of lost event are 'probably' around the time of the savepoint, if it is not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> yet for sure I would also check it.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Although, bucketing sink might lose
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> any data at the end of the day (also from the middle). The fact, that it is
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> always around the time of taking a savepoint and not random, is surely
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> suspicious and possible savepoint failures need to be investigated.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Regarding the s3 problem, s3 doc says:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > The caveat is that if you make a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> HEAD or GET request to the key name (to find if the object exists) before
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> creating the object, Amazon S3 provides 'eventual consistency' for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> read-after-write.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The algorithm you suggest is how it
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is roughly implemented now (BucketingSink.openNewPartFile). My
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> understanding is that 'eventual consistency' means that even if you have just
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> created a file (its name is the key), it can happen that you do not get it in the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> listing, or an existence check (HEAD) returns false, and you risk rewriting the previous part.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The BucketingSink was designed for a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> standard file system. s3 is used over a file system wrapper atm but does
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not always provide normal file system guarantees. See also last example in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1].
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey, thank you very much for the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> debugging suggestions, I'll try them.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In the meanwhile two more questions,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> please:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > Just to keep in mind this problem
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with s3 and exclude it for sure. I would also check whether the size of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> missing events is around the batch size of BucketingSink or not.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Fair enough, but I also want to focus
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> on debugging the most probable subject first. So what do you think about
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> this – true or false: only when the 24-hour window triggers, BucketingSink
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> gets a burst of input. Around the state restoring point (middle of the day)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> it doesn't get any input, so it can't lose anything either. Isn't this
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> true, or have I totally missed how Flink works in triggering window
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> results? I would not expect there to be any optimization that speculatively
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> triggers early results of a regular time window to the downstream operators.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > The old BucketingSink has a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> general problem with s3. Internally BucketingSink queries s3 as a file
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> system to list already written file parts (batches) and determine index of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the next part to start. Due to eventual consistency of checking file
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> existence in s3 [1], the BucketingSink can rewrite the previously written
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> part and basically lose it.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I was wondering, what does S3's
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "read-after-write consistency" (mentioned on the page you linked) actually
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> mean. It seems that this might be possible:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - LIST keys, find current max index
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - choose next index = max + 1
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - HEAD next index: if it exists, keep
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> adding + 1 until key doesn't exist on S3
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> But definitely sounds easier if a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> sink keeps track of files in a way that's guaranteed to be consistent.
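>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> For illustration, roughly what the probing listed above looks like with the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> AWS SDK (v1; bucket and prefix names are made up). Under eventual consistency
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the existence check can return false for a part that was in fact just written,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> which is exactly how an index could get reused and a part overwritten:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> import com.amazonaws.services.s3.AmazonS3;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> import com.amazonaws.services.s3.AmazonS3ClientBuilder;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public class NextPartIndexProbe {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     // Probe part-0, part-1, ... until a key does not appear to exist yet.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     static int nextPartIndex(AmazonS3 s3, String bucket, String prefix) {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         int index = 0;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         while (s3.doesObjectExist(bucket, prefix + "/part-" + index)) {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             index++;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         // An eventually consistent HEAD may have missed a freshly written
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         // part, so this index can still collide with an existing one.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         return index;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     public static void main(String[] args) {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         System.out.println(nextPartIndex(s3, "my-bucket", "output/2018-08-29"));
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }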
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Juho
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey Zagrebin <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> andrey@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> true, StreamingFileSink does not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> support s3 in 1.6.0, it is planned for the next 1.7 release, sorry for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> confusion.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The old BucketingSink has a general
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> problem with s3. Internally BucketingSink queries s3 as a file system
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> to list already written file parts
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> (batches) and determine index of the next part to start. Due to eventual
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> consistency of checking file existence in s3 [1], the BucketingSink can
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> rewrite the previously written part and basically lose it. It should be
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> fixed for StreamingFileSink in 1.7 where Flink keeps its own track of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> written parts and does not rely on s3 as a file system.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I also include Kostas, he might add
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> more details.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Just to keep in mind this problem
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> with s3 and exclude it for sure, I would also check whether the size of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> missing events is around the batch size of BucketingSink or not. You also
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> wrote that the timestamps of lost event are 'probably' around the time of
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the savepoint, if it is not yet for sure I would also check it.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Have you already checked the log
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> files of job manager and task managers for the job running before and after
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> the restore from the checkpoint? Is everything successful there, no
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> errors, relevant warnings or exceptions?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> As the next step, I would suggest to
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> log all encountered events in DistinctFunction.reduce if possible for
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> production data and check whether the missed events are eventually
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> processed before or after the savepoint. The following log message
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> indicates a border between the events that should be included into the
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> savepoint (logged before) or not:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> “{} ({}, synchronous part) in thread
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> {} took {} ms” (template)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Also check if the savepoint has been
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> overall completed:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> "{} ({}, asynchronous part) in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> thread {} took {} ms."
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> <juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Using StreamingFileSink is not a
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> convenient option for production use for us as it doesn't support s3*. I
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> could use StreamingFileSink just to verify, but I don't see much point in
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> doing so. Please consider my previous comment:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > I realized that BucketingSink must
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> not play any role in this problem. This is because only when the 24-hour
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> window triggers, BucketingSink gets a burst of input. Around the state
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> restoring point (middle of the day) it doesn't get any input, so it can't
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> lose anything either (right?).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I could also use a kafka sink
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> instead, but I can't imagine how there could be any difference. It's very
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> real that the sink doesn't get any input for a long time until the 24-hour
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> window closes, and then it quickly writes out everything because it's not
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that much data eventually for the distinct values.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Any ideas for debugging what's
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> happening around the savepoint & restoration time?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> *) I actually implemented
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> StreamingFileSink as an alternative sink. This was before I came to realize
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> that most likely the sink component has nothing to do with the data loss
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> problem. I tried it with s3n:// path just to see an exception being thrown.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In the source code I indeed then found an explicit check for the target
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> path scheme to be "hdfs://".
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey Zagrebin <
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> andrey@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Ok, I think before further
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> debugging the window reduced state, could you try the new
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> ‘StreamingFileSink’ [1] introduced in Flink 1.6.0 instead of the previous
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 'BucketingSink'?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that it seems like there's a bug somewhere now that the output is missing some data.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > I would wait and check the actual output in s3 because it is the main result of the job
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, and that's what I have already done. There seems to be always some data loss with the production data volumes, if the job has been restarted on that day.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Would you have any suggestions for how to debug this further?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Many thanks for stepping in.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> So it is a per key deduplication job. Yes, I would wait and check the actual output in s3 because it is the main result of the job and
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> is not a bug, it is a possible behaviour. The savepoint is a snapshot of the data in transient which is already consumed from Kafka. Basically the full contents of the window result is split between the savepoint and what can come after the savepoint'ed offset in Kafka but before the window result is written into s3. Allowed lateness should not affect it, I am just saying that the final result in s3 should include all records after it. This is what should be guaranteed but not the contents of the intermediate savepoint.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thanks for your answer!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I check for the missed data from the final output on s3. So I wait until the next day, then run the same thing re-implemented in batch, and compare the output.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, I would definitely expect that. It seems like there's a bug somewhere.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play any role here, because I started running the job with allowedLateness=0, and still get the data loss, while my late data output doesn't receive anything.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Yes, it's the actual implementation. Note that there's a keyBy before the DistinctFunction. So there's one record for each key (which is the combination of a couple of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore, and the total output is obviously much more than that.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Here's the full code for the key selector:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public class MapKeySelector implements KeySelector<Map<String,String>, Object> {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     private final String[] fields;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     public MapKeySelector(String... fields) {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         this.fields = fields;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     public Object getKey(Map<String, String> event) throws Exception {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         for (int i = 0; i < fields.length; i++) {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         return key;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> And a more exact example on how it's used:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Where exactly does the data miss? When do you notice that? Do you check it:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - debugging `DistinctFunction.reduce` right after resume in the middle of the day or
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> - some distinct records miss in the final output of BucketingSink in s3 after window result is actually triggered and saved into s3 at the end of the day? is this the main output?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> The late data around the time of taking savepoint might be not included into the savepoint but it should be behind the snapshotted offset in Kafka. Then it should just come later after the restore and should be reduced within the allowed lateness into the final result which is saved into s3.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an example or the actual implementation, basically saving just one of records inside the 24h window in s3? then what is missing there?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I changed to allowedLateness=0, no change, still missing data when restoring from savepoint.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I realized that BucketingSink must not play any role in this problem. This is because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything either (right?).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I will next try removing the allowedLateness entirely from the equation.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> In the meanwhile, please let me know if you have any suggestions for debugging the lost data, for example what logs to enable.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known issues with that, that could contribute to lost data when restoring a savepoint?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job when state is restored from a savepoint.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Do you have any debugging hints to find out where exactly the data gets dropped?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour window. It doesn't have any custom state management.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> When I cancel the job with savepoint and restore from that savepoint, some data is missed. It seems to be losing just a small amount of data. The event time of lost data is probably around the time of savepoint. In other words the rest of the time window is not entirely missed – collection works correctly also for (most of the) events that come in after restoring.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> When the job processes a full 24-hour window without interruptions it doesn't miss anything.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Usually the problem doesn't happen in test environments that have smaller parallelism and smaller data volumes. But in production volumes the job seems to be consistently missing at least something on every restore.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> This issue has consistently happened since the job was initially created. It was at first run on an older version of Flink 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> I'm wondering if this could be for example some synchronization issue between the kafka consumer offsets vs. what's been written by BucketingSink?
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 1. Job content, simplified
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         kafkaStream
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 .flatMap(new ExtractFieldsFunction())
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 .allowedLateness(allowedLateness)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 .sideOutputLateData(lateDataTag)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 .addSink(sink)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 // use a fixed number of output partitions
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 .setParallelism(8))
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>  */
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         return value1;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 2. State configuration
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true;
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints";
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Checkpointing Mode: Exactly Once
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Interval: 1m 0s
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Timeout: 10m 0s
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Minimum Pause Between Checkpoints: 1m 0s
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Maximum Concurrent Checkpoints: 1
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Persist Checkpoints Externally: Enabled (retain on cancellation)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 3. BucketingSink configuration
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> We use BucketingSink, I don't think there's anything special here, if not the fact that we're writing to S3.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         String outputPath = "s3://bucket/output";
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 .setBucketer(new ProcessdateBucketer())
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 .setBatchSize(batchSize)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>                 .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>         sink.setWriter(new IdJsonWriter());
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 4. Kafka & event time
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to synchronize watermarks across all kafka partitions. We also write late data to side output, but nothing is written there – if it would, it could explain missed data in the main output (I'm also sure that our late data writing works, because we previously had some actual late data which ended up there).
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> 5. allowedLateness
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> It may be or may not be relevant that I have also enabled allowedLateness with 1 minute lateness on the 24-hour window:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If that makes sense, I could try removing allowedLateness entirely? That would be just to rule out that Flink doesn't have a bug that's related to restoring state in combination with the allowedLateness feature. After all, all of our data should be in a good enough order to not be late, given the max out of orderness used on kafka consumer timestamp extractor.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Thank you in advance!
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>

-- 

Konstantin Knauf | Solutions Architect

+49 160 91394525

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Data Artisans GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Data Artisans GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Dr. Kostas Tzoumas, Dr. Stephan Ewen
