flink-user mailing list archives

From Andrey Zagrebin <and...@data-artisans.com>
Subject Re: Data loss when restoring from savepoint
Date Wed, 29 Aug 2018 12:49:24 GMT
Hi Juho,

> only when the 24-hour window triggers, BucketingSink gets a burst of input

This is of course totally true, my understanding is the same. We cannot rule out a problem there
for sure; it is just that savepoints are used a lot without problem reports, while BucketingSink is
known to be problematic with s3. That is why I asked you:

> You also wrote that the timestamps of lost event are 'probably' around the time of the
> savepoint, if it is not yet for sure I would also check it.

That said, the bucketing sink might lose any data at the end of the day (also from the middle).
The fact that it is always around the time of taking a savepoint, and not random, is surely
suspicious, and possible savepoint failures need to be investigated.

Regarding the s3 problem, s3 doc says:

> The caveat is that if you make a HEAD or GET request to the key name (to find if the
> object exists) before creating the object, Amazon S3 provides 'eventual consistency' for read-after-write.

The algorithm you suggest is roughly how it is implemented now (BucketingSink.openNewPartFile).
My understanding is that 'eventual consistency' means that even if you have just created a file
(its name is the key), it may not show up in the listing yet, or the exists check (HEAD) may
return false, so you risk overwriting the previous part.

The BucketingSink was designed for a standard file system. s3 is currently used through a file
system wrapper, but it does not always provide normal file system guarantees. See also the last example in [1].
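
To illustrate the risk, here is a minimal, self-contained sketch in plain Java. It is not the BucketingSink code: `StaleExistsDemo`, `LaggingStore`, `settle` and `nextFreeIndex` are hypothetical names, and the store is a toy model in which a freshly written key stays invisible to existence checks until `settle()` is called. It only shows how probing for the next free part index can land on a key that already holds data:

```java
import java.util.HashSet;
import java.util.Set;

class StaleExistsDemo {

    /** Toy model of a key store whose existence checks may lag behind writes. */
    static class LaggingStore {
        private final Set<String> written = new HashSet<>();  // what is really stored
        private final Set<String> visible = new HashSet<>();  // what exists() can see

        /** Stores the key; returns true if an object under the same key was replaced. */
        boolean put(String key) {
            return !written.add(key);
        }

        /** Models a HEAD request: only sees keys that have become visible. */
        boolean exists(String key) {
            return visible.contains(key);
        }

        /** Models the store catching up: all earlier writes become visible. */
        void settle() {
            visible.addAll(written);
        }
    }

    /** Probes upwards from startIndex until exists() reports a free key. */
    static int nextFreeIndex(LaggingStore store, String prefix, int startIndex) {
        int index = startIndex;
        while (store.exists(prefix + index)) {
            index++;
        }
        return index;
    }

    public static void main(String[] args) {
        LaggingStore store = new LaggingStore();
        store.put("part-0-0");
        store.settle();            // part 0 is visible to readers
        store.put("part-0-1");     // part 1 is written but not yet visible

        int next = nextFreeIndex(store, "part-0-", 0);
        boolean overwritten = store.put("part-0-" + next);
        // prints: chosen index = 1, overwrote existing part = true
        System.out.println("chosen index = " + next + ", overwrote existing part = " + overwritten);
    }
}
```

In this model, a store with immediate read-after-write visibility would have let the probe skip to index 2 instead.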

Cheers,
Andrey

[1] https://codeburst.io/quick-explanation-of-the-s3-consistency-model-6c9f325e3f82

> On 29 Aug 2018, at 12:11, Juho Autio <juho.autio@rovio.com> wrote:
> 
> Andrey, thank you very much for the debugging suggestions, I'll try them.
> 
> In the meanwhile two more questions, please:
> 
> > Just to keep in mind this problem with s3 and exclude it for sure. I would also
> > check whether the size of missing events is around the batch size of BucketingSink or not.
> 
> Fair enough, but I also want to focus on debugging the most probable subject first. So
> what do you think about this – true or false: only when the 24-hour window triggers, BucketingSink
> gets a burst of input. Around the state restoring point (middle of the day) it doesn't get
> any input, so it can't lose anything either. Isn't this true, or have I totally missed how
> Flink works in triggering window results? I would not expect there to be any optimization
> that speculatively triggers early results of a regular time window to the downstream operators.
> 
> > The old BucketingSink has in general problem with s3. Internally BucketingSink queries
> > s3 as a file system to list already written file parts (batches) and determine index of the
> > next part to start. Due to eventual consistency of checking file existence in s3 [1], the
> > BucketingSink can rewrite the previously written part and basically lose it.
> 
> I was wondering, what does S3's "read-after-write consistency" (mentioned on the page
you linked) actually mean. It seems that this might be possible:
> - LIST keys, find current max index
> - choose next index = max + 1
> - HEAD next index: if it exists, keep adding + 1 until key doesn't exist on S3
> 
> But definitely sounds easier if a sink keeps track of files in a way that's guaranteed
to be consistent.
> 
> Cheers,
> Juho
> 
> On Mon, Aug 27, 2018 at 2:04 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:
> Hi,
> 
> true, StreamingFileSink does not support s3 in 1.6.0, it is planned for the next 1.7
> release, sorry for the confusion.
> The old BucketingSink has a general problem with s3. Internally BucketingSink queries
> s3 as a file system to list already written file parts (batches) and determine the index of
> the next part to start. Due to the eventual consistency of checking file existence in s3 [1], the
> BucketingSink can overwrite the previously written part and basically lose it. This should be
> fixed for StreamingFileSink in 1.7, where Flink keeps its own track of written parts and does
> not rely on s3 as a file system.
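
As a rough illustration of "keeping its own track of written parts", the sketch below hands out part indices from a counter that is snapshotted and restored together with the sink state, so no listing or existence check against s3 is needed. This is an assumption-laden toy in plain Java, not the StreamingFileSink implementation; `PartCounterDemo`, `PartCounter`, `allocate`, `snapshot` and `restore` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

class PartCounterDemo {

    /** Hypothetical per-bucket part counter that would live in the sink's checkpointed state. */
    static class PartCounter {
        private final Map<String, Long> nextIndexByBucket = new HashMap<>();

        /** Hands out the next part index for a bucket without asking the file system. */
        long allocate(String bucketId) {
            long index = nextIndexByBucket.getOrDefault(bucketId, 0L);
            nextIndexByBucket.put(bucketId, index + 1);
            return index;
        }

        /** Copy of the counters, as it might be written into a checkpoint or savepoint. */
        Map<String, Long> snapshot() {
            return new HashMap<>(nextIndexByBucket);
        }

        /** Rebuilds the counter from a snapshot, so numbering continues where it left off. */
        static PartCounter restore(Map<String, Long> snapshot) {
            PartCounter counter = new PartCounter();
            counter.nextIndexByBucket.putAll(snapshot);
            return counter;
        }
    }

    public static void main(String[] args) {
        PartCounter counter = new PartCounter();
        counter.allocate("2018-08-27");   // part 0
        counter.allocate("2018-08-27");   // part 1
        Map<String, Long> snapshot = counter.snapshot();

        PartCounter restored = PartCounter.restore(snapshot);
        System.out.println(restored.allocate("2018-08-27"));  // prints 2
    }
}
```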
> I also include Kostas, he might add more details. 
> 
> Just to keep this problem with s3 in mind and to exclude it for sure, I would also check
> whether the size of missing events is around the batch size of BucketingSink or not. You also
> wrote that the timestamps of lost events are 'probably' around the time of the savepoint; if
> it is not yet certain, I would also check that.
> 
> Have you already checked the log files of the job manager and task managers for the job running
> before and after the restore from the checkpoint? Is everything successful there, no errors,
> relevant warnings or exceptions?
> 
> As the next step, I would suggest logging all encountered events in DistinctFunction.reduce,
> if that is possible for production data, and checking whether the missed events are eventually
> processed before or after the savepoint. The following log message marks the border between the
> events that should be included in the savepoint (logged before it) and those that should not:
> “{} ({}, synchronous part) in thread {} took {} ms” (template)
> Also check if the savepoint has been overall completed:
> "{} ({}, asynchronous part) in thread {} took {} ms."
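
A minimal sketch of the suggested logging follows. To stay self-contained it uses `java.util.function.BinaryOperator` and `java.util.logging` as stand-ins; in the actual job the same body would go into `DistinctFunction.reduce` (Flink's `ReduceFunction`) with whatever logger the job already uses. `LoggingDistinctDemo` and `LOGGING_REDUCE` are hypothetical names.

```java
import java.util.Collections;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.logging.Logger;

class LoggingDistinctDemo {
    private static final Logger LOG = Logger.getLogger("DistinctFunction");

    /** Same keep-the-first-value logic as DistinctFunction, plus one log line per call. */
    static final BinaryOperator<Map<String, String>> LOGGING_REDUCE = (value1, value2) -> {
        LOG.info("reduce kept=" + value1 + " dropped=" + value2);
        return value1;
    };

    public static void main(String[] args) {
        Map<String, String> first = Collections.singletonMap("ID", "1");
        Map<String, String> duplicate = Collections.singletonMap("ID", "1");
        Map<String, String> result = LOGGING_REDUCE.apply(first, duplicate);
        System.out.println(result == first);  // prints true
    }
}
```

One caveat, as far as I understand Flink's reducing window state: reduce is only invoked from the second element per key and window onward, so a key that occurs exactly once would never appear in this log. Logging in an operator before the keyBy may be needed to see every event.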
> 
> Best,
> Andrey
> 
> [1] https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html
> 
>> On 24 Aug 2018, at 20:41, Juho Autio <juho.autio@rovio.com> wrote:
>> 
>> Hi,
>> 
>> Using StreamingFileSink is not a convenient option for production use for us as it
doesn't support s3*. I could use StreamingFileSink just to verify, but I don't see much point
in doing so. Please consider my previous comment:
>> 
>> > I realized that BucketingSink must not play any role in this problem. This is
because only when the 24-hour window triggers, BucketingSink gets a burst of input. Around
the state restoring point (middle of the day) it doesn't get any input, so it can't lose anything
either (right?).
>> 
>> I could also use a kafka sink instead, but I can't imagine how there could be any
difference. It's very real that the sink doesn't get any input for a long time until the 24-hour
window closes, and then it quickly writes out everything because it's not that much data eventually
for the distinct values.
>> 
>> Any ideas for debugging what's happening around the savepoint & restoration time?
>> 
>> *) I actually implemented StreamingFileSink as an alternative sink. This was before
I came to realize that most likely the sink component has nothing to do with the data loss
problem. I tried it with s3n:// path just to see an exception being thrown. In the source
code I indeed then found an explicit check for the target path scheme to be "hdfs://". 
>> 
>> On Fri, Aug 24, 2018 at 7:49 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:
>> Ok, I think before further debugging the window reduced state,
>> could you try the new 'StreamingFileSink' [1] introduced in Flink 1.6.0 instead
>> of the previous 'BucketingSink'?
>> 
>> Cheers,
>> Andrey
>> 
>> [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>> 
>>> On 24 Aug 2018, at 18:03, Juho Autio <juho.autio@rovio.com> wrote:
>>> 
>>> Yes, sorry for my confusing comment. I just meant that it seems like there's
a bug somewhere now that the output is missing some data.
>>> 
>>> > I would wait and check the actual output in s3 because it is the main result
of the job
>>> 
>>> Yes, and that's what I have already done. There seems to be always some data
loss with the production data volumes, if the job has been restarted on that day.
>>> 
>>> Would you have any suggestions for how to debug this further?
>>> 
>>> Many thanks for stepping in.
>>> 
>>> On Fri, Aug 24, 2018 at 6:37 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:
>>> Hi Juho,
>>> 
>>> So it is a per key deduplication job.
>>> 
>>> Yes, I would wait and check the actual output in s3 because it is the main result
of the job and
>>> 
>>> > The late data around the time of taking savepoint might be not included
into the savepoint but it should be behind the snapshotted offset in Kafka.
>>> 
>>> is not a bug, it is a possible behaviour.
>>> 
>>> The savepoint is a snapshot of the in-flight data that has already been consumed
>>> from Kafka.
>>> Basically, the full contents of the window result are split between the savepoint
>>> and what can come after the savepointed offset in Kafka but before the window result is
>>> written to s3.
>>> 
>>> Allowed lateness should not affect it, I am just saying that the final result
in s3 should include all records after it. 
>>> This is what should be guaranteed but not the contents of the intermediate savepoint.
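
The guarantee described above can be illustrated with a small plain-Java model, assuming the job simply collects distinct keys: the state captured at the savepoint offset, plus a replay of everything after that offset, should match an uninterrupted run. `SnapshotReplayDemo`, `consume` and the sample events are hypothetical and only stand in for the Kafka source and window state.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SnapshotReplayDemo {

    /** Adds the keys of events[from, to) to a copy of the given state. */
    static Set<String> consume(List<String> events, int from, int to, Set<String> state) {
        Set<String> result = new HashSet<>(state);
        result.addAll(events.subList(from, to));
        return result;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "b", "a", "c", "d", "b", "e");
        int savepointOffset = 3;  // offset stored together with the savepoint

        Set<String> uninterrupted = consume(events, 0, events.size(), Collections.emptySet());
        Set<String> inSavepoint = consume(events, 0, savepointOffset, Collections.emptySet());
        // After the restore the source resumes at the stored offset and replays the rest.
        Set<String> afterRestore = consume(events, savepointOffset, events.size(), inSavepoint);

        System.out.println(inSavepoint.size());                  // prints 2 (only a and b so far)
        System.out.println(afterRestore.equals(uninterrupted));  // prints true
    }
}
```

If the final output after a restore differs from the uninterrupted result, either the restored state or the replayed range must be incomplete, which is what the debugging steps in this thread try to narrow down.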
>>> 
>>> Cheers,
>>> Andrey
>>> 
>>>> On 24 Aug 2018, at 16:52, Juho Autio <juho.autio@rovio.com> wrote:
>>>> 
>>>> Thanks for your answer!
>>>> 
>>>> I check for the missed data from the final output on s3. So I wait until
the next day, then run the same thing re-implemented in batch, and compare the output.
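
The comparison described here boils down to a set difference between the keys in the batch (reference) output and the keys in the streaming output. A minimal sketch, assuming both outputs have already been loaded as collections of key strings; `OutputDiffDemo`, `missingFromStreaming` and the sample keys are hypothetical:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class OutputDiffDemo {

    /** Keys present in the batch (reference) output but absent from the streaming output. */
    static Set<String> missingFromStreaming(Collection<String> batchKeys,
                                            Collection<String> streamingKeys) {
        Set<String> missing = new TreeSet<>(batchKeys);
        missing.removeAll(streamingKeys);
        return missing;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("k1", "k2", "k3", "k4");
        List<String> streaming = Arrays.asList("k1", "k4");
        System.out.println(missingFromStreaming(batch, streaming));  // prints [k2, k3]
    }
}
```

Keeping the event timestamps next to the missing keys would also make it possible to check whether they cluster around the savepoint time.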
>>>> 
>>>> > The late data around the time of taking savepoint might be not included
into the savepoint but it should be behind the snapshotted offset in Kafka.
>>>> 
>>>> Yes, I would definitely expect that. It seems like there's a bug somewhere.
>>>> 
>>>> > Then it should just come later after the restore and should be reduced
within the allowed lateness into the final result which is saved into s3.
>>>> 
>>>> Well, as far as I know, allowed lateness doesn't play any role here, because
I started running the job with allowedLateness=0, and still get the data loss, while my late
data output doesn't receive anything.
>>>> 
>>>> > Also, is this `DistinctFunction.reduce` just an example or the actual
implementation, basically saving just one of records inside the 24h window in s3? then what
is missing there?
>>>> 
>>>> Yes, it's the actual implementation. Note that there's a keyBy before the
DistinctFunction. So there's one record for each key (which is the combination of a couple
of fields). In practice I've seen that we're missing ~2000-4000 elements on each restore,
and the total output is obviously much more than that.
>>>> 
>>>> Here's the full code for the key selector:
>>>> 
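>>>> import java.util.Map;
>>>> import org.apache.flink.api.java.functions.KeySelector;
>>>> import org.apache.flink.api.java.tuple.Tuple;
>>>> 
>>>> // Builds a composite key (a Flink Tuple) from the given event fields.
>>>> public class MapKeySelector implements KeySelector<Map<String, String>, Object> {
>>>> 
>>>>     private final String[] fields;
>>>> 
>>>>     public MapKeySelector(String... fields) {
>>>>         this.fields = fields;
>>>>     }
>>>> 
>>>>     @Override
>>>>     public Object getKey(Map<String, String> event) throws Exception {
>>>>         // One tuple position per key field
>>>>         Tuple key = Tuple.getTupleClass(fields.length).newInstance();
>>>>         for (int i = 0; i < fields.length; i++) {
>>>>             // Missing fields fall back to the empty string
>>>>             key.setField(event.getOrDefault(fields[i], ""), i);
>>>>         }
>>>>         return key;
>>>>     }
>>>> }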
>>>> 
>>>> And a more exact example on how it's used:
>>>> 
>>>>                 .keyBy(new MapKeySelector("ID", "PLAYER_ID", "FIELD", "KEY_NAME", "KEY_VALUE"))
>>>>                 .timeWindow(Time.days(1))
>>>>                 .reduce(new DistinctFunction())
>>>> 
>>>> On Fri, Aug 24, 2018 at 5:26 PM Andrey Zagrebin <andrey@data-artisans.com> wrote:
>>>> Hi Juho,
>>>> 
>>>> Where exactly is the data missing? When do you notice that?
>>>> Do you check it by:
>>>> - debugging `DistinctFunction.reduce` right after the resume in the middle of
>>>> the day
>>>> or
>>>> - finding that some distinct records are missing in the final output of BucketingSink in s3
>>>> after the window result is actually triggered and saved into s3 at the end of the day? Is this
>>>> the main output?
>>>> 
>>>> The late data around the time of taking savepoint might be not included into
the savepoint but it should be behind the snapshotted offset in Kafka. Then it should just
come later after the restore and should be reduced within the allowed lateness into the final
result which is saved into s3.
>>>> 
>>>> Also, is this `DistinctFunction.reduce` just an example or the actual implementation,
basically saving just one of records inside the 24h window in s3? then what is missing there?
>>>> 
>>>> Cheers,
>>>> Andrey
>>>> 
>>>>> On 23 Aug 2018, at 15:42, Juho Autio <juho.autio@rovio.com> wrote:
>>>>> 
>>>>> I changed to allowedLateness=0, no change, still missing data when restoring
from savepoint.
>>>>> 
>>>>> On Tue, Aug 21, 2018 at 10:43 AM Juho Autio <juho.autio@rovio.com> wrote:
>>>>> I realized that BucketingSink must not play any role in this problem.
>>>>> This is because only when the 24-hour window triggers, BucketingSink gets a burst of input.
>>>>> Around the state restoring point (middle of the day) it doesn't get any input, so it can't
>>>>> lose anything either (right?).
>>>>> 
>>>>> I will next try removing the allowedLateness entirely from the equation.
>>>>> 
>>>>> In the meanwhile, please let me know if you have any suggestions for
debugging the lost data, for example what logs to enable.
>>>>> 
>>>>> We use FlinkKafkaConsumer010 btw. Are there any known issues with that,
that could contribute to lost data when restoring a savepoint?
>>>>> 
>>>>> On Fri, Aug 17, 2018 at 4:23 PM Juho Autio <juho.autio@rovio.com> wrote:
>>>>> Some data is silently lost on my Flink stream job when state is restored
from a savepoint.
>>>>> 
>>>>> Do you have any debugging hints to find out where exactly the data gets
dropped?
>>>>> 
>>>>> My job gathers distinct values using a 24-hour window. It doesn't have
any custom state management.
>>>>> 
>>>>> When I cancel the job with savepoint and restore from that savepoint,
some data is missed. It seems to be losing just a small amount of data. The event time of
lost data is probably around the time of savepoint. In other words the rest of the time window
is not entirely missed – collection works correctly also for (most of the) events that come
in after restoring.
>>>>> 
>>>>> When the job processes a full 24-hour window without interruptions it
doesn't miss anything.
>>>>> 
>>>>> Usually the problem doesn't happen in test environments that have smaller
parallelism and smaller data volumes. But in production volumes the job seems to be consistently
missing at least something on every restore.
>>>>> 
>>>>> This issue has consistently happened since the job was initially created.
It was at first run on an older version of Flink 1.5-SNAPSHOT and it still happens on both
Flink 1.5.2 & 1.6.0.
>>>>> 
>>>>> I'm wondering if this could be for example some synchronization issue
between the kafka consumer offsets vs. what's been written by BucketingSink?
>>>>> 
>>>>> 1. Job content, simplified
>>>>> 
>>>>>         kafkaStream
>>>>>                 .flatMap(new ExtractFieldsFunction())
>>>>>                 .keyBy(new MapKeySelector(1, 2, 3, 4))
>>>>>                 .timeWindow(Time.days(1))
>>>>>                 .allowedLateness(allowedLateness)
>>>>>                 .sideOutputLateData(lateDataTag)
>>>>>                 .reduce(new DistinctFunction())
>>>>>                 .addSink(sink)
>>>>>                 // use a fixed number of output partitions
>>>>>                 .setParallelism(8);
>>>>> 
>>>>> import java.util.Map;
>>>>> import org.apache.flink.api.common.functions.ReduceFunction;
>>>>> 
>>>>> /**
>>>>>  * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
>>>>>  */
>>>>> public class DistinctFunction implements ReduceFunction<Map<String, String>> {
>>>>>     @Override
>>>>>     public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
>>>>>         // Keep the first value seen for the key; later duplicates are dropped.
>>>>>         return value1;
>>>>>     }
>>>>> }
>>>>> 
>>>>> 2. State configuration
>>>>> 
>>>>> boolean enableIncrementalCheckpointing = true;
>>>>> String statePath = "s3n://bucket/savepoints";
>>>>> new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);
>>>>> 
>>>>> Checkpointing Mode	Exactly Once
>>>>> Interval	1m 0s
>>>>> Timeout	10m 0s
>>>>> Minimum Pause Between Checkpoints	1m 0s
>>>>> Maximum Concurrent Checkpoints	1
>>>>> Persist Checkpoints Externally	Enabled (retain on cancellation)
>>>>> 
>>>>> 3. BucketingSink configuration
>>>>> 
>>>>> We use BucketingSink, I don't think there's anything special here, if
not the fact that we're writing to S3.
>>>>> 
>>>>>         String outputPath = "s3://bucket/output";
>>>>>         BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
>>>>>                 .setBucketer(new ProcessdateBucketer())
>>>>>                 .setBatchSize(batchSize)
>>>>>                 .setInactiveBucketThreshold(inactiveBucketThreshold)
>>>>>                 .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
>>>>>         sink.setWriter(new IdJsonWriter());
>>>>> 
>>>>> 4. Kafka & event time
>>>>> 
>>>>> My flink job reads the data from Kafka, using a BoundedOutOfOrdernessTimestampExtractor
>>>>> on the kafka consumer to synchronize watermarks across all kafka partitions. We also write
>>>>> late data to side output, but nothing is written there – if it were, it could explain missed
>>>>> data in the main output (I'm also sure that our late data writing works, because we previously
>>>>> had some actual late data which ended up there).
>>>>> 
>>>>> 5. allowedLateness
>>>>> 
>>>>> It may be or may not be relevant that I have also enabled allowedLateness
with 1 minute lateness on the 24-hour window:
>>>>> 
>>>>> If that makes sense, I could try removing allowedLateness entirely? That
>>>>> would be just to rule out a Flink bug that's related to restoring state
>>>>> in combination with the allowedLateness feature. After all, all of our data should be in a
>>>>> good enough order to not be late, given the max out-of-orderness used on the kafka consumer
>>>>> timestamp extractor.
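
For reference, the interplay of the out-of-orderness bound and allowedLateness can be sketched in plain Java. The model below follows, as far as I understand it, the rule that the watermark trails the highest timestamp seen by the configured bound, and that an element is late once the watermark has reached the last timestamp of its window plus the allowed lateness. `WatermarkLatenessDemo`, `BoundedWatermark`, `isLate` and the 1-minute bound are hypothetical; the thread does not state the job's actual bound.

```java
class WatermarkLatenessDemo {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** Tracks a watermark that trails the highest timestamp seen by a fixed bound. */
    static class BoundedWatermark {
        private final long maxOutOfOrdernessMs;
        private long maxTimestampSeen;

        BoundedWatermark(long maxOutOfOrdernessMs) {
            this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
            // Start low enough that the subtraction in current() cannot underflow.
            this.maxTimestampSeen = Long.MIN_VALUE + maxOutOfOrdernessMs;
        }

        void observe(long timestampMs) {
            maxTimestampSeen = Math.max(maxTimestampSeen, timestampMs);
        }

        long current() {
            return maxTimestampSeen - maxOutOfOrdernessMs;
        }
    }

    /** Start of the tumbling window (no offset) that contains the timestamp. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** True if the element's window has already expired at the given watermark. */
    static boolean isLate(long timestampMs, long sizeMs, long allowedLatenessMs, long watermarkMs) {
        long lastTimestampInWindow = windowStart(timestampMs, sizeMs) + sizeMs - 1;
        return lastTimestampInWindow + allowedLatenessMs <= watermarkMs;
    }

    public static void main(String[] args) {
        BoundedWatermark wm = new BoundedWatermark(60_000);  // hypothetical 1-minute bound
        long straggler = DAY_MS - 2_000;                     // event 2 s before midnight

        wm.observe(DAY_MS - 1_000);                          // stream is 1 s before midnight
        System.out.println(isLate(straggler, DAY_MS, 0, wm.current()));       // prints false

        wm.observe(DAY_MS + 70_000);                         // stream is 70 s into the next day
        System.out.println(isLate(straggler, DAY_MS, 0, wm.current()));       // prints true
        System.out.println(isLate(straggler, DAY_MS, 60_000, wm.current()));  // prints false
    }
}
```

Under this model, data that stays within the out-of-orderness bound is never late even with allowedLateness=0, which matches the observation that the late-data side output stays empty.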
>>>>> 
>>>>> Thank you in advance!