flink-user mailing list archives

From Kostas Kloudas <k.klou...@ververica.com>
Subject Re: fllink 1.7.1 and RollingFileSink
Date Thu, 14 Feb 2019 12:50:35 GMT
Hi Vishal,

Regarding the StreamingFileSink vs. the Rolling/BucketingSink:
 - yes, you can use the StreamingFileSink instead of the Rolling/BucketingSink.
You can think of the StreamingFileSink as an evolution of the previous two.

In the StreamingFileSink, files in the Pending state are not renamed; they
keep their "in-progress" name. This is why you no longer see .pending files.
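
To make the file states concrete, here is a minimal plain-Java sketch of the
lifecycle described above. It is only a model, not Flink code: the class and
method names are hypothetical, the suffix is a shortened stand-in for the one
in your listing, and the final name "part-4-45" is my assumption derived from
the in-progress name you posted.

```java
/** Plain-Java model of the part-file lifecycle; this is not Flink code. */
final class PartFileLifecycleSketch {

    enum State { IN_PROGRESS, PENDING, FINISHED }

    /** Hypothetical stand-in for one part file written by a sink subtask. */
    static final class PartFile {
        private final String finalName;
        private final String inProgressName;
        private State state = State.IN_PROGRESS;

        PartFile(int subtaskIndex, long partCounter, String suffix) {
            this.finalName = "part-" + subtaskIndex + "-" + partCounter;
            // Hidden name used for as long as the file is not committed.
            this.inProgressName = "." + finalName + ".inprogress." + suffix;
        }

        /** The rolling policy closed the file: it becomes pending, with no rename. */
        void roll() {
            if (state == State.IN_PROGRESS) {
                state = State.PENDING;
            }
        }

        /** A checkpoint completed: pending files are committed under the final name. */
        void onCheckpointComplete() {
            if (state == State.PENDING) {
                state = State.FINISHED;
            }
        }

        State state() {
            return state;
        }

        /** Name visible on the file system in the current state. */
        String visibleName() {
            return state == State.FINISHED ? finalName : inProgressName;
        }
    }

    public static void main(String[] args) {
        PartFile file = new PartFile(4, 45, "3ed08b67");
        System.out.println(file.state() + " " + file.visibleName());
        file.roll();
        System.out.println(file.state() + " " + file.visibleName());
        file.onCheckpointComplete();
        System.out.println(file.state() + " " + file.visibleName());
    }
}
```

Note that the name only changes on the last transition, so a pending file is
indistinguishable from an in-progress one when you list the directory.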

What Timothy said about bulk formats is correct: they only support the
"onCheckpoint" rolling policy.

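As an illustration of the three decision points of a rolling policy, here is
a minimal plain-Java sketch. It does not use the Flink classes: PartFile,
RollingPolicySketch and the bulkFormat flag are hypothetical names, and the
thresholds are the ones from the custom policy you posted (1 GiB, 10 minutes
of inactivity, 120 minutes since creation).

```java
import java.util.concurrent.TimeUnit;

/** Plain-Java model of a rolling policy's three decision points; not Flink code. */
final class RollingPolicySketch {

    // Thresholds copied from the custom policy quoted in this thread.
    static final long MAX_PART_SIZE_BYTES = 1024L * 1024L * 1024L;
    static final long INACTIVITY_MS = TimeUnit.MINUTES.toMillis(10);
    static final long ROLLOVER_MS = TimeUnit.MINUTES.toMillis(120);

    /** Hypothetical stand-in for the part-file metadata a policy gets to inspect. */
    static final class PartFile {
        final long sizeBytes;
        final long creationTimeMs;
        final long lastUpdateTimeMs;

        PartFile(long sizeBytes, long creationTimeMs, long lastUpdateTimeMs) {
            this.sizeBytes = sizeBytes;
            this.creationTimeMs = creationTimeMs;
            this.lastUpdateTimeMs = lastUpdateTimeMs;
        }
    }

    /** Checked at checkpoint time. Row formats may decline; bulk formats roll here. */
    static boolean shouldRollOnCheckpoint(PartFile file, boolean bulkFormat) {
        return bulkFormat;
    }

    /** Checked per incoming record: roll once the part file exceeds the size limit. */
    static boolean shouldRollOnEvent(PartFile file) {
        return file.sizeBytes > MAX_PART_SIZE_BYTES;
    }

    /** Checked periodically: roll idle files and files that have been open too long. */
    static boolean shouldRollOnProcessingTime(PartFile file, long nowMs) {
        return nowMs - file.lastUpdateTimeMs > INACTIVITY_MS
                || nowMs - file.creationTimeMs > ROLLOVER_MS;
    }

    public static void main(String[] args) {
        long now = TimeUnit.HOURS.toMillis(3);
        PartFile idle = new PartFile(10L,
                now - TimeUnit.MINUTES.toMillis(30), now - TimeUnit.MINUTES.toMillis(11));
        PartFile active = new PartFile(10L,
                now - TimeUnit.MINUTES.toMillis(30), now - TimeUnit.MINUTES.toMillis(1));
        System.out.println("idle file rolls:   " + shouldRollOnProcessingTime(idle, now));
        System.out.println("active file rolls: " + shouldRollOnProcessingTime(active, now));
    }
}
```

In this model a row-format job is free to combine all three checks, while a
bulk-format job only gets the checkpoint one.
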
As for the second issue, about deployment, I would recommend opening a new
thread so that people can tell from the title whether they can help.
In addition, it helps the community when the title reflects the content of
the topic. The mailing list is indexed by search engines, so if someone has
a similar question, the title will help them find the relevant thread.

Cheers,
Kostas


On Thu, Feb 14, 2019 at 12:09 PM Vishal Santoshi <vishal.santoshi@gmail.com>
wrote:

> Thanks Fabian,
>
> A few more questions.
>
> 1. On a k8s standalone job I had
> env.getCheckpointConfig().setFailOnCheckpointingErrors(true) // the
> default. The job failed on a checkpoint, and I would have imagined that under HA
> the job would restore from the last checkpoint, but it did not. (The UI
> showed the job had restarted without a restore. The state was wiped out
> and the job was relaunched with no state.)
>
> 2. I had the in-progress files from that failed instance, which is
> consistent with no restored state.
>
> Thus there are a few questions:
>
> 1. On k8s with a standalone job cluster, have we tested the scenario of
> the container failing (the pod remained intact) followed by a restore? In
> this case the pod remained up and running, but it was definitely a clean
> relaunch of the container the pod was executing.
>
>
> 2. Was any configuration missing, given the below?
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(30 * 60000);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.getCheckpointConfig().setFailOnCheckpointingErrors(false);
> env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
> StateBackend stateBackEnd = new FsStateBackend(
>         new org.apache.flink.core.fs.Path(
>                 "........"));
> env.setStateBackend(stateBackEnd);
>
>
> 3. What is the nature of RollingFileSink? How does it enable exactly-once
> semantics (or does it not)?
>
> Any help will be appreciated.
>
> Regards.
>
> On Mon, Feb 11, 2019 at 5:00 AM Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Hi Vishal,
>>
>> Kostas (in CC) should be able to help here.
>>
>> Best, Fabian
>>
>> On Mon, Feb 11, 2019 at 00:05, Vishal Santoshi <
>> vishal.santoshi@gmail.com> wrote:
>>
>>> Anyone?
>>>
>>> On Sun, Feb 10, 2019 at 2:07 PM Vishal Santoshi <
>>> vishal.santoshi@gmail.com> wrote:
>>>
>>>> You don't have to. Thank you for the input.
>>>>
>>>> On Sun, Feb 10, 2019 at 1:56 PM Timothy Victor <victtim@gmail.com>
>>>> wrote:
>>>>
>>>>> My apologies for not seeing your use case properly. The constraint
>>>>> on rolling policy is only applicable to bulk formats such as Parquet,
>>>>> as highlighted in the docs.
>>>>>
>>>>> As for your questions, I'll have to defer to others more familiar with
>>>>> it. I mostly just use bulk formats such as Avro and Parquet.
>>>>>
>>>>> Tim
>>>>>
>>>>>
>>>>> On Sun, Feb 10, 2019, 12:40 PM Vishal Santoshi <
>>>>> vishal.santoshi@gmail.com wrote:
>>>>>
>>>>>> That said, in the DefaultRollingPolicy it seems the check is on
>>>>>> the file size (mimicking the check in shouldRollOnEvent()).
>>>>>>
>>>>>> I guess the questions are:
>>>>>>
>>>>>> Is the call to shouldRollOnCheckpoint done by the checkpointing
>>>>>> thread?
>>>>>>
>>>>>> Are the calls to the other two methods, shouldRollOnEvent and
>>>>>> shouldRollOnProcessingTime, done on the execution thread, as in inlined?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sun, Feb 10, 2019 at 1:17 PM Vishal Santoshi <
>>>>>> vishal.santoshi@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks for the quick reply.
>>>>>>>
>>>>>>> I am confused. If this were a more fully featured BucketingSink, I
>>>>>>> would imagine that, based on shouldRollOnEvent and
>>>>>>> shouldRollOnProcessingTime, an in-progress file could go into the
>>>>>>> pending phase, and on checkpoint the pending part file would be
>>>>>>> finalized. For exactly-once, any in-progress file would have its
>>>>>>> length snapshotted to the checkpoint. If a resume from a checkpoint
>>>>>>> were to happen, that length would be used to truncate the file (if
>>>>>>> truncate is supported) or dropped as a part-length file (if truncate
>>>>>>> is not supported), to indicate what part of the finalized file
>>>>>>> (finalized when resumed) is valid. I had always assumed (and there
>>>>>>> is no doc stating otherwise) that shouldRollOnCheckpoint would be
>>>>>>> similar to the other two, apart from the fact that it does the roll
>>>>>>> and finalize in a single step on a checkpoint.
>>>>>>>
>>>>>>>
>>>>>>> Am I better off using BucketingSink? When to use BucketingSink and
>>>>>>> when to use RollingSink is not clear at all, even though on the
>>>>>>> surface it sure looks like RollingSink is a better version of
>>>>>>> BucketingSink (or not).
>>>>>>>
>>>>>>> Regards.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <victtim@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I think the only rolling policy that can be used is
>>>>>>>> CheckpointRollingPolicy to ensure exactly once.
>>>>>>>>
>>>>>>>> Tim
>>>>>>>>
>>>>>>>> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <
>>>>>>>> vishal.santoshi@gmail.com wrote:
>>>>>>>>
>>>>>>>>> Can StreamingFileSink be used instead of https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html?
>>>>>>>>> It looks like it could.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This code for example
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>         StreamingFileSink
>>>>>>>>>                 .forRowFormat(new Path(PATH),
>>>>>>>>>                         new SimpleStringEncoder<KafkaRecord>())
>>>>>>>>>                 .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>>>>>>>>>                 .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>>>>>>>>>                     @Override
>>>>>>>>>                     public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
>>>>>>>>>                         return false;
>>>>>>>>>                     }
>>>>>>>>>
>>>>>>>>>                     @Override
>>>>>>>>>                     public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
>>>>>>>>>                                                      KafkaRecord element) throws IOException {
>>>>>>>>>                         return partFileState.getSize() > 1024 * 1024 * 1024L;
>>>>>>>>>                     }
>>>>>>>>>
>>>>>>>>>                     @Override
>>>>>>>>>                     public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) throws IOException {
>>>>>>>>>                         return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
>>>>>>>>>                                 || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>>>>>>>>>                     }
>>>>>>>>>                 })
>>>>>>>>>                 .build();
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> A few things I see and am not sure I follow about the new
>>>>>>>>> RollingFileSink vis-a-vis BucketingSink:
>>>>>>>>>
>>>>>>>>> 1. I never see the in-progress file go to the pending state, as in
>>>>>>>>>    renamed as pending, as was the case in BucketingSink. I would
>>>>>>>>>    assume that it would be pending and then finalized on checkpoint
>>>>>>>>>    for exactly-once semantics?
>>>>>>>>>
>>>>>>>>> 2. I see dangling in-progress files at the end of the day. I would
>>>>>>>>>    assume that, with withBucketCheckInterval set to 1 minute by
>>>>>>>>>    default, shouldRollOnProcessingTime should kick in?
>>>>>>>>>
>>>>>>>>> 3. The in-progress files are named like
>>>>>>>>>    .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14.
>>>>>>>>>    What is that additional suffix?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> I have the following set up on the env
>>>>>>>>>
>>>>>>>>> env.enableCheckpointing(10 * 60000);
>>>>>>>>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>>>>>>>> env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
>>>>>>>>> StateBackend stateBackEnd = new MemoryStateBackend();
>>>>>>>>> env.setStateBackend(stateBackEnd);
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Regards.
>>>>>>>>>

-- 

Kostas Kloudas | Software Engineer
