flink-user mailing list archives

From Vishal Santoshi <vishal.santo...@gmail.com>
Subject Re: Regarding BucketingSink
Date Wed, 21 Feb 2018 13:52:58 GMT
Thank you Fabian,

    What is more important (and I think you might have addressed it in
your post, so sorry for being a little obtuse) is that deleting them does
not violate "at-least-once" delivery. If that is definitely the case, then we
are fine with it, though we will test it further.

Thanks and Regards.



On Wed, Feb 21, 2018 at 5:34 AM, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi Vishal, hi Mu,
>
> After the savepoint state has been written, the sink might start new
> .in-progress files. These files are not part of the savepoint but are
> renamed to .pending in close().
> On restore all pending files that are part of the savepoint are moved into
> final state (and possibly truncated). See handlePendingInProgressFiles()
> method.
> Pending files that are not part of the savepoint (because they were
> created later between taking the savepoint and shutting the job down) are
> not touched and remain as .pending files.
>
> These should be the .pending files that you observe. Since they contain
> data that is not part of the savepoint, it should be safe to delete them.
> If you keep them, you will have at-least-once output.
>
> Best, Fabian
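Fabian's restore rule can be sketched as a simple partition of the pending files on disk. This is a minimal model, not Flink code: the class and method names are hypothetical, and plain file names stand in for the pending-file state the sink actually stores in the savepoint. Pending files recorded in the savepoint are finalized on restore; the others are left behind as .pending files. Because the records in those leftovers are replayed after the restore, keeping them is what yields at-least-once output, while deleting them stays consistent with the savepoint.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of the restore rule (not Flink code): file names stand in for
// the pending-file state that the sink keeps in the savepoint.
class RestoreClassifier {

    /** Pending files recorded in the savepoint: moved to their final name on restore. */
    static List<String> toFinalize(List<String> pendingOnDisk, Set<String> pendingInSavepoint) {
        List<String> result = new ArrayList<>();
        for (String f : pendingOnDisk) {
            if (pendingInSavepoint.contains(f)) {
                result.add(f);
            }
        }
        return result;
    }

    /** Pending files created after the savepoint: untouched by restore, left as .pending. */
    static List<String> leftOver(List<String> pendingOnDisk, Set<String> pendingInSavepoint) {
        List<String> result = new ArrayList<>();
        for (String f : pendingOnDisk) {
            if (!pendingInSavepoint.contains(f)) {
                result.add(f);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> onDisk = List.of("_part-0-20.pending", "_part-0-21.pending");
        Set<String> inSavepoint = Set.of("_part-0-20.pending");
        System.out.println("finalized on restore: " + toFinalize(onDisk, inSavepoint));
        // Deleting these is consistent with the savepoint; keeping them duplicates replayed data.
        System.out.println("left as .pending: " + leftOver(onDisk, inSavepoint));
    }
}
```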
>
>
> 2018-02-21 5:04 GMT+01:00 Mu Kong <kong.mu.biz@gmail.com>:
>
>> Hi Aljoscha,
>>
>> Thanks for confirming the fact that Flink doesn't clean up pending files.
>> Is it safe to clean up (remove) all the pending files after a cancel (with or
>> without a savepoint) or a failover?
>> If we do that, will we lose some data?
>>
>> Thanks!
>>
>> Best,
>> Mu
>>
>>
>>
>>
>> On Wed, Feb 21, 2018 at 5:27 AM, Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>
>>> Sorry, but I just wanted to confirm: does the "at-least-once" delivery
>>> guarantee still hold if there is a dangling pending file?
>>>
>>> On Mon, Feb 19, 2018 at 2:21 PM, Vishal Santoshi <
>>> vishal.santoshi@gmail.com> wrote:
>>>
>>>> That is fine, as long as Flink assures at-least-once semantics?
>>>>
>>>> If the contents of a .pending file are assured, through the turbulence
>>>> (restarts etc.), to end up in another file, then anything starting with an
>>>> "_" underscore will by default be ignored by Hadoop (Hive, MR, etc.).
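The underscore convention can be checked with a small filter. This sketch assumes the default rule of Hadoop's FileInputFormat (skip names that start with "_" or "."); it is plain Java with hypothetical names rather than the Hadoop PathFilter API, and a given reader (Hive, MR job, etc.) may apply further filters of its own.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a hidden-file filter: names starting with "_" or "."
// are treated as invisible, which is how the sink's in-progress/pending files stay hidden.
class HiddenFileFilter {

    /** True if a reader applying this rule would pick the file up. */
    static boolean isVisible(String fileName) {
        return !fileName.startsWith("_") && !fileName.startsWith(".");
    }

    static List<String> visibleFiles(List<String> names) {
        return names.stream().filter(HiddenFileFilter::isVisible).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> bucket = List.of(
                "part-0-18",                  // finalized part file
                "_part-0-18.valid-length",    // valid-length marker
                "_part-0-19.pending",         // dangling pending file
                "_part-0-22.in-progress");    // file currently being written
        System.out.println(visibleFiles(bucket)); // prints [part-0-18]
    }
}
```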
>>>>
>>>>
>>>>
>>>> On Mon, Feb 19, 2018 at 11:03 AM, Aljoscha Krettek <aljoscha@apache.org>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Sorry for the confusion. The framework (Flink) currently does not do
>>>>> any cleanup of pending files, yes.
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>>
>>>>> On 19. Feb 2018, at 17:01, Vishal Santoshi <vishal.santoshi@gmail.com>
>>>>> wrote:
>>>>>
>>>>> >> You should only have these dangling pending files after a
>>>>> failure-recovery cycle, as you noticed. My suggestion would be to
>>>>> periodically clean up older pending files.
>>>>>
>>>>> A little confused. Is that something the framework should do, or something
>>>>> we should do as part of a cleanup job?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Feb 19, 2018 at 10:47 AM, Aljoscha Krettek <
>>>>> aljoscha@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> The BucketingSink does not clean up pending files on purpose. In a
>>>>>> distributed setting, and especially with rescaling of Flink operators, it
>>>>>> is sufficiently hard to figure out which of the pending files you actually
>>>>>> can delete and which of them you have to leave because they will get moved
>>>>>> to "final" as part of recovering from a checkpoint on some other parallel
>>>>>> instance of the sink.
>>>>>>
>>>>>> You should only have these dangling pending files after a
>>>>>> failure-recovery cycle, as you noticed. My suggestion would be to
>>>>>> periodically clean up older pending files.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
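A periodic cleanup of older pending files could look like the following. This is a hypothetical job, not part of Flink: it uses java.nio on a local directory as a stand-in for HDFS (on a cluster, the Hadoop FileSystem API would take its place) and assumes the "_" prefix and ".pending" suffix seen in this thread. The age threshold is also an assumption; it should exceed the age of the oldest checkpoint or savepoint you might still restore from, because a pending file referenced by such a snapshot is still needed on restore.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical cleanup job for one bucket directory; names and threshold are assumptions.
class PendingFileCleaner {

    static boolean isPending(Path p) {
        String name = p.getFileName().toString();
        return name.startsWith("_") && name.endsWith(".pending");
    }

    /** Collects pending files last modified before (now - maxAge); deletes them unless dryRun. */
    static List<Path> cleanUp(Path bucketDir, Duration maxAge, Instant now, boolean dryRun)
            throws IOException {
        Instant cutoff = now.minus(maxAge);
        List<Path> stale = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(bucketDir)) {
            for (Path p : files) {
                if (!Files.isRegularFile(p) || !isPending(p)) {
                    continue; // finalized, in-progress and valid-length files are never touched
                }
                FileTime modified = Files.getLastModifiedTime(p);
                if (modified.toInstant().isBefore(cutoff)) {
                    stale.add(p);
                }
            }
        }
        // Delete after the listing is closed instead of while iterating over it.
        if (!dryRun) {
            for (Path p : stale) {
                Files.deleteIfExists(p);
            }
        }
        return stale;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("bucket");
        Path old = Files.createFile(dir.resolve("_part-0-19.pending"));
        Files.setLastModifiedTime(old, FileTime.from(Instant.now().minus(Duration.ofDays(3))));
        Files.createFile(dir.resolve("_part-0-22.pending")); // recent, must survive
        List<Path> removed = cleanUp(dir, Duration.ofDays(1), Instant.now(), false);
        System.out.println("removed: " + removed);
    }
}
```

Running it first with `dryRun = true` and reviewing the returned list is a cheap safeguard before deleting anything.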
>>>>>>
>>>>>>
>>>>>> On 19. Feb 2018, at 16:37, Till Rohrmann <trohrmann@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> The pending files should indeed eventually get finalized. This
>>>>>> happens on a checkpoint-complete notification. Thus, what you report seems
>>>>>> not right. Maybe Aljoscha can shed a bit more light on the problem.
>>>>>>
>>>>>> In order to further debug the problem, it would be really helpful to
>>>>>> get access to DEBUG log files of a TM which runs the BucketingSink.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Fri, Feb 16, 2018 at 5:26 AM, Mu Kong <kong.mu.biz@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Vishal,
>>>>>>>
>>>>>>> I have the same concern about savepointing in BucketingSink.
>>>>>>> As for your question, I think that before the pending files get cleared
>>>>>>> in handleRestoredBucketState, they are finalized in notifyCheckpointComplete:
>>>>>>>
>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L628
>>>>>>>
>>>>>>> I'm looking into this part of the source code now, since we are
>>>>>>> experiencing some unclosed files after checkpointing.
>>>>>>> It would be great if you could share more if you find something new
>>>>>>> about your problem, which might help with ours.
>>>>>>>
>>>>>>> Best regards,
>>>>>>> Mu
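The checkpoint-driven finalization Mu points to can be modeled in memory. This is a simplified sketch with hypothetical names, assuming, as described in this thread, that pending files are associated with the checkpoint during which they were snapshotted and are finalized once that checkpoint (or any later one) completes. It also shows why a file that is closed after the last snapshot is not finalized by that snapshot.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model (not Flink code) of checkpoint-driven finalization of pending files.
class PendingTracker {
    private final List<String> pendingSinceLastSnapshot = new ArrayList<>();
    private final TreeMap<Long, List<String>> pendingPerCheckpoint = new TreeMap<>();
    private final List<String> finalized = new ArrayList<>();

    /** A part file was closed: it stays pending until a snapshot records it. */
    void closePartFile(String name) {
        pendingSinceLastSnapshot.add(name);
    }

    /** Snapshot: remember which pending files belong to this checkpoint id. */
    void snapshot(long checkpointId) {
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pendingSinceLastSnapshot));
        pendingSinceLastSnapshot.clear();
    }

    /** Checkpoint complete: finalize the files of this checkpoint and of all earlier ones. */
    void notifyComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                pendingPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            // Stands in for renaming "_x.pending" to "x"; removal also drops the map entry.
            finalized.addAll(it.next().getValue());
            it.remove();
        }
    }

    List<String> finalizedFiles() {
        return finalized;
    }

    int checkpointsAwaitingCompletion() {
        return pendingPerCheckpoint.size();
    }

    public static void main(String[] args) {
        PendingTracker t = new PendingTracker();
        t.closePartFile("part-0-20");
        t.snapshot(7);
        t.closePartFile("part-0-21"); // closed after checkpoint 7 was taken
        t.notifyComplete(7);
        System.out.println(t.finalizedFiles()); // prints [part-0-20]
    }
}
```

In the example, part-0-21 is closed after checkpoint 7 was taken, so completing checkpoint 7 leaves it pending, which resembles the dangling _part-0-21.pending reported further down the thread.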
>>>>>>>
>>>>>>> On Thu, Feb 15, 2018 at 11:34 AM, Vishal Santoshi <
>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>
>>>>>>>> -rw-r--r--   3 root hadoop         11 2018-02-14 18:48 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-18.valid-length
>>>>>>>> -rw-r--r--   3 root hadoop   54053518 2018-02-14 19:15 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-19.pending
>>>>>>>> -rw-r--r--   3 root hadoop         11 2018-02-14 21:17 /kraken/sessions_uuid_csid_v3/dt=2018-02-14/_part-0-20.valid-length
>>>>>>>>
>>>>>>>>
>>>>>>>> This is strange. We had a few retries because of an OOM on one of the
>>>>>>>> TMs, and we see this situation: 2 files (on either side) that were dealt
>>>>>>>> with fine, but a dangling .pending file. I am sure this is not what is meant
>>>>>>>> to be. I think we have an edge condition, and looking at the code it is
>>>>>>>> not obvious. Maybe someone who wrote the code can shed some light on
>>>>>>>> how this can happen.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Feb 9, 2018 at 2:01 PM, Vishal Santoshi <
>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Without --allowNonRestoredState, on a suspend/resume we do see
>>>>>>>>> the length file along with the finalized file (finalized during resume):
>>>>>>>>>
>>>>>>>>> -rw-r--r--   3 root hadoop         10 2018-02-09 13:57 /vishal/sessionscid/dt=2018-02-09/_part-0-28.valid-length
>>>>>>>>>
>>>>>>>>> That makes much more sense.
>>>>>>>>>
>>>>>>>>> I guess we should document --allowNonRestoredState better? It
>>>>>>>>> seems it actually drops state?
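A reader that ignores the .valid-length file may see bytes written after the last checkpoint. The sketch below reads the length and copies only the valid prefix of a part file. It assumes the length is stored as a decimal string written with DataOutputStream.writeUTF; that assumption is consistent with the 10- and 11-byte sizes in the listings (a 2-byte length header plus 8 or 9 digits), but it should be verified against the BucketingSink source for your Flink version. Class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper; ASSUMPTION: the valid length is stored via writeUTF(Long.toString(n)).
class ValidLength {

    /** Parses the valid length from a .valid-length stream (the caller closes the stream). */
    static long readValidLength(InputStream lengthFile) throws IOException {
        DataInputStream in = new DataInputStream(lengthFile);
        return Long.parseLong(in.readUTF());
    }

    /** Copies only the first validLength bytes of a part file; anything after that is dropped. */
    static void copyValidPrefix(InputStream part, OutputStream out, long validLength)
            throws IOException {
        byte[] buf = new byte[8192];
        long remaining = validLength;
        while (remaining > 0) {
            int n = part.read(buf, 0, (int) Math.min(buf.length, remaining));
            if (n < 0) {
                throw new IOException("part file is shorter than its valid length");
            }
            out.write(buf, 0, n);
            remaining -= n;
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream lengthBytes = new ByteArrayOutputStream();
        new DataOutputStream(lengthBytes).writeUTF("5");
        long valid = readValidLength(new ByteArrayInputStream(lengthBytes.toByteArray()));

        byte[] part = "hello-garbage".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream clean = new ByteArrayOutputStream();
        copyValidPrefix(new ByteArrayInputStream(part), clean, valid);
        System.out.println(new String(clean.toByteArray(), StandardCharsets.UTF_8)); // prints hello
    }
}
```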
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Feb 9, 2018 at 1:37 PM, Vishal Santoshi <
>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> This is 1.4 BTW. I am not sure that I am reading this correctly,
>>>>>>>>>> but the lifecycle of cancel/resume is 2 steps:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 1. Cancel job with savepoint (SP)
>>>>>>>>>>
>>>>>>>>>> closeCurrentPartFile
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L549
>>>>>>>>>>
>>>>>>>>>> is called from close()
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L416
>>>>>>>>>>
>>>>>>>>>> and that moves files to the pending state. That, I would presume, is
>>>>>>>>>> called when one does a cancel.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2. The restore on resume
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L369
>>>>>>>>>>
>>>>>>>>>> calls
>>>>>>>>>>
>>>>>>>>>> handleRestoredBucketState
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L704
>>>>>>>>>>
>>>>>>>>>> which clears the pending files from state without finalizing them?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> That does not seem to be right. I must be reading the code
>>>>>>>>>> totally wrong?
>>>>>>>>>>
>>>>>>>>>> I am also not sure whether --allowNonRestoredState skips
>>>>>>>>>> getting the state. At least
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html#savepoints
>>>>>>>>>> is not exactly clear about what it does if we add an operator (GDF, I think,
>>>>>>>>>> will add a new operator to the DAG without state even if it is stateful; in
>>>>>>>>>> my case the Map operator is not even stateful).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks, and please bear with me if this is all something pretty
>>>>>>>>>> simple.
>>>>>>>>>>
>>>>>>>>>> Vishal
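The two steps above amount to a small state machine over file names. This is a simplified, hypothetical model: the "_" prefix and the ".in-progress"/".pending" suffixes are taken from the file names in this thread, and it leaves out the truncation and valid-length handling that happens on restore.

```java
// Hypothetical model of the part-file lifecycle discussed in this thread (not Flink code).
class PartFileLifecycle {

    enum State { IN_PROGRESS, PENDING, FINISHED }

    /** Name a part file carries in each state. */
    static String fileName(String base, State state) {
        switch (state) {
            case IN_PROGRESS:
                return "_" + base + ".in-progress";
            case PENDING:
                return "_" + base + ".pending";
            default:
                return base; // FINISHED: the final, visible name
        }
    }

    /**
     * IN_PROGRESS -> PENDING when the part file is closed (e.g. from close() on cancel);
     * PENDING -> FINISHED when a checkpoint covering it completes, or when a restore
     * finds it in the restored state.
     */
    static State next(State current) {
        switch (current) {
            case IN_PROGRESS:
                return State.PENDING;
            case PENDING:
                return State.FINISHED;
            default:
                throw new IllegalStateException("finished files do not change state");
        }
    }

    public static void main(String[] args) {
        State s = State.IN_PROGRESS;
        while (true) {
            System.out.println(fileName("part-0-21", s));
            if (s == State.FINISHED) {
                break;
            }
            s = next(s);
        }
        // prints _part-0-21.in-progress, _part-0-21.pending, part-0-21 (one per line)
    }
}
```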
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Feb 9, 2018 at 11:54 AM, Vishal Santoshi <
>>>>>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> What should be the behavior of BucketingSink vis-à-vis state (pending,
>>>>>>>>>>> in-progress, and finalization) when we suspend and resume?
>>>>>>>>>>>
>>>>>>>>>>> So I did this:
>>>>>>>>>>>
>>>>>>>>>>> * I had a pipeline writing to HDFS, which I suspended and resumed using
>>>>>>>>>>> --allowNonRestoredState, because I had added a harmless (stateless)
>>>>>>>>>>> MapOperator.
>>>>>>>>>>>
>>>>>>>>>>> * I see that the file on HDFS that was being written to (before the
>>>>>>>>>>> cancel with savepoint) goes into a pending state:
>>>>>>>>>>> _part-0-21.pending
>>>>>>>>>>>
>>>>>>>>>>> * I see a new file being written to in the resumed pipeline:
>>>>>>>>>>> _part-0-22.in-progress
>>>>>>>>>>>
>>>>>>>>>>> What I do not see is _part-0-21.pending being finalized (as in renamed
>>>>>>>>>>> to just part-0-21). I would have assumed that would be the case in this
>>>>>>>>>>> controlled suspend/resume circumstance. Furthermore, it is a rename, and
>>>>>>>>>>> an HDFS mv is not an expensive operation.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Am I understanding the process correctly, and if so, any pointers?
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>>
>>>>>>>>>>> Vishal
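Finalizing a pending file by hand is indeed just a rename. The sketch below does it on a local file system with java.nio (on HDFS, the Hadoop FileSystem rename would be the counterpart); the naming convention is assumed from the file names in this thread, and the helper names are hypothetical. Following Fabian's explanation at the top of the thread, finalizing a pending file that is not part of the restored savepoint keeps records that are also replayed, so the output becomes at-least-once.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: "_part-0-21.pending" -> "part-0-21" via a single rename.
class FinalizePending {

    static String finalName(String pendingName) {
        if (!pendingName.startsWith("_") || !pendingName.endsWith(".pending")) {
            throw new IllegalArgumentException("not a pending file: " + pendingName);
        }
        return pendingName.substring(1, pendingName.length() - ".pending".length());
    }

    static Path finalizeFile(Path pending) throws IOException {
        Path target = pending.resolveSibling(finalName(pending.getFileName().toString()));
        // ATOMIC_MOVE: the file is never visible under both names. If the target already
        // exists, whether it is replaced or an exception is thrown is implementation-specific.
        return Files.move(pending, target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("bucket");
        Path pending = Files.createFile(dir.resolve("_part-0-21.pending"));
        System.out.println(finalizeFile(pending).getFileName()); // prints part-0-21
    }
}
```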
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
