flink-user mailing list archives

From Mu Kong <kong.mu....@gmail.com>
Subject Re: Regarding BucketingSink
Date Wed, 21 Feb 2018 04:04:33 GMT
Hi Aljoscha,

Thanks for confirming the fact that Flink doesn't clean up pending files.
Is it safe to clean up (remove) all the pending files after a cancel (with or
without a savepoint) or a failover?
If we do that, will we lose any data?

Thanks!

Best,
Mu
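
Aljoscha's suggestion further down the thread (periodically clean up older pending files) could be sketched as a small selection step like the one below. This is only an illustration in Python, not something posted in the thread: the function name `stale_pending_files`, the three-day age threshold, and the assumption that pending files start with "_" and end with ".pending" (as in the listings quoted below) are all hypothetical choices. The thread does not settle whether deleting these files is safe, so the sketch only lists candidates and deletes nothing.

```python
from datetime import datetime, timedelta
import posixpath


def stale_pending_files(entries, now, min_age):
    """Return paths of pending files last modified more than min_age before now.

    entries is an iterable of (path, modification_time) pairs, for example
    parsed from a directory listing. Nothing is deleted here.
    """
    stale = []
    for path, modified in entries:
        name = posixpath.basename(path)
        # Assumed naming: "_" prefix and ".pending" suffix, as in the listings.
        is_pending = name.startswith("_") and name.endswith(".pending")
        if is_pending and now - modified > min_age:
            stale.append(path)
    return stale


# Sample data copied from the listing quoted in this thread.
listing = [
    ("/kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length",
     datetime(2018, 2, 14, 18, 48)),
    ("/kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending",
     datetime(2018, 2, 14, 19, 15)),
    ("/kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length",
     datetime(2018, 2, 14, 21, 17)),
]

candidates = stale_pending_files(
    listing,
    now=datetime(2018, 2, 21, 4, 0),
    min_age=timedelta(days=3),  # hypothetical threshold
)
print(candidates)
```

Keeping the selection separate from any delete call makes it easy to review the candidate list first. The threshold would need to be comfortably longer than any checkpoint or savepoint you might still restore from.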




On Wed, Feb 21, 2018 at 5:27 AM, Vishal Santoshi <vishal.santoshi@gmail.com>
wrote:

> Sorry, but I just wanted to confirm: does the "at-least-once" delivery
> assertion still hold true if there is a dangling pending file?
>
> On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <
> vishal.santoshi@gmail.com> wrote:
>
>> That is fine, as long as Flink assures at-least-once semantics?
>>
>> If the contents of a .pending file are assured, through the turbulence
>> (restarts etc.), to be in another file, then anything starting with an
>> "_" (underscore) will by default be ignored by Hadoop (Hive, MR, etc.).
>>
>>
>>
>> On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <aljoscha@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> Sorry for the confusion. The framework (Flink) does currently not do any
>>> cleanup of pending files, yes.
>>>
>>> Best,
>>> Aljoscha
>>>
>>>
>>> On 19. Feb 2018, at 17:01, Vishal Santoshi <vishal.santoshi@gmail.com>
>>> wrote:
>>>
>>> >> You should only have these dangling pending files after a
>>> failure-recovery cycle, as you noticed. My suggestion would be to
>>> periodically clean up older pending files.
>>>
>>> A little confused. Is that what the framework should do, or should we
>>> do it as part of some cleanup job?
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <aljoscha@apache.org>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> The BucketingSink does not clean up pending files on purpose. In a
>>>> distributed setting, and especially with rescaling of Flink operators, it
>>>> is sufficiently hard to figure out which of the pending files you actually
>>>> can delete and which of them you have to leave because they will get moved
>>>> to "final" as part of recovering from a checkpoint on some other parallel
>>>> instance of the sink.
>>>>
>>>> You should only have these dangling pending files after a
>>>> failure-recovery cycle, as you noticed. My suggestion would be to
>>>> periodically clean up older pending files.
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>
>>>> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrmann@apache.org> wrote:
>>>>
>>>> Hi Vishal,
>>>>
>>>> the pending files should indeed eventually get finalized. This happens
>>>> on a checkpoint complete notification. Thus, what you report seems not
>>>> right. Maybe Aljoscha can shed a bit more light on the problem.
>>>>
>>>> In order to further debug the problem, it would be really helpful to
>>>> get access to DEBUG log files of a TM which runs the BucketingSink.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu.biz@gmail.com> wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> I have the same concern about savepointing in BucketingSink.
>>>>> As for your question, I think that before the pending files get
>>>>> cleared in handleRestoredBucketState, they are finalized in
>>>>> notifyCheckpointComplete:
>>>>>
>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>>>>>
>>>>> I'm looking into this part of the source code now, since we are
>>>>> experiencing some unclosed files after checkpointing.
>>>>> It would be great if you could share more if you find something new
>>>>> about your problem, which might help with ours.
>>>>>
>>>>> Best regards,
>>>>> Mu
>>>>>
>>>>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <
>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>
>>>>>> -rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>>>>> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>>>>> -rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>>>>>
>>>>>>
>>>>>> This is strange. We had a few retries because of an OOM on one of the
>>>>>> TMs, and we see this situation: two files (on either side) that were
>>>>>> dealt with fine, but a dangling .pending file. I am sure this is not
>>>>>> what is meant to be. I think we have an edge condition, and looking at
>>>>>> the code it is not obvious. Maybe someone who wrote the code can shed
>>>>>> some light on how this can happen.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <
>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>
>>>>>>> without --allowNonRestoredState, on a suspend/resume we do see the
>>>>>>> length file along with the finalized file (finalized during resume)
>>>>>>>
>>>>>>> -rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>>>>>
>>>>>>> That does make much more sense.
>>>>>>>
>>>>>>> I guess we should document --allowNonRestoredState better ? It
>>>>>>> seems it actually drops state ?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <
>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>
>>>>>>>> This is 1.4 BTW.  I am not sure that I am reading this correctly
>>>>>>>> but the lifecycle of cancel/resume is 2 steps
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 1. Cancel job with SP
>>>>>>>>
>>>>>>>>
>>>>>>>> closeCurrentPartFile
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>>>>>>>
>>>>>>>> is called from close()
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>>>>>>>
>>>>>>>>
>>>>>>>> and that moves files to pending state. That, I would presume, is
>>>>>>>> called when one does a cancel.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> 2. The restore on resume
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>>>>>>>>
>>>>>>>> calls
>>>>>>>>
>>>>>>>> handleRestoredBucketState
>>>>>>>>
>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>>>>>>>>
>>>>>>>> clears the pending files from state without finalizing them?
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> That does not seem to be right. I must be reading the code totally
>>>>>>>> wrong?
>>>>>>>>
>>>>>>>> I am also not sure whether --allowNonRestoredState is skipping
>>>>>>>> getting the state. At least
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
>>>>>>>> is not exactly clear about what it does if we add an operator (GDF, I
>>>>>>>> think, will add a new operator in the DAG without state even if
>>>>>>>> stateful; in my case the Map operator is not even stateful).
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks and please bear with me if this is all something pretty
>>>>>>>> simple.
>>>>>>>>
>>>>>>>> Vishal
>>>>>>>>
>>>>>>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <
>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> What should be the behavior of BucketingSink vis-a-vis state
>>>>>>>>> (pending, in-progress and finalization) when we suspend and resume?
>>>>>>>>>
>>>>>>>>> So I did this
>>>>>>>>>
>>>>>>>>> * I had a pipe writing to HDFS, which I suspended and resumed using
>>>>>>>>> --allowNonRestoredState, as I had added a harmless (stateless)
>>>>>>>>> MapOperator.
>>>>>>>>>
>>>>>>>>> * I see that the file on HDFS that was being written to (before
>>>>>>>>> the cancel with savepoint) goes into a pending state:
>>>>>>>>> _part-0-21.pending
>>>>>>>>>
>>>>>>>>> * I see a new file being written to in the resumed pipe
>>>>>>>>> _part-0-22.in-progress.
>>>>>>>>>
>>>>>>>>> What I do not see is the file _part-0-21.pending being finalized
>>>>>>>>> (as in renamed to just part-0-21). I would have assumed that would
>>>>>>>>> be the case in this controlled suspend/resume circumstance. Further,
>>>>>>>>> it is a rename, and an hdfs mv is not an expensive operation.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Am I understanding the process correctly, and if yes, any pointers?
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>>
>>>>>>>>> Vishal
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>
>
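
The file states discussed in this thread (in-progress, pending, valid-length, finalized) can be told apart by name alone. The following is a minimal Python sketch, assuming the naming seen in the listings above: an "_" prefix plus a ".in-progress", ".pending" or ".valid-length" suffix for non-final files, and a bare name such as part-0-21 for a finalized file. The helper name `part_file_state` is hypothetical, and a sink configured with different prefixes or suffixes would need different rules.

```python
def part_file_state(name):
    """Classify a part file by its name, using the naming seen in this thread."""
    # Finalized files in the thread carry no "_" prefix (e.g. part-0-21).
    if not name.startswith("_"):
        return "final"
    # Non-final files are marked by an "_" prefix plus one of these suffixes.
    for suffix in (".in-progress", ".pending", ".valid-length"):
        if name.endswith(suffix):
            return suffix.lstrip(".")
    # Some other underscore-prefixed file that this sketch does not recognize.
    return "unknown"


# File names taken from the messages in this thread.
for name in ("_part-0-21.pending", "_part-0-22.in-progress",
             "_part-0-28.valid-length", "part-0-21"):
    print(name, "->", part_file_state(name))
```

A classifier like this can serve as the first step of a cleanup or monitoring job, for example to count how many pending files remain in a bucket directory after a restart.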
