flink-user mailing list archives

From Juho Autio <juho.au...@rovio.com>
Subject Re: Data loss when restoring from savepoint
Date Thu, 31 Jan 2019 09:46:05 GMT
Hello, is there anyone that could help with this?

On Fri, Jan 11, 2019 at 8:14 AM Juho Autio <juho.autio@rovio.com> wrote:

> Stefan, would you have time to comment?
>
> On Wednesday, January 2, 2019, Juho Autio <juho.autio@rovio.com> wrote:
>
>> Bump – does anyone know if Stefan will be available to comment on the latest
>> findings? Thanks.
>>
>> On Fri, Dec 21, 2018 at 2:33 PM Juho Autio <juho.autio@rovio.com> wrote:
>>
>>> Stefan, I managed to analyze the savepoint with bravo. It seems that the
>>> data that's missing from the output *is* found in the savepoint.
>>>
>>> I simplified my test case to the following:
>>>
>>> - job 1 has been running for ~10 days
>>> - savepoint X created & job 1 cancelled
>>> - job 2 started with restore from savepoint X
>>>
>>> Then I waited until the next day so that job 2 had triggered the 24-hour
>>> window.
>>>
>>> Then I analyzed the output & savepoint:
>>>
>>> - compare job 2 output with the output of a batch pyspark script => find
>>> 4223 missing rows
>>> - pick one of the missing rows (say, id Z)
>>> - read savepoint X with bravo, filter for id Z => Z was found in the
>>> savepoint!
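The comparison step above boils down to a set difference between the ids in the batch output and the ids in the streaming output. A minimal sketch in plain Java, assuming both outputs have already been loaded as collections of id strings. The class name, method name and sample ids are made up for illustration; the actual comparison in this thread was done with a pyspark script.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class MissingIds {
    /** Returns the ids present in the batch output but absent from the streaming output. */
    public static Set<String> missingFromStream(Collection<String> batchIds,
                                                Collection<String> streamIds) {
        // Start from everything the batch job produced, then remove what the stream produced.
        Set<String> missing = new TreeSet<>(batchIds);
        missing.removeAll(streamIds);
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical sample data, not taken from the real job.
        List<String> batch = Arrays.asList("AN1", "AN2", "AN3");
        List<String> stream = Arrays.asList("AN1", "AN3");
        System.out.println(missingFromStream(batch, stream));
    }
}
```

Any id returned here is a candidate to look up in the savepoint.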
>>>
>>> How can it be possible that the value is in state but doesn't end up in
>>> output after state has been restored & window is eventually triggered?
>>>
>>> I also did similar analysis on the previous case where I savepointed &
>>> restored the job multiple times (5) within the same 24-hour window. A
>>> missing id that I drilled down to, was found in all of those savepoints,
>>> yet missing from the output that gets written at the end of the day. This
>>> is even more surprising: that the missing ID was written to the new
>>> savepoints also after restoring. Is the reducer state somehow decoupled
>>> from the window contents?
>>>
>>> Big thanks to bravo developer Gyula for guiding me through reading the
>>> reducer state! https://github.com/king/bravo/pull/11
>>>
>>> Gyula also had an idea for how to troubleshoot the missing data in a
>>> scalable way: I could add some "side effect kafka output" on individual
>>> operators. This should allow tracking more closely at which point the data
>>> gets lost. However, maybe this would have to be in some of Flink's internal
>>> components, and I'm not sure which those would be.
>>>
>>> Cheers,
>>> Juho
>>>
>>> On Mon, Nov 19, 2018 at 11:52 AM Juho Autio <juho.autio@rovio.com>
>>> wrote:
>>>
>>>>
>>>> Hi Stefan,
>>>>
>>>> Bravo doesn't currently support reading a reducer state. I gave it a
>>>> try but couldn't get to a working implementation yet. If anyone can provide
>>>> some insight on how to make this work, please share at github:
>>>> https://github.com/king/bravo/pull/11
>>>>
>>>> Thanks.
>>>>
>>>> On Tue, Oct 23, 2018 at 3:32 PM Juho Autio <juho.autio@rovio.com>
>>>> wrote:
>>>>
>>>>> I was glad to find that bravo had now been updated to support
>>>>> installing bravo to a local maven repo.
>>>>>
>>>>> I was able to load a checkpoint created by my job, thanks to the
>>>>> example provided in bravo README, but I'm still missing the essential piece.
>>>>>
>>>>> My code was:
>>>>>
>>>>>         OperatorStateReader reader = new OperatorStateReader(env2, savepoint, "DistinctFunction");
>>>>>         DontKnowWhatTypeThisIs reducingState = reader.readKeyedStates(what should I put here?);
>>>>>
>>>>> I don't know how to read the values collected from reduce() calls in
>>>>> the state. Is there a way to access the reducing state of the window with
>>>>> bravo? I'm a bit confused how this works, because when I check with
>>>>> debugger, flink internally uses a ReducingStateDescriptor
>>>>> with name=window-contents, but still reading operator state for
>>>>> "DistinctFunction" didn't at least throw an exception ("window-contents"
>>>>> threw – obviously there's no operator by that name).
>>>>>
>>>>> Cheers,
>>>>> Juho
>>>>>
>>>>> On Mon, Oct 15, 2018 at 2:25 PM Juho Autio <juho.autio@rovio.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Stefan,
>>>>>>
>>>>>> Sorry but it doesn't seem immediately clear to me what's a good way
>>>>>> to use https://github.com/king/bravo.
>>>>>>
>>>>>> How are people using it? Would you for example modify build.gradle
>>>>>> somehow to publish the bravo as a library locally/internally? Or add code
>>>>>> directly in the bravo project (locally) and run it from there (using an
>>>>>> IDE, for example)? Also it doesn't seem like the bravo gradle project
>>>>>> supports building a flink job jar, but if it does, how do I do it?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> On Thu, Oct 4, 2018 at 9:30 PM Juho Autio <juho.autio@rovio.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Good then, I'll try to analyze the savepoints with Bravo. Thanks!
>>>>>>>
>>>>>>> > How would you assume that backpressure would influence your
>>>>>>> > updates? Updates to each local state still happen event-by-event, in a
>>>>>>> > single reader/writing thread.
>>>>>>>
>>>>>>> Sure, just an ignorant guess by me. I'm not familiar with most of
>>>>>>> Flink's internals. Anyway, high backpressure is not seen on this job
>>>>>>> after it has caught up with the lag, so I thought it would be worth
>>>>>>> mentioning.
>>>>>>>
>>>>>>> On Thu, Oct 4, 2018 at 6:24 PM Stefan Richter <
>>>>>>> s.richter@data-artisans.com> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> On 04.10.2018 at 16:08, Juho Autio <juho.autio@rovio.com> wrote:
>>>>>>>>
>>>>>>>> > you could take a look at Bravo [1] to query your savepoints and
>>>>>>>> > to check if the state in the savepoint is complete w.r.t your expectations
>>>>>>>>
>>>>>>>> Thanks. I'm not 100% sure if this is the case, but to me it seemed like
>>>>>>>> the missed ids were being logged by the reducer soon after the job had
>>>>>>>> started (after restoring a savepoint). But on the other hand, after that I
>>>>>>>> also made another savepoint & restored that, so what I could check is: does
>>>>>>>> that next savepoint have the missed ids that were logged (a couple of
>>>>>>>> minutes before the savepoint was created, so there should've been more than
>>>>>>>> enough time to add them to the state before the savepoint was triggered) or
>>>>>>>> not. Anyway, if I would be able to verify with Bravo that the ids are
>>>>>>>> missing from the savepoint (even though the reducer logged that it saw them),
>>>>>>>> would that help in figuring out where they are lost? Is there some major
>>>>>>>> difference compared to just looking at the final output after window has
>>>>>>>> been triggered?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> I think that makes a difference. For example, you can investigate
>>>>>>>> if there is a state loss or a problem with the windowing. In the savepoint
>>>>>>>> you could see which keys exist and to which windows they are assigned.
>>>>>>>> Also just to make sure there is no misunderstanding: only elements that are
>>>>>>>> in the state at the start of a savepoint are expected to be part of the
>>>>>>>> savepoint; all elements between start and completion of the savepoint are
>>>>>>>> not expected to be part of the savepoint.
>>>>>>>>
>>>>>>>>
>>>>>>>> > I also doubt that the problem is about backpressure after
>>>>>>>> > restore, because the job will only continue running after the state restore
>>>>>>>> > is already completed.
>>>>>>>>
>>>>>>>> Yes, I'm not suspecting that the state restoring would be the
>>>>>>>> problem either. My concern was about backpressure possibly messing with the
>>>>>>>> updates of reducing state? I would tend to suspect that updating the state
>>>>>>>> consistently is what fails, where heavy load / backpressure might be a
>>>>>>>> factor.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> How would you assume that backpressure would influence your
>>>>>>>> updates? Updates to each local state still happen event-by-event, in a
>>>>>>>> single reader/writing thread.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Oct 4, 2018 at 4:18 PM Stefan Richter <
>>>>>>>> s.richter@data-artisans.com> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> you could take a look at Bravo [1] to query your savepoints and to
>>>>>>>>> check if the state in the savepoint is complete w.r.t your expectations. I
>>>>>>>>> somewhat doubt that there is a general problem with the state/savepoints
>>>>>>>>> because many users are successfully running it on a large state and I am
>>>>>>>>> not aware of any data loss problems, but nothing is impossible. What the
>>>>>>>>> savepoint does is also straightforward: iterate a db snapshot and write
>>>>>>>>> all key/value pairs to disk, so all data that was in the db at the time of
>>>>>>>>> the savepoint, should show up. I also doubt that the problem is about
>>>>>>>>> backpressure after restore, because the job will only continue running
>>>>>>>>> after the state restore is already completed. Did you check if you are
>>>>>>>>> using exactly-once semantics or at-least-once semantics? Also did you check
>>>>>>>>> that the kafka consumer start position is configured properly [2]? Are
>>>>>>>>> watermarks generated as expected after restore?
>>>>>>>>>
>>>>>>>>> One more unrelated high-level comment that I have: for a
>>>>>>>>> granularity of 24h windows, I wonder if it would not make sense to use a
>>>>>>>>> batch job instead?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Stefan
>>>>>>>>>
>>>>>>>>> [1] https://github.com/king/bravo
>>>>>>>>> [2]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>>>>>>>>
>>>>>>>>> On 04.10.2018 at 14:53, Juho Autio <juho.autio@rovio.com> wrote:
>>>>>>>>>
>>>>>>>>> Thanks for the suggestions!
>>>>>>>>>
>>>>>>>>> > In general, it would be tremendously helpful to have a minimal
>>>>>>>>> > working example which allows to reproduce the problem.
>>>>>>>>>
>>>>>>>>> Definitely. The problem with reproducing has been that this only
>>>>>>>>> seems to happen in the bigger production data volumes.
>>>>>>>>>
>>>>>>>>> That's why I'm hoping to find a way to debug this with the
>>>>>>>>> production data. With that it seems to consistently cause some misses every
>>>>>>>>> time the job is killed/restored.
>>>>>>>>>
>>>>>>>>> > check if it happens for shorter windows, like 1h etc
>>>>>>>>>
>>>>>>>>> What would be the benefit of that compared to 24h window?
>>>>>>>>>
>>>>>>>>> > simplify the job to not use a reduce window but simply a time
>>>>>>>>> > window which outputs the window events. Then counting the input and output
>>>>>>>>> > events should allow you to verify the results. If you are not seeing
>>>>>>>>> > missing events, then it could have something to do with the reducing state
>>>>>>>>> > used in the reduce function.
>>>>>>>>>
>>>>>>>>> Hm, maybe, but not sure how useful that would be, because it
>>>>>>>>> wouldn't yet prove that it's related to reducing, because not having a
>>>>>>>>> reduce function could also mean smaller load on the job, which might alone
>>>>>>>>> be enough to make the problem not manifest.
>>>>>>>>>
>>>>>>>>> Is there a way to debug what goes into the reducing state
>>>>>>>>> (including what gets removed or overwritten and what restored), if that
>>>>>>>>> makes sense..? Maybe some suitable logging could be used to prove that the
>>>>>>>>> lost data is written to the reducing state (or at least asked to be
>>>>>>>>> written), but not found any more when the window closes and state is
>>>>>>>>> flushed?
>>>>>>>>>
>>>>>>>>> On configuration once more, we're using RocksDB state backend with
>>>>>>>>> asynchronous incremental checkpointing. The state is restored from
>>>>>>>>> savepoints though, we haven't been using those checkpoints in these tests
>>>>>>>>> (although they could be used in case of crashes – but we haven't had those
>>>>>>>>> now).
>>>>>>>>>
>>>>>>>>> On Thu, Oct 4, 2018 at 3:25 PM Till Rohrmann <trohrmann@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Juho,
>>>>>>>>>>
>>>>>>>>>> another idea to further narrow down the problem could be to
>>>>>>>>>> simplify the job to not use a reduce window but simply a time window which
>>>>>>>>>> outputs the window events. Then counting the input and output events should
>>>>>>>>>> allow you to verify the results. If you are not seeing missing events, then
>>>>>>>>>> it could have something to do with the reducing state used in the reduce
>>>>>>>>>> function.
>>>>>>>>>>
>>>>>>>>>> In general, it would be tremendously helpful to have a minimal
>>>>>>>>>> working example which allows to reproduce the problem.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Thu, Oct 4, 2018 at 2:02 PM Andrey Zagrebin <
>>>>>>>>>> andrey@data-artisans.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>
>>>>>>>>>>> can you try to reduce the job to minimal reproducible example
>>>>>>>>>>> and share the job and input?
>>>>>>>>>>>
>>>>>>>>>>> For example:
>>>>>>>>>>> - some simple records as input, e.g. tuples of primitive types
>>>>>>>>>>> saved as csv
>>>>>>>>>>> - minimal deduplication job which processes them and misses
>>>>>>>>>>> records
>>>>>>>>>>> - check if it happens for shorter windows, like 1h etc
>>>>>>>>>>> - setup which you use for the job, ideally locally reproducible
>>>>>>>>>>> or cloud
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Andrey
>>>>>>>>>>>
>>>>>>>>>>> On 4 Oct 2018, at 11:13, Juho Autio <juho.autio@rovio.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Sorry to insist, but we seem to be blocked for any serious usage
>>>>>>>>>>> of state in Flink if we can't rely on it to not miss data in case of
>>>>>>>>>>> restore.
>>>>>>>>>>>
>>>>>>>>>>> Would anyone have suggestions for how to troubleshoot this? So
>>>>>>>>>>> far I have verified with DEBUG logs that our reduce function gets to
>>>>>>>>>>> process also the data that is missing from window output.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Oct 1, 2018 at 11:56 AM Juho Autio <juho.autio@rovio.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Andrey,
>>>>>>>>>>>>
>>>>>>>>>>>> To rule out for good any questions about sink behaviour, the
>>>>>>>>>>>> job was killed and started with an additional Kafka sink.
>>>>>>>>>>>>
>>>>>>>>>>>> The same number of ids were missed in both outputs: KafkaSink &
>>>>>>>>>>>> BucketingSink.
>>>>>>>>>>>>
>>>>>>>>>>>> I wonder what would be the next steps in debugging?
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Sep 21, 2018 at 3:49 PM Juho Autio <
>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks, Andrey.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > so it means that the savepoint does not lose at least some
>>>>>>>>>>>>> > dropped records.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm not sure what you mean by that? I mean, it was known from
>>>>>>>>>>>>> the beginning, that not everything is lost before/after restoring a
>>>>>>>>>>>>> savepoint, just some records around the time of restoration. It's not 100%
>>>>>>>>>>>>> clear whether records are lost before making a savepoint or after restoring
>>>>>>>>>>>>> it. Although, based on the new DEBUG logs it seems more like losing some
>>>>>>>>>>>>> records that are seen ~soon after restoring. It seems like Flink would be
>>>>>>>>>>>>> somehow confused about the restored state vs. new inserts to state.
>>>>>>>>>>>>> This could also be somehow linked to the high back pressure on the kafka
>>>>>>>>>>>>> source while the stream is catching up.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > If it is feasible for your setup, I suggest to insert one
>>>>>>>>>>>>> > more map function after reduce and before sink.
>>>>>>>>>>>>> > etc.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Isn't that the same thing that we discussed before? Nothing is
>>>>>>>>>>>>> sent to BucketingSink before the window closes, so I don't see how it would
>>>>>>>>>>>>> make any difference if we replace the BucketingSink with a map function or
>>>>>>>>>>>>> another sink type. We don't create or restore savepoints during the time
>>>>>>>>>>>>> when BucketingSink gets input or has open buckets – that happens at a much
>>>>>>>>>>>>> later time of day. I would focus on figuring out why the records are lost
>>>>>>>>>>>>> while the window is open. But I don't know how to do that. Would you have
>>>>>>>>>>>>> any additional suggestions?
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Sep 21, 2018 at 3:30 PM Andrey Zagrebin <
>>>>>>>>>>>>> andrey@data-artisans.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> so it means that the savepoint does not lose at least some
>>>>>>>>>>>>>> dropped records.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If it is feasible for your setup, I suggest to insert one
>>>>>>>>>>>>>> more map function after reduce and before sink.
>>>>>>>>>>>>>> The map function should be called right after window is
>>>>>>>>>>>>>> triggered but before flushing to s3.
>>>>>>>>>>>>>> The result of reduce (deduped record) could be logged there.
>>>>>>>>>>>>>> This should allow to check whether the processed distinct
>>>>>>>>>>>>>> records were buffered in the state after the restoration from the savepoint
>>>>>>>>>>>>>> or not. If they were buffered we should see that there was an attempt to
>>>>>>>>>>>>>> write them to the sink from the state.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Another suggestion is to try to write records to some other
>>>>>>>>>>>>>> sink or to both.
>>>>>>>>>>>>>> E.g. if you can access file system of workers, maybe just
>>>>>>>>>>>>>> into local files and check whether the records are also dropped there.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 20 Sep 2018, at 15:37, Juho Autio <juho.autio@rovio.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Andrey!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I was finally able to gather the DEBUG logs that you
>>>>>>>>>>>>>> suggested. In short, the reducer logged that it processed at least some of
>>>>>>>>>>>>>> the ids that were missing from the output.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> "At least some", because I didn't have the job running with
>>>>>>>>>>>>>> DEBUG logs for the full 24-hour window period. So I was only able to look
>>>>>>>>>>>>>> up if I can find *some* of the missing ids in the DEBUG
>>>>>>>>>>>>>> logs. Which I did indeed.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I changed the DistinctFunction.java to do this:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>>>>>>>>>>>         LOG.debug("DistinctFunction.reduce returns: {}={}", value1.get("field"), value1.get("id"));
>>>>>>>>>>>>>>         return value1;
>>>>>>>>>>>>>>     }
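Because reduce() returns value1, the window state keeps the first record seen for a key and discards every later one. A plain-Java model of that behaviour, with no Flink involved, may make this concrete. Keying by the "id" entry, the helper names and the sample values are assumptions for illustration and are not taken from the real job.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KeepFirstReduce {
    // Same contract as the reduce() shown above: always keep the first value.
    public static Map<String, String> reduce(Map<String, String> value1,
                                             Map<String, String> value2) {
        return value1;
    }

    // Hypothetical helper that builds a record with an "id" and a "field" entry.
    public static Map<String, String> event(String id, String field) {
        Map<String, String> e = new HashMap<>();
        e.put("id", id);
        e.put("field", field);
        return e;
    }

    // Models a single window: per key, fold each incoming record into the state via reduce().
    public static Map<String, Map<String, String>> distinctById(List<Map<String, String>> records) {
        Map<String, Map<String, String>> state = new LinkedHashMap<>();
        for (Map<String, String> r : records) {
            // merge() stores r if the key is new, otherwise stores reduce(existing, r).
            state.merge(r.get("id"), r, KeepFirstReduce::reduce);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Map<String, String>> input = Arrays.asList(
                event("AN1", "first"), event("AN1", "second"), event("AN2", "only"));
        System.out.println(distinctById(input).keySet());
    }
}
```

In this model an id can only be absent from the result if it never reached the state, which is why a DEBUG line from reduce() for a missing id is informative.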
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Then:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> vi flink-1.6.0/conf/log4j.properties
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> log4j.logger.org.apache.flink.streaming.runtime.tasks.StreamTask=DEBUG
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> log4j.logger.com.rovio.ds.flink.uniqueid.DistinctFunction=DEBUG
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Then I ran the following kind of test:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> - Cancelled the on-going job with savepoint created at ~Sep
>>>>>>>>>>>>>> 18 08:35 UTC 2018
>>>>>>>>>>>>>> - Started a new cluster & job with DEBUG enabled at ~09:13,
>>>>>>>>>>>>>> restored from that previous cluster's savepoint
>>>>>>>>>>>>>> - Ran until caught up offsets
>>>>>>>>>>>>>> - Cancelled the job with a new savepoint
>>>>>>>>>>>>>> - Started a new job _without_ DEBUG, which restored the new
>>>>>>>>>>>>>> savepoint, let it keep running so that it will eventually write the output
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Then on the next day, after results had been flushed when the
>>>>>>>>>>>>>> 24-hour window closed, I compared the results again with a batch version's
>>>>>>>>>>>>>> output. And found some missing ids as usual.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I drilled down to one specific missing id (I'm replacing the
>>>>>>>>>>>>>> actual value with AN12345 below), which was not found in the stream output,
>>>>>>>>>>>>>> but was found in batch output & flink DEBUG logs.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Related to that id, I gathered the following information:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2018-09-18~09:13:21,000 job started & savepoint is restored
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2018-09-18 09:14:29,085 missing id is processed for the first
>>>>>>>>>>>>>> time, proved by this log line:
>>>>>>>>>>>>>> 2018-09-18 09:14:29,085 DEBUG com.rovio.ds.flink.uniqueid.DistinctFunction                  - DistinctFunction.reduce returns: s.aid1=AN12345
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2018-09-18 09:15:14,264 first synchronous part of checkpoint
>>>>>>>>>>>>>> 2018-09-18 09:15:16,544 first asynchronous part of checkpoint
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> (
>>>>>>>>>>>>>> more occurrences of checkpoints (~1 min checkpointing time +
>>>>>>>>>>>>>> ~1 min delay before next)
>>>>>>>>>>>>>> /
>>>>>>>>>>>>>> more occurrences of DistinctFunction.reduce
>>>>>>>>>>>>>> )
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2018-09-18 09:23:45,053 missing id is processed for the last
>>>>>>>>>>>>>> time
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2018-09-18~10:20:00,000 savepoint created & job cancelled
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> To be noted, there was high backpressure after restoring from
>>>>>>>>>>>>>> savepoint until the stream caught up with the kafka offsets. Although, our
>>>>>>>>>>>>>> job assigns timestamps & watermarks on the flink kafka consumer itself,
>>>>>>>>>>>>>> so event time of all partitions is synchronized. As expected, we don't get
>>>>>>>>>>>>>> any late data in the late data side output.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> From this we can see that the missing ids are processed by
>>>>>>>>>>>>>> the reducer, but they must get lost somewhere before the 24-hour window is
>>>>>>>>>>>>>> triggered.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I think it's worth mentioning once more that the stream
>>>>>>>>>>>>>> doesn't miss any ids if we let it run without interruptions / state
>>>>>>>>>>>>>> restoring.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What's next?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Aug 29, 2018 at 3:49 PM Andrey Zagrebin <
>>>>>>>>>>>>>> andrey@data-artisans.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > only when the 24-hour window triggers, BucketingSink gets
>>>>>>>>>>>>>>> > a burst of input
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is of course totally true, my understanding is the
>>>>>>>>>>>>>>> same. We cannot exclude problem there for sure, just savepoints are used a
>>>>>>>>>>>>>>> lot w/o problem reports and BucketingSink is known to be problematic with
>>>>>>>>>>>>>>> s3. That is why, I asked you:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > You also wrote that the timestamps of lost event are
>>>>>>>>>>>>>>> > 'probably' around the time of the savepoint, if it is not yet for sure I
>>>>>>>>>>>>>>> > would also check it.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Although, bucketing sink might lose any data at the end of
>>>>>>>>>>>>>>> the day (also from the middle). The fact, that it is always around the time
>>>>>>>>>>>>>>> of taking a savepoint and not random, is surely suspicious and possible
>>>>>>>>>>>>>>> savepoint failures need to be investigated.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regarding the s3 problem, s3 doc says:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > The caveat is that if you make a HEAD or GET request to
>>>>>>>>>>>>>>> > the key name (to find if the object exists) before creating the object,
>>>>>>>>>>>>>>> > Amazon S3 provides 'eventual consistency' for read-after-write.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The algorithm you suggest is roughly how it is implemented
>>>>>>>>>>>>>>> now (BucketingSink.openNewPartFile). My understanding is that
>>>>>>>>>>>>>>> 'eventual consistency' means that even if you just created a file (its name
>>>>>>>>>>>>>>> is the key), it can be that you do not get it in the list or exists (HEAD)
>>>>>>>>>>>>>>> returns false, and you risk rewriting the previous part.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The BucketingSink was designed for a standard file system.
>>>>>>>>>>>>>>> s3 is used over a file system wrapper atm but does not always provide
>>>>>>>>>>>>>>> normal file system guarantees. See also last example in [1].
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>> https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 29 Aug 2018, at 12:11, Juho Autio <juho.autio@rovio.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Andrey, thank you very much for the debugging suggestions,
>>>>>>>>>>>>>>> I'll try them.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In the meanwhile two more questions, please:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > Just to keep in mind this problem with s3 and exclude it
>>>>>>>>>>>>>>> > for sure. I would also check whether the size of missing events is around
>>>>>>>>>>>>>>> > the batch size of BucketingSink or not.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Fair enough, but I also want to focus on debugging the most
>>>>>>>>>>>>>>> probable subject first. So what do you think about this – true or false:
>>>>>>>>>>>>>>> only when the 24-hour window triggers, BucketingSink gets a burst of input.
>>>>>>>>>>>>>>> Around the state restoring point (middle of the day) it doesn't get any
>>>>>>>>>>>>>>> input, so it can't lose anything either. Isn't this true, or have I totally
>>>>>>>>>>>>>>> missed how Flink works in triggering window results? I would not expect
>>>>>>>>>>>>>>> there to be any optimization that speculatively triggers early results of a
>>>>>>>>>>>>>>> regular time window to the downstream operators.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> > The old BucketingSink has in general problem with s3.
>>>>>>>>>>>>>>> > Internally BucketingSink queries s3 as a file system to list already
>>>>>>>>>>>>>>> > written file parts (batches) and determine index of the next part to start.
>>>>>>>>>>>>>>> > Due to eventual consistency of checking file existence in s3 [1], the
>>>>>>>>>>>>>>> > BucketingSink can rewrite the previously written part and basically lose
>>>>>>>>>>>>>>> > it.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I was wondering, what does S3's "read-after-write
>>>>>>>>>>>>>>> consistency" (mentioned on the page you linked) actually mean. It seems
>>>>>>>>>>>>>>> that this might be possible:
>>>>>>>>>>>>>>> - LIST keys, find current max index
>>>>>>>>>>>>>>> - choose next index = max + 1
>>>>>>>>>>>>>>> - HEAD next index: if it exists, keep adding + 1 until key
>>>>>>>>>>>>>>> doesn't exist on S3
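The three steps above can be sketched in plain Java. This is only a model under stated assumptions: a set of integers stands in for the result of LIST, an IntPredicate stands in for HEAD, and no S3 client is involved. The class and method names are hypothetical and this is not how BucketingSink is written.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.IntPredicate;

class NextPartIndex {
    /**
     * @param listedIndexes part indexes returned by a (possibly stale) LIST
     * @param exists        stand-in for a HEAD request on the key of a given part index
     */
    public static int nextPartIndex(Set<Integer> listedIndexes, IntPredicate exists) {
        // Steps 1 and 2: find the current max index and start from max + 1 (0 if nothing is listed).
        int next = 0;
        for (int index : listedIndexes) {
            next = Math.max(next, index + 1);
        }
        // Step 3: keep adding 1 while the existence check says the key is already taken.
        while (exists.test(next)) {
            next++;
        }
        return next;
    }

    public static void main(String[] args) {
        // Hypothetical scenario: LIST is stale and misses parts 3 and 4.
        Set<Integer> staleListing = new HashSet<>(Arrays.asList(0, 1, 2));
        Set<Integer> actuallyWritten = new HashSet<>(Arrays.asList(0, 1, 2, 3, 4));
        System.out.println(nextPartIndex(staleListing, i -> actuallyWritten.contains(i)));
    }
}
```

The probing loop only helps if the existence check is reliable. If an eventually consistent HEAD returns a stale "false", the loop stops too early and the previous part can still be overwritten.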
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> But definitely sounds easier if a sink keeps track of files
>>>>>>>>>>>>>>> in a way that's guaranteed to be consistent.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Juho
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <
>>>>>>>>>>>>>>> andrey@data-artisans.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> true, StreamingFileSink does not support s3 in 1.6.0, it is
>>>>>>>>>>>>>>>> planned for the next 1.7 release, sorry for confusion.
>>>>>>>>>>>>>>>> The old BucketingSink has in general problem with s3.
>>>>>>>>>>>>>>>> Internally BucketingSink queries s3 as a file system
>>>>>>>>>>>>>>>> to list already written file parts (batches) and determine
>>>>>>>>>>>>>>>> index of the next part to start. Due to eventual consistency of checking
>>>>>>>>>>>>>>>> file existence in s3 [1], the BucketingSink can rewrite the previously
>>>>>>>>>>>>>>>> written part and basically lose it. It should be fixed for
>>>>>>>>>>>>>>>> StreamingFileSink in 1.7 where Flink keeps its own track of written parts
>>>>>>>>>>>>>>>> and does not rely on s3 as a file system.
>>>>>>>>>>>>>>>> I also include Kostas, he might add more details.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Just to keep in mind this problem with s3 and exclude it
>>>>>>>>>>>>>>>> for sure. I would also check whether the size of missing events is around
>>>>>>>>>>>>>>>> the batch size of BucketingSink or not. You also wrote that the timestamps
>>>>>>>>>>>>>>>> of lost event are 'probably' around the time of the savepoint, if it is not
>>>>>>>>>>>>>>>> yet for sure I would also check it.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Have you already checked the log files of job manager and
>>>>>>>>>>>>>>>> task managers for the job running before and after the restore from the
>>>>>>>>>>>>>>>> check point? Is everything successful there, no errors, relevant warnings
>>>>>>>>>>>>>>>> or exceptions?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> As the next step, I would suggest to log all encountered
>>>>>>>>>>>>>>>> events in DistinctFunction.reduce if possible for production data and check
>>>>>>>>>>>>>>>> whether the missed events are eventually processed before or after the
>>>>>>>>>>>>>>>> savepoint. The following log message indicates a border between the events
>>>>>>>>>>>>>>>> that should be included into the savepoint (logged before) or not:
>>>>>>>>>>>>>>>> “{} ({}, synchronous part) in thread {} took {} ms”
>>>>>>>>>>>>>>>> (template)
>>>>>>>>>>>>>>>> Also check if the savepoint has been overall completed:
>>>>>>>>>>>>>>>> "{} ({}, asynchronous part) in thread {} took {} ms."
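One way to apply this border is to compare the timestamp of a reduce log line with the timestamp of the "synchronous part" line. A minimal sketch, assuming every log line starts with a `yyyy-MM-dd HH:mm:ss,SSS` timestamp. The class and method names are hypothetical, and the sample lines in main are shortened placeholders rather than real Flink output.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class SnapshotBorder {
    // Timestamp layout assumed for the start of each log line, e.g. "2018-09-18 09:14:29,085".
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");
    private static final int TS_LENGTH = 23;

    public static LocalDateTime timestampOf(String logLine) {
        return LocalDateTime.parse(logLine.substring(0, TS_LENGTH), LOG_TS);
    }

    /** True if the event line was logged strictly before the "synchronous part" line. */
    public static boolean loggedBeforeSnapshot(String eventLine, String syncPartLine) {
        return timestampOf(eventLine).isBefore(timestampOf(syncPartLine));
    }

    public static void main(String[] args) {
        // Placeholder lines: only the leading timestamps matter for the comparison.
        String reduceLine = "2018-09-18 09:14:29,085 DEBUG (reduce log line)";
        String syncLine = "2018-09-18 09:15:14,264 DEBUG (synchronous part log line)";
        System.out.println(loggedBeforeSnapshot(reduceLine, syncLine));
    }
}
```

Events for which this returns true would be expected in the snapshot. Events logged later should instead be replayed from Kafka after a restore.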
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 24 Aug 2018, at 20:41, Juho Autio <juho.autio@rovio.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Using StreamingFileSink is not a convenient option for
>>>>>>>>>>>>>>>> production use for us as it doesn't support s3*. I could use
>>>>>>>>>>>>>>>> StreamingFileSink just to verify, but I don't see much point in doing so.
>>>>>>>>>>>>>>>> Please consider my previous comment:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> > I realized that BucketingSink must not play any role in
>>>>>>>>>>>>>>>> > this problem. This is because only when the 24-hour window triggers,
>>>>>>>>>>>>>>>> > BucketingSink gets a burst of input. Around the state restoring point
>>>>>>>>>>>>>>>> > (middle of the day) it doesn't get any input, so it can't lose anything
>>>>>>>>>>>>>>>> > either (right?).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I could also use a kafka sink instead, but I can't imagine
>>>>>>>>>>>>>>>> how there could be any difference. It's very real that the sink doesn't get
>>>>>>>>>>>>>>>> any input for a long time until the 24-hour window closes, and then it
>>>>>>>>>>>>>>>> quickly writes out everything because it's not that much data eventually
>>>>>>>>>>>>>>>> for the distinct values.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Any ideas for debugging what's happening around the
>>>>>>>>>>>>>>>> savepoint & restoration time?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> *) I actually implemented StreamingFileSink as an
>>>>>>>>>>>>>>>> alternative sink. This was before I came to realize that most likely the
>>>>>>>>>>>>>>>> sink component has nothing to do with the data loss problem. I tried it
>>>>>>>>>>>>>>>> with s3n:// path just to see an exception being thrown. In the source code
>>>>>>>>>>>>>>>> I indeed then found an explicit check for the target path scheme to be
>>>>>>>>>>>>>>>> "hdfs://".
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>> andrey@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Ok, I think before further debugging the window reduced
>>>>>>>>>>>>>>>>> state,
>>>>>>>>>>>>>>>>> could you try the new 'StreamingFileSink' [1] introduced
>>>>>>>>>>>>>>>>> in Flink 1.6.0 instead of the previous 'BucketingSink'?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.autio@rovio.com>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Yes, sorry for my confusing comment. I just meant that it
>>>>>>>>>>>>>>>>> seems like there's a bug somewhere now that the output is missing some data.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> > I would wait and check the actual output in s3 because
>>>>>>>>>>>>>>>>> it is the main result of the job
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Yes, and that's what I have already done. There always seems to
>>>>>>>>>>>>>>>>> be some data loss with the production data volumes if the job has
>>>>>>>>>>>>>>>>> been restarted on that day.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Would you have any suggestions for how to debug this
>>>>>>>>>>>>>>>>> further?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Many thanks for stepping in.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>>> andrey@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So it is a per key deduplication job.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Yes, I would wait and check the actual output in s3
>>>>>>>>>>>>>>>>>> because it is the main result of the job and
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint might
>>>>>>>>>>>>>>>>>> not be included in the savepoint, but it should be behind the snapshotted
>>>>>>>>>>>>>>>>>> offset in Kafka.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> is not a bug, it is a possible behaviour.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The savepoint is a snapshot of the data in transit
>>>>>>>>>>>>>>>>>> which is already consumed from Kafka.
>>>>>>>>>>>>>>>>>> Basically the full contents of the window result is split
>>>>>>>>>>>>>>>>>> between the savepoint and what can come after the savepoint'ed offset in
>>>>>>>>>>>>>>>>>> Kafka but before the window result is written into s3.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Allowed lateness should not affect it, I am just saying
>>>>>>>>>>>>>>>>>> that the final result in s3 should include all records after it.
>>>>>>>>>>>>>>>>>> This is what should be guaranteed but not the contents of
>>>>>>>>>>>>>>>>>> the intermediate savepoint.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 24 Aug 2018, at 16:52, Juho Autio <
>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for your answer!
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I check for the missed data from the final output on s3.
>>>>>>>>>>>>>>>>>> So I wait until the next day, then run the same thing re-implemented in
>>>>>>>>>>>>>>>>>> batch, and compare the output.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> > The late data around the time of taking savepoint might
>>>>>>>>>>>>>>>>>> not be included in the savepoint, but it should be behind the snapshotted
>>>>>>>>>>>>>>>>>> offset in Kafka.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Yes, I would definitely expect that. It seems like
>>>>>>>>>>>>>>>>>> there's a bug somewhere.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> > Then it should just come later after the restore and
>>>>>>>>>>>>>>>>>> should be reduced within the allowed lateness into the final result which
>>>>>>>>>>>>>>>>>> is saved into s3.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Well, as far as I know, allowed lateness doesn't play any
>>>>>>>>>>>>>>>>>> role here, because I started running the job with allowedLateness=0, and
>>>>>>>>>>>>>>>>>> still get the data loss, while my late data output doesn't receive anything.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> > Also, is this `DistinctFunction.reduce` just an example
>>>>>>>>>>>>>>>>>> or the actual implementation, basically saving just one of the records inside
>>>>>>>>>>>>>>>>>> the 24h window in s3? Then what is missing there?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Yes, it's the actual implementation. Note that there's a
>>>>>>>>>>>>>>>>>> keyBy before the DistinctFunction. So there's one record for each key
>>>>>>>>>>>>>>>>>> (which is the combination of a couple of fields). In practice I've seen
>>>>>>>>>>>>>>>>>> that we're missing ~2000-4000 elements on each restore, and the total
>>>>>>>>>>>>>>>>>> output is obviously much more than that.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Here's the full code for the key selector:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> public class MapKeySelector implements
>>>>>>>>>>>>>>>>>> KeySelector<Map<String,String>, Object> {
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     private final String[] fields;
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     public MapKeySelector(String... fields) {
>>>>>>>>>>>>>>>>>>         this.fields = fields;
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>     public Object getKey(Map<String, String> event)
>>>>>>>>>>>>>>>>>> throws Exception {
>>>>>>>>>>>>>>>>>>         Tuple key =
>>>>>>>>>>>>>>>>>> Tuple.getTupleClass(fields.length).newInstance();
>>>>>>>>>>>>>>>>>>         for (int i = 0; i < fields.length; i++) {
>>>>>>>>>>>>>>>>>>             key.setField(event.getOrDefault(fields[i],
>>>>>>>>>>>>>>>>>> ""), i);
>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>         return key;
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> And a more exact example on how it's used:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>                 .keyBy(new MapKeySelector("ID",
>>>>>>>>>>>>>>>>>> "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>>>>>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <
>>>>>>>>>>>>>>>>>> andrey@data-artisans.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Juho,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Where exactly does the data go missing? When do you notice
>>>>>>>>>>>>>>>>>>> that?
>>>>>>>>>>>>>>>>>>> Do you check it by:
>>>>>>>>>>>>>>>>>>> - debugging `DistinctFunction.reduce` right after resuming
>>>>>>>>>>>>>>>>>>> in the middle of the day
>>>>>>>>>>>>>>>>>>> or
>>>>>>>>>>>>>>>>>>> - finding that some distinct records are missing from the final output
>>>>>>>>>>>>>>>>>>> of BucketingSink in s3 after the window result is actually triggered and saved
>>>>>>>>>>>>>>>>>>> into s3 at the end of the day? Is this the main output?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The late data around the time of taking savepoint might
>>>>>>>>>>>>>>>>>>> not be included in the savepoint, but it should be behind the snapshotted
>>>>>>>>>>>>>>>>>>> offset in Kafka. Then it should just come later after the restore and
>>>>>>>>>>>>>>>>>>> should be reduced within the allowed lateness into the final result which
>>>>>>>>>>>>>>>>>>> is saved into s3.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Also, is this `DistinctFunction.reduce` just an example
>>>>>>>>>>>>>>>>>>> or the actual implementation, basically saving just one of the records inside
>>>>>>>>>>>>>>>>>>> the 24h window in s3? Then what is missing there?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>>>>>> Andrey
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 23 Aug 2018, at 15:42, Juho Autio <
>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I changed to allowedLateness=0, no change, still missing
>>>>>>>>>>>>>>>>>>> data when restoring from savepoint.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <
>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I realized that BucketingSink must not play any role in
>>>>>>>>>>>>>>>>>>>> this problem. This is because only when the 24-hour window triggers,
>>>>>>>>>>>>>>>>>>>> BucketingSink gets a burst of input. Around the state restoring point
>>>>>>>>>>>>>>>>>>>> (middle of the day) it doesn't get any input, so it can't lose anything
>>>>>>>>>>>>>>>>>>>> either (right?).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I will next try removing the allowedLateness entirely
>>>>>>>>>>>>>>>>>>>> from the equation.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In the meanwhile, please let me know if you have any
>>>>>>>>>>>>>>>>>>>> suggestions for debugging the lost data, for example what logs to enable.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> We use FlinkKafkaConsumer010 btw. Are there any known
>>>>>>>>>>>>>>>>>>>> issues with that, that could contribute to lost data when restoring a
>>>>>>>>>>>>>>>>>>>> savepoint?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <
>>>>>>>>>>>>>>>>>>>> juho.autio@rovio.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Some data is silently lost on my Flink stream job when
>>>>>>>>>>>>>>>>>>>>> state is restored from a savepoint.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Do you have any debugging hints to find out where
>>>>>>>>>>>>>>>>>>>>> exactly the data gets dropped?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> My job gathers distinct values using a 24-hour window.
>>>>>>>>>>>>>>>>>>>>> It doesn't have any custom state management.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> When I cancel the job with savepoint and restore from
>>>>>>>>>>>>>>>>>>>>> that savepoint, some data is missed. It seems to be losing just a small
>>>>>>>>>>>>>>>>>>>>> amount of data. The event time of lost data is probably around the time of
>>>>>>>>>>>>>>>>>>>>> savepoint. In other words the rest of the time window is not entirely
>>>>>>>>>>>>>>>>>>>>> missed – collection works correctly also for (most of the) events that come
>>>>>>>>>>>>>>>>>>>>> in after restoring.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> When the job processes a full 24-hour window without
>>>>>>>>>>>>>>>>>>>>> interruptions it doesn't miss anything.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Usually the problem doesn't happen in test
>>>>>>>>>>>>>>>>>>>>> environments that have smaller parallelism and smaller data volumes. But in
>>>>>>>>>>>>>>>>>>>>> production volumes the job seems to be consistently missing at least
>>>>>>>>>>>>>>>>>>>>> something on every restore.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This issue has consistently happened since the job was
>>>>>>>>>>>>>>>>>>>>> initially created. It was at first run on an older version of Flink
>>>>>>>>>>>>>>>>>>>>> 1.5-SNAPSHOT and it still happens on both Flink 1.5.2 & 1.6.0.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I'm wondering if this could be for example some
>>>>>>>>>>>>>>>>>>>>> synchronization issue between the kafka consumer offsets vs. what's been
>>>>>>>>>>>>>>>>>>>>> written by BucketingSink?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 1. Job content, simplified
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>         kafkaStream
>>>>>>>>>>>>>>>>>>>>>                 .flatMap(new ExtractFieldsFunction())
>>>>>>>>>>>>>>>>>>>>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>>>>>>>>>>>>>>>>>                 .timeWindow(Time.days(1))
>>>>>>>>>>>>>>>>>>>>>                 .allowedLateness(allowedLateness)
>>>>>>>>>>>>>>>>>>>>>                 .sideOutputLateData(lateDataTag)
>>>>>>>>>>>>>>>>>>>>>                 .reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>>>>>                 .addSink(sink)
>>>>>>>>>>>>>>>>>>>>>                 // use a fixed number of output partitions
>>>>>>>>>>>>>>>>>>>>>                 .setParallelism(8);
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> /**
>>>>>>>>>>>>>>>>>>>>>  * Usage: .keyBy("the", "distinct",
>>>>>>>>>>>>>>>>>>>>> "fields").reduce(new DistinctFunction())
>>>>>>>>>>>>>>>>>>>>>  */
>>>>>>>>>>>>>>>>>>>>> public class DistinctFunction implements
>>>>>>>>>>>>>>>>>>>>> ReduceFunction<java.util.Map<String, String>> {
>>>>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>>>>     public Map<String, String> reduce(Map<String,
>>>>>>>>>>>>>>>>>>>>> String> value1, Map<String, String> value2) {
>>>>>>>>>>>>>>>>>>>>>         return value1;
>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 2. State configuration
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> boolean enableIncrementalCheckpointing = true;
>>>>>>>>>>>>>>>>>>>>> String statePath = "s3n://bucket/savepoints";
>>>>>>>>>>>>>>>>>>>>> new RocksDBStateBackend(statePath,
>>>>>>>>>>>>>>>>>>>>> enableIncrementalCheckpointing);
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Checkpointing Mode Exactly Once
>>>>>>>>>>>>>>>>>>>>> Interval 1m 0s
>>>>>>>>>>>>>>>>>>>>> Timeout 10m 0s
>>>>>>>>>>>>>>>>>>>>> Minimum Pause Between Checkpoints 1m 0s
>>>>>>>>>>>>>>>>>>>>> Maximum Concurrent Checkpoints 1
>>>>>>>>>>>>>>>>>>>>> Persist Checkpoints Externally Enabled (retain on
>>>>>>>>>>>>>>>>>>>>> cancellation)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 3. BucketingSink configuration
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> We use BucketingSink, I don't think there's anything
>>>>>>>>>>>>>>>>>>>>> special here, if not the fact that we're writing to S3.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>         String outputPath = "s3://bucket/output";
>>>>>>>>>>>>>>>>>>>>>         BucketingSink<Map<String, String>> sink = new
>>>>>>>>>>>>>>>>>>>>> BucketingSink<Map<String, String>>(outputPath)
>>>>>>>>>>>>>>>>>>>>>                 .setBucketer(new ProcessdateBucketer())
>>>>>>>>>>>>>>>>>>>>>                 .setBatchSize(batchSize)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>>>>>>>>>>>>>>>>>         sink.setWriter(new IdJsonWriter());
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 4. Kafka & event time
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> My flink job reads the data from Kafka, using a
>>>>>>>>>>>>>>>>>>>>> BoundedOutOfOrdernessTimestampExtractor on the kafka consumer to
>>>>>>>>>>>>>>>>>>>>> synchronize watermarks across all kafka partitions. We also write late
>>>>>>>>>>>>>>>>>>>>> data to side output, but nothing is written there – if it were, it could
>>>>>>>>>>>>>>>>>>>>> explain missed data in the main output (I'm also sure that our late data
>>>>>>>>>>>>>>>>>>>>> writing works, because we previously had some actual late data which ended
>>>>>>>>>>>>>>>>>>>>> up there).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> 5. allowedLateness
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> It may or may not be relevant that I have also
>>>>>>>>>>>>>>>>>>>>> enabled allowedLateness with 1 minute lateness on the 24-hour window:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> If that makes sense, I could try removing
>>>>>>>>>>>>>>>>>>>>> allowedLateness entirely? That would be just to rule out a Flink
>>>>>>>>>>>>>>>>>>>>> bug that's related to restoring state in combination with the
>>>>>>>>>>>>>>>>>>>>> allowedLateness feature. After all, all of our data should be in a good
>>>>>>>>>>>>>>>>>>>>> enough order to not be late, given the max out of orderness used on kafka
>>>>>>>>>>>>>>>>>>>>> consumer timestamp extractor.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thank you in advance!
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>
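
For anyone trying to reproduce the comparison described in this thread (streaming output vs. a batch recomputation), here is a minimal plain-Java sketch of the idea. It is not code from the job: the class DistinctDiffSketch and its methods are hypothetical, it has no Flink dependency, and it uses a List<String> as the composite key where the job uses a Flink Tuple. It assumes both outputs have already been loaded into memory as maps or key sets.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper for illustration only; not part of the job in this thread.
class DistinctDiffSketch {

    // Mirrors MapKeySelector: one key component per field, "" when the field is absent.
    static List<String> keyOf(Map<String, String> event, String... fields) {
        List<String> key = new ArrayList<>(fields.length);
        for (String field : fields) {
            key.add(event.getOrDefault(field, ""));
        }
        return key;
    }

    // Mirrors keyBy(...).reduce(new DistinctFunction()) for a single window:
    // the first record seen for each key is kept, later ones are dropped.
    static Map<List<String>, Map<String, String>> distinct(
            List<Map<String, String>> events, String... fields) {
        Map<List<String>, Map<String, String>> firstPerKey = new LinkedHashMap<>();
        for (Map<String, String> event : events) {
            firstPerKey.putIfAbsent(keyOf(event, fields), event);
        }
        return firstPerKey;
    }

    // Keys present in the batch result but absent from the streaming output.
    static Set<List<String>> missingKeys(
            Set<List<String>> batchKeys, Set<List<String>> streamingKeys) {
        Set<List<String>> missing = new LinkedHashSet<>(batchKeys);
        missing.removeAll(streamingKeys);
        return missing;
    }
}
```

One way to use it: run distinct(...) over the day's raw events to get the expected keys, build the key set of the job's output with keyOf(...), and pass both to missingKeys(...). Any key it returns is a candidate to look up in the savepoint.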
