flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jürgen Thomann <juergen.thom...@innogames.com>
Subject Re: Behavior of the cancel command
Date Sat, 29 Apr 2017 07:15:30 GMT
Hi Aljoscha,

In my case the valid-length file created contains a value which e.g. 
says 100 MB are valid to read for exactly once but the file with the 
data is only 95 MB large. As I understand it the valid-length file 
contains the length of the file at the checkpoint. This does also not 
happen for all files (3 HDFS sinks each with a parallelism of 2). For 
some parts the file size and the value in the valid-length file match 

After looking now over the checkpoint code in BucketingSink I looked 
into the hsync behavior again and found the following page: 
After this I downloaded the file with the hdfs dfs tool and actually the 
file is now even larger than the valid-length file. I checked this 
against the things I did before (Impala and hive select count query, and 
Hue download of files and wc -l) and this 3 ways result in the same 
amount of lines but hdfs dfs -cat <filename> | wc -l gives a much larger 

So my conclusion would be that the data is written and not exactly lost 
as I thought, but for my use case not visible because the files are not 
properly closed during cancel and the namenode is not aware of the 
flushed data. So I could imagine 2 ways out of this: 1. implement the 
hsync as stated at the Stack Overflow page or 2. ensure that files are 
properly closed during cancel.


On 28.04.2017 17:38, Aljoscha Krettek wrote:
> Hi Jürgen,
> Is there missing data with respect to what should have been written at 
> the time of the cancel or when the last checkpoint (or in that case, 
> the savepoint) was performed. I’m asking because the cancel command is 
> only sent out once the savepoint has been completed, as can be seen at 
> [1]. If the savepoint is complete this also means that the snapshot 
> method of the BucketingSink must have done it’s work, i.e. that it 
> also flushed all files, which is done in [2]. There’s always the 
> possibility of a bug, however, so we’ll have to look into this together.
> Best,
> Aljoscha
> [1] 
> https://github.com/apache/flink/blob/c22efce098c14e8f08bad1e0065dbd02df6e4dbb/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala#L607-L607
> [2] 
> https://github.com/apache/flink/blob/b4c60a942fe07e355dd49ed2aab3c0a7ae94285d/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
>> On 27. Apr 2017, at 10:27, Jürgen Thomann 
>> <juergen.thomann@innogames.com 
>> <mailto:juergen.thomann@innogames.com>> wrote:
>> Hi,
>> I had some time ago problems with writing data to Hadoop with the 
>> BucketingSink and losing data in case of cancel with savepoint 
>> because flush/sync command was interrupted. I tried changing Hadoop 
>> settings as suggested but had no luck at the end and looked into the 
>> Flink code. If I understand the code correctly it behaves the 
>> following way:
>> 1. Start a Watchdog thread if we have a cancellation timeout set
>> 2. invoke cancel on the sink/task, but do not wait for it to finish
>> 3. destroy buffer pool and a release resources
>> 4. send initial interrupt to the sink/task
>> 5. call join on the sink/task and ignore InterruptedException
>> 6. let the watchdog send more interrupts if needed and throw fatal 
>> error if timeout is reached
>> In my case the BucketingSink does not has enough time to flush 
>> everything before the initial interrupt is sent and some files are 
>> not closed properly which causes the missing data in Hadoop in my 
>> understanding.
>> Is my understanding correct and if yes, do you know a way to get 
>> around this behavior to let the close function finish the sync for 
>> all files?
>> Best,
>> Jürgen

View raw message