flink-user mailing list archives

From Vishal Santoshi <vishal.santo...@gmail.com>
Subject Re: flink 1.7.1 and RollingFileSink
Date Sun, 10 Feb 2019 18:17:26 GMT
Thanks for the quick reply.

I am confused. If this were a more full-featured BucketingSink, I would
imagine that, based on shouldRollOnEvent and shouldRollOnProcessingTime, an
in-progress file could move to the pending phase, and on checkpoint the
pending part file would be finalized. For exactly-once, any in-progress file
would have its length snapshotted into the checkpoint. On a resume from that
checkpoint, the length would be used to truncate the file (if truncate is
supported) or recorded in a valid-length file (if it is not), to indicate
which part of the file (finalized on resume) is valid. I had always assumed
(and no documentation says otherwise) that shouldRollOnCheckpoint would behave
like the other two, except that it rolls and finalizes in a single step on a
checkpoint.
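The BucketingSink-style lifecycle assumed in the paragraph above (in-progress, then pending after a roll, then finished once a checkpoint completes, with the in-progress length recorded so it can be truncated on restore) can be modeled in plain Java. This is a hypothetical sketch with made-up names (PartFile, BucketModel), not Flink's implementation; in particular, restore is reduced to truncating the in-progress file to its recorded length.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the part-file lifecycle described above; not Flink code.
enum PartState { IN_PROGRESS, PENDING, FINISHED }

class PartFile {
    final String name;
    PartState state = PartState.IN_PROGRESS;
    long length;       // bytes written so far
    long validLength;  // length recorded at the last checkpoint

    PartFile(String name) { this.name = name; }
}

class BucketModel {
    private final boolean rollOnCheckpoint;
    private int partCounter = 0;
    private PartFile inProgress;
    private final List<PartFile> pending = new ArrayList<>();
    // Pending files tied to the checkpoint that snapshotted them.
    private final TreeMap<Long, List<PartFile>> pendingPerCheckpoint = new TreeMap<>();
    final List<PartFile> finished = new ArrayList<>();

    BucketModel(boolean rollOnCheckpoint) { this.rollOnCheckpoint = rollOnCheckpoint; }

    void write(long bytes) {
        if (inProgress == null) {
            inProgress = new PartFile("part-0-" + partCounter++);
        }
        inProgress.length += bytes;
    }

    // Size- or time-based roll: the in-progress file becomes pending.
    void roll() {
        if (inProgress != null) {
            inProgress.state = PartState.PENDING;
            pending.add(inProgress);
            inProgress = null;
        }
    }

    // On checkpoint: optionally roll, record the valid length of any remaining
    // in-progress file, and tie the current pending files to this checkpoint.
    void snapshot(long checkpointId) {
        if (rollOnCheckpoint) {
            roll();
        }
        if (inProgress != null) {
            inProgress.validLength = inProgress.length;
        }
        pendingPerCheckpoint.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    // Files pending for this checkpoint (or an earlier one) are finalized.
    void checkpointComplete(long checkpointId) {
        Map<Long, List<PartFile>> done = pendingPerCheckpoint.headMap(checkpointId, true);
        for (List<PartFile> files : done.values()) {
            for (PartFile f : files) {
                f.state = PartState.FINISHED;
                finished.add(f);
            }
        }
        done.clear(); // removes the entries from the backing map
    }

    // Simplified restore: drop bytes written after the last checkpoint
    // (truncate, or a valid-length marker where truncate is unsupported).
    void restore() {
        if (inProgress != null) {
            inProgress.length = inProgress.validLength;
        }
    }

    PartFile inProgress() { return inProgress; }
}
```

With rollOnCheckpoint set to true, snapshot rolls first, so a single completed checkpoint both rolls and finalizes the file, which matches the author's expectation of shouldRollOnCheckpoint.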


Am I better off using BucketingSink? When to use BucketingSink and when to
use StreamingFileSink is not clear at all, even though on the surface
StreamingFileSink sure looks like a better version of BucketingSink (or not).

Regards.



On Sun, Feb 10, 2019 at 12:09 PM Timothy Victor <victtim@gmail.com> wrote:

> I think the only rolling policy that can be used is
> CheckpointRollingPolicy to ensure exactly once.
>
> Tim
>
> On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <vishal.santoshi@gmail.com> wrote:
>
>> Can StreamingFileSink be used instead of BucketingSink
>> (https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html)?
>> It looks like it could.
>>
>>
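>> Take this code, for example:
>>
>>
>>         import java.io.IOException;
>>         import org.apache.flink.api.common.serialization.SimpleStringEncoder;
>>         import org.apache.flink.core.fs.Path;
>>         import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
>>         import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
>>         import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
>>
>>         StreamingFileSink
>>                 .forRowFormat(new Path(PATH),
>>                         new SimpleStringEncoder<KafkaRecord>())
>>                 .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>>                 .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>>                     @Override
>>                     public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
>>                         // never roll just because a checkpoint is taken
>>                         return false;
>>                     }
>>
>>                     @Override
>>                     public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
>>                                                      KafkaRecord element) throws IOException {
>>                         // roll once the part file exceeds 1 GiB
>>                         return partFileState.getSize() > 1024L * 1024 * 1024;
>>                     }
>>
>>                     @Override
>>                     public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState,
>>                                                               long currentTime) throws IOException {
>>                         // roll after 10 minutes without writes, or 2 hours after creation
>>                         return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
>>                                 || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>>                     }
>>                 })
>>                 .build();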
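The thresholds in the rolling policy above can be checked without Flink. This is a plain-Java model: PartInfo is a hypothetical stand-in for PartFileInfo, and firstRollCheck assumes the processing-time rule is evaluated every checkIntervalMs starting at time 0, which is only a rough approximation of withBucketCheckInterval.

```java
// Hypothetical stand-in for Flink's PartFileInfo: only the values the policy reads.
class PartInfo {
    final long size;
    final long creationTime;
    final long lastUpdateTime;

    PartInfo(long size, long creationTime, long lastUpdateTime) {
        this.size = size;
        this.creationTime = creationTime;
        this.lastUpdateTime = lastUpdateTime;
    }
}

// Plain-Java copy of the thresholds used in the RollingPolicy above.
class SizeOrTimePolicy {
    static final long MAX_PART_SIZE = 1024L * 1024L * 1024L; // 1 GiB
    static final long MAX_INACTIVITY_MS = 10L * 60L * 1000L; // 10 minutes
    static final long MAX_AGE_MS = 120L * 60L * 1000L;       // 2 hours

    boolean shouldRollOnCheckpoint(PartInfo part) {
        return false;
    }

    boolean shouldRollOnEvent(PartInfo part) {
        return part.size > MAX_PART_SIZE;
    }

    boolean shouldRollOnProcessingTime(PartInfo part, long currentTime) {
        return currentTime - part.lastUpdateTime > MAX_INACTIVITY_MS
                || currentTime - part.creationTime > MAX_AGE_MS;
    }

    // Simulates a timer that evaluates the processing-time rule every
    // checkIntervalMs starting at time 0 and returns the first time it fires.
    // Assumes the part file receives no further writes.
    static long firstRollCheck(SizeOrTimePolicy policy, PartInfo part, long checkIntervalMs) {
        long t = 0;
        while (!policy.shouldRollOnProcessingTime(part, t)) {
            t += checkIntervalMs;
        }
        return t;
    }
}
```

For a part file created and last written at time 0 with a 60-second check interval, the inactivity rule first fires at the 660,000 ms check rather than at 600,000 ms, because the comparison is strictly greater than 10 minutes.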
>>
>>
>> A few things I see in the new StreamingFileSink, compared with BucketingSink,
>> that I am not sure I follow:
>>
>>
>> 1. I never see the in-progress file go to the pending state (i.e. renamed as
>>    pending), as was the case in BucketingSink. I would assume it would become
>>    pending and then be finalized on checkpoint, for exactly-once semantics?
>>
>>
>> 2. I see dangling in-progress files at the end of the day. Since
>>    withBucketCheckInterval is set to 1 minute by default, I would assume
>>    shouldRollOnProcessingTime should kick in?
>>
>> 3. The in-progress files are named like
>>    .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14. What is that
>>    additional suffix?
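The in-progress name in point 3 appears to follow the pattern .part-<subtask>-<counter>.inprogress.<uuid>. As far as I can tell from the 1.7 sources, the Hadoop-based recoverable writer appends a random UUID to keep the hidden staging file unique, and the file is renamed to part-<subtask>-<counter> when it is committed; treat that as an assumption to verify. The hypothetical helper below only splits such a name into its parts, using a pattern inferred from the example above.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: splits ".part-<subtask>-<counter>.inprogress.<uuid>"
// into its components. The pattern is inferred from the file name quoted in
// this thread, not taken from Flink documentation.
class InProgressName {
    private static final Pattern PATTERN = Pattern.compile(
            "^\\.(part-(\\d+)-(\\d+))\\.inprogress\\.([0-9a-fA-F-]{36})$");

    final String finalName;  // visible name the file is assumed to get on commit
    final int subtaskIndex;  // assumed: index of the sink subtask that wrote it
    final long partCounter;  // assumed: per-subtask part file counter
    final String uuid;       // random suffix of the hidden staging file

    private InProgressName(String finalName, int subtaskIndex, long partCounter, String uuid) {
        this.finalName = finalName;
        this.subtaskIndex = subtaskIndex;
        this.partCounter = partCounter;
        this.uuid = uuid;
    }

    static InProgressName parse(String fileName) {
        Matcher m = PATTERN.matcher(fileName);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected in-progress file name: " + fileName);
        }
        return new InProgressName(m.group(1), Integer.parseInt(m.group(2)),
                Long.parseLong(m.group(3)), m.group(4));
    }
}
```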
>>
>>
>>
>>
>> I have the following setup on the env:
>>
>> env.enableCheckpointing(10 * 60000);
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
>> StateBackend stateBackEnd = new MemoryStateBackend();
>> env.setStateBackend(stateBackEnd);
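With env.enableCheckpointing(10 * 60000), checkpoints are 10 minutes apart, and a rolled file is only finalized after a later checkpoint completes. A rough estimate of when that happens, assuming checkpoints trigger exactly every interval from job start and complete instantly (real checkpoints take time and can fail); CommitTiming is a hypothetical name:

```java
// Rough model: checkpoints trigger at k * intervalMs (k = 1, 2, ...) and
// complete instantly. A part file rolled at rollTimeMs is finalized by the
// first checkpoint that triggers strictly after the roll.
class CommitTiming {
    static long finalizeTime(long rollTimeMs, long intervalMs) {
        if (rollTimeMs < 0 || intervalMs <= 0) {
            throw new IllegalArgumentException("rollTimeMs must be >= 0 and intervalMs > 0");
        }
        return (rollTimeMs / intervalMs + 1) * intervalMs;
    }

    // How long the rolled file waits, still unfinalized, under this model.
    static long pendingDelay(long rollTimeMs, long intervalMs) {
        return finalizeTime(rollTimeMs, intervalMs) - rollTimeMs;
    }
}
```

For example, a file rolled 11 minutes (660,000 ms) into the job would be finalized at the 20-minute checkpoint under these assumptions, so it stays unfinalized for about 9 minutes.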
>>
>>
>> Regards.
>>
>>
>>
>>
>>
