flink-user mailing list archives

From Roshan Punnoose <rosh...@gmail.com>
Subject Re: Parquet S3 Sink Part files are not rolling over with checkpoint
Date Thu, 09 Apr 2020 12:50:16 GMT
Btw, I ran the exact same code on a local Flink cluster started with
`./bin/start-cluster.sh` on my local machine. With `s3a` it did not work:
the part files do not roll over. With the local filesystem, however, it
works perfectly. Should I be looking at the S3Committer in Flink to see if
there is something odd going on?
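
For concreteness, that comparison boils down to swapping only the sink path; a minimal sketch (the bucket and local paths are placeholders, and `schema` refers to the Avro schema from the code quoted further below):

```
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

// Same job; only the scheme of the sink path differs.
// s3a://  -> part-0-0.parquet is rewritten on every checkpoint (the broken case)
// file:// -> the part counter increments on every checkpoint (the working case)
Path basePath = new Path("s3a://<bucket>/bro_conn/");
// Path basePath = new Path("file:///tmp/bro_conn/");

StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(basePath, ParquetAvroWriters.forGenericRecord(schema))
        .build();
```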

On Thu, Apr 9, 2020 at 7:49 AM Roshan Punnoose <roshanp@gmail.com> wrote:

> Nope, just the s3a. I'll keep looking around to see if there is anything
> else I can see. If you think of anything else to try, let me know.
>
> On Thu, Apr 9, 2020, 7:41 AM Kostas Kloudas <kkloudas@gmail.com> wrote:
>
>> It should not be a problem, because from what you posted you are using
>> "s3a" as the scheme for S3.
>> Are you using "s3p" for Presto? This is also needed so that Flink knows
>> where to use the one or the other.
>>
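
(As a sketch of the scheme mapping described above: in Flink 1.10 the flink-s3-fs-hadoop plugin registers `s3a://` and flink-s3-fs-presto registers `s3p://`, and both also claim plain `s3://`, so spelling out the scheme selects the implementation explicitly. The bucket name is a placeholder.)

```
// The URI scheme picks the filesystem plugin when both are installed:
new Path("s3a://<bucket>/bro_conn/"); // handled by flink-s3-fs-hadoop
new Path("s3p://<bucket>/bro_conn/"); // handled by flink-s3-fs-presto
// Plain "s3://" is ambiguous here, since both plugins register it.
```
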
>> On Thu, Apr 9, 2020 at 1:30 PM Roshan Punnoose <roshanp@gmail.com> wrote:
>> >
>> > Lastly, could it be the way I built the Flink image for Kubernetes? I
>> > added both the Presto and Hadoop plugins.
>> >
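
(For reference, one plausible way such an image is assembled; in Flink 1.10 each filesystem plugin must sit in its own subdirectory under plugins/ so that it is loaded with an isolated classloader. The base image tag and jar names below are assumptions.)

```
FROM flink:1.10.0-scala_2.11

# Each plugin gets its own folder under plugins/; the jars ship in opt/.
RUN mkdir -p /opt/flink/plugins/s3-fs-presto /opt/flink/plugins/s3-fs-hadoop && \
    cp /opt/flink/opt/flink-s3-fs-presto-1.10.0.jar /opt/flink/plugins/s3-fs-presto/ && \
    cp /opt/flink/opt/flink-s3-fs-hadoop-1.10.0.jar /opt/flink/plugins/s3-fs-hadoop/
```
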
>> > On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose <roshanp@gmail.com> wrote:
>> >>
>> >> Sorry, I realized this came off the user list by mistake. Adding the
>> >> thread back in.
>> >>
>> >> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose <roshanp@gmail.com> wrote:
>> >>>
>> >>> Yes, sorry, no errors on the task manager. However, I am new to Flink,
>> >>> so I don't know all the places to look for logs. I've been looking at
>> >>> the task manager logs and don't see any exceptions there. I'm not sure
>> >>> where to look for S3 exceptions in particular.
>> >>>
>> >>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas <kkloudas@gmail.com> wrote:
>> >>>>
>> >>>> Yes, this is why I reached out for further information.
>> >>>>
>> >>>> Incrementing the part counter is the responsibility of the
>> >>>> StreamingFileSink, whose code is FS-agnostic, so it should also fail
>> >>>> in the local FS.
>> >>>> Now, if it is on the S3 side, it would help if you have any more info,
>> >>>> for example any logs from S3, to see if anything went wrong on their
>> >>>> end.
>> >>>>
>> >>>> So your logs refer to normal execution, i.e. no failures and no
>> >>>> restarting, right?
>> >>>>
>> >>>> Cheers,
>> >>>> Kostas
>> >>>>
>> >>>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose <roshanp@gmail.com> wrote:
>> >>>> >
>> >>>> > Surprisingly, the same code running against the local filesystem
>> >>>> > works perfectly. The part counter increments correctly.
>> >>>> >
>> >>>> > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas <kkloudas@gmail.com> wrote:
>> >>>> >>
>> >>>> >> Hi Roshan,
>> >>>> >>
>> >>>> >> Your logs refer to a simple run without any failures or re-running
>> >>>> >> from a savepoint, right?
>> >>>> >>
>> >>>> >> I am asking because I am trying to reproduce it by running a
>> >>>> >> modified ParquetStreamingFileSinkITCase [1], and so far I cannot.
>> >>>> >> The ITCase runs against the local filesystem, not S3, but I added
>> >>>> >> the OutputFileConfig and the part counter increases as expected.
>> >>>> >>
>> >>>> >> Is there any other information that would help us reproduce the
>> >>>> >> issue?
>> >>>> >>
>> >>>> >> Cheers,
>> >>>> >> Kostas
>> >>>> >>
>> >>>> >> [1] https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
>> >>>> >>
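
(The modification described amounts to attaching an OutputFileConfig to the test's sink, roughly as sketched below with illustrative values; the expectation is one new part file per checkpoint, with the middle counter increasing.)

```
OutputFileConfig config = OutputFileConfig.builder()
        .withPartPrefix("part")      // illustrative; "part" is also the default prefix
        .withPartSuffix(".parquet")
        .build();
// Expected output per subtask: part-0-0.parquet, part-0-1.parquet, ...
```
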
>> >>>> >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose <roshanp@gmail.com> wrote:
>> >>>> >> >
>> >>>> >> > Hi,
>> >>>> >> >
>> >>>> >> > I am trying to get the Parquet writer to write to S3; however,
>> >>>> >> > the files do not seem to be rolling over. The same file
>> >>>> >> > "part-0-0.parquet" is being created each time, as if the
>> >>>> >> > "partCounter" were not being updated. Maybe the Bucket is being
>> >>>> >> > recreated each time? I don't really know... Here are some logs:
>> >>>> >> >
>> >>>> >> > 2020-04-09 01:28:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
>> >>>> >> > 2020-04-09 01:28:10,589 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
>> >>>> >> > 2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>> >>>> >> > 2020-04-09 01:29:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
>> >>>> >> > 2020-04-09 01:29:10,520 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=3.
>> >>>> >> > 2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
>> >>>> >> >
>> >>>> >> > And a part of my code:
>> >>>> >> >
>> >>>> >> > ```
>> >>>> >> > StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>> >>>> >> >
>> >>>> >> > // env.setParallelism(2);
>> >>>> >> > env.enableCheckpointing(60000L); // bulk part files roll on checkpoints, every 60s here
>> >>>> >> >
>> >>>> >> > /// PROPERTIES Added
>> >>>> >> > Schema schema = bro_conn.getClassSchema();
>> >>>> >> >
>> >>>> >> > OutputFileConfig config = OutputFileConfig
>> >>>> >> >         .builder()
>> >>>> >> >         .withPartSuffix(".parquet")
>> >>>> >> >         .build();
>> >>>> >> >
>> >>>> >> > final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>> >>>> >> >         .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"),
>> >>>> >> >                 ParquetAvroWriters.forGenericRecord(schema))
>> >>>> >> > //              .withRollingPolicy(OnCheckpointRollingPolicy.build())
>> >>>> >> >         .withOutputFileConfig(config)
>> >>>> >> > //              .withBucketAssigner(new PartitioningBucketAssigner())
>> >>>> >> >         .build();
>> >>>> >> >
>> >>>> >> > DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
>> >>>> >> >         "kinesis", new SimpleStringSchema(), consumerConfig));
>> >>>> >> >
>> >>>> >> > kinesis.flatMap(new JsonAvroParser())
>> >>>> >> >         .addSink(sink);
>> >>>> >> >
>> >>>> >> > env.execute("Bro Conn");
>> >>>> >> > ```
>> >>>> >> >
>> >>>> >> > I'm using Flink 1.10.0, running in Kubernetes. I also created a
>> >>>> >> > custom image to add the Presto/Hadoop plugins.
>> >>>> >> >
>> >>>> >> > Thanks again!
>>
>
