flink-user mailing list archives

From Timothy Victor <vict...@gmail.com>
Subject Re: fllink 1.7.1 and RollingFileSink
Date Sun, 10 Feb 2019 17:09:15 GMT
I think the only rolling policy that can be used to ensure exactly-once
is CheckpointRollingPolicy.
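
As a rough illustration of the difference, here is a minimal standalone sketch in plain Java. It does not use the Flink API: `RollingPolicySketch`, `PartFile`, `Policy`, `QUOTED` and `ON_CHECKPOINT_ONLY` are hypothetical names invented for this example. It models the three predicates from the quoted policy next to a policy that rolls only on checkpoints.

```java
// Standalone model of the rolling decisions discussed in this thread.
// None of these types are Flink classes; every name here is hypothetical.
final class RollingPolicySketch {

    /** Minimal stand-in for the part-file metadata a policy inspects. */
    static final class PartFile {
        final long sizeBytes;
        final long creationTime;   // epoch millis
        final long lastUpdateTime; // epoch millis

        PartFile(long sizeBytes, long creationTime, long lastUpdateTime) {
            this.sizeBytes = sizeBytes;
            this.creationTime = creationTime;
            this.lastUpdateTime = lastUpdateTime;
        }
    }

    /** The three decision points, named after the methods in the quoted code. */
    interface Policy {
        boolean shouldRollOnCheckpoint(PartFile file);
        boolean shouldRollOnEvent(PartFile file);
        boolean shouldRollOnProcessingTime(PartFile file, long currentTime);
    }

    // Thresholds copied from the quoted policy.
    static final long MAX_SIZE_BYTES = 1024L * 1024L * 1024L; // 1 GiB
    static final long MAX_IDLE_MILLIS = 10L * 60L * 1000L;    // 10 minutes
    static final long MAX_AGE_MILLIS = 120L * 60L * 1000L;    // 120 minutes

    /** Same predicates as the quoted policy: it never rolls on checkpoint. */
    static final Policy QUOTED = new Policy() {
        @Override
        public boolean shouldRollOnCheckpoint(PartFile file) {
            return false;
        }

        @Override
        public boolean shouldRollOnEvent(PartFile file) {
            return file.sizeBytes > MAX_SIZE_BYTES;
        }

        @Override
        public boolean shouldRollOnProcessingTime(PartFile file, long currentTime) {
            return currentTime - file.lastUpdateTime > MAX_IDLE_MILLIS
                    || currentTime - file.creationTime > MAX_AGE_MILLIS;
        }
    };

    /** Rolls only when a checkpoint is taken, never on size or time. */
    static final Policy ON_CHECKPOINT_ONLY = new Policy() {
        @Override
        public boolean shouldRollOnCheckpoint(PartFile file) {
            return true;
        }

        @Override
        public boolean shouldRollOnEvent(PartFile file) {
            return false;
        }

        @Override
        public boolean shouldRollOnProcessingTime(PartFile file, long currentTime) {
            return false;
        }
    };

    public static void main(String[] args) {
        // A small file created and last written at time 0.
        PartFile idle = new PartFile(10L, 0L, 0L);
        long elevenMinutes = 11L * 60L * 1000L;

        System.out.println(QUOTED.shouldRollOnCheckpoint(idle));                    // false
        System.out.println(QUOTED.shouldRollOnProcessingTime(idle, elevenMinutes)); // true
        System.out.println(ON_CHECKPOINT_ONLY.shouldRollOnCheckpoint(idle));        // true
    }
}
```

In this model the quoted policy closes a part file only on size, inactivity, or age, while the second policy closes one at every checkpoint. Whether 1.7.1 actually requires the latter for exactly-once is the part I am not sure about.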

Tim

On Sun, Feb 10, 2019, 9:13 AM Vishal Santoshi <vishal.santoshi@gmail.com> wrote:

> Can StreamingFileSink be used instead of the sink described at
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/connectors/filesystem_sink.html?
> It looks like it could.
>
>
> This code for example
>
>
>         StreamingFileSink
>                 .forRowFormat(new Path(PATH),
>                         new SimpleStringEncoder<KafkaRecord>())
>                 .withBucketAssigner(new KafkaRecordBucketAssigner(DEFAULT_FORMAT_STRING, ZONE_ID))
>                 .withRollingPolicy(new RollingPolicy<KafkaRecord, String>() {
>                     @Override
>                     public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
>                         return false;
>                     }
>
>                     @Override
>                     public boolean shouldRollOnEvent(PartFileInfo<String> partFileState,
>                                                      KafkaRecord element) throws IOException {
>                         return partFileState.getSize() > 1024 * 1024 * 1024L;
>                     }
>
>                     @Override
>                     public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState,
>                                                               long currentTime) throws IOException {
>                         return currentTime - partFileState.getLastUpdateTime() > 10 * 60 * 1000L
>                                 || currentTime - partFileState.getCreationTime() > 120 * 60 * 1000L;
>                     }
>                 })
>                 .build();
>
>
> There are a few things I see and am not sure I follow about the new StreamingFileSink vis-a-vis BucketingSink:
>
>
> 1. I never see the in-progress file go to the pending state (that is, get renamed as pending),
>    as was the case with BucketingSink. I would assume that it would become pending and then be
>    finalized on checkpoint for exactly-once semantics?
>
>
> 2. I see dangling in-progress files at the end of the day. I would assume that, with
>    withBucketCheckInterval set to 1 minute by default, shouldRollOnProcessingTime should kick in?
>
> 3. The in-progress files are named like .part-4-45.inprogress.3ed08b67-2b56-4d31-a233-b1c146cfcf14.
>    What is that additional suffix?
>
>
>
>
> I have the following setup on the env:
>
> env.enableCheckpointing(10 * 60000);
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> env.setRestartStrategy(fixedDelayRestart(4, org.apache.flink.api.common.time.Time.minutes(1)));
> StateBackend stateBackEnd = new MemoryStateBackend();
> env.setStateBackend(stateBackEnd);
>
>
> Regards.
>
>
>
>
>
