flink-user mailing list archives

From Roshan Punnoose <rosh...@gmail.com>
Subject Re: Parquet S3 Sink Part files are not rolling over with checkpoint
Date Thu, 09 Apr 2020 11:49:13 GMT
Nope, just the s3a. I'll keep looking around to see if there is anything
else I can find. If you think of anything else to try, let me know.

On Thu, Apr 9, 2020, 7:41 AM Kostas Kloudas <kkloudas@gmail.com> wrote:

> It should not be a problem, because from what you posted you are using
> "s3a" as the scheme for S3.
> Are you using "s3p" for Presto? That is also needed so that Flink can
> tell which plugin to use for which path.
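>
> For example, a minimal sketch with both schemes in play (the bucket and
> paths here are placeholders, not taken from your setup):
>
> ```
> // Assumes both flink-s3-fs-presto and flink-s3-fs-hadoop are under plugins/,
> // and env is the job's StreamExecutionEnvironment.
> // Presto-backed scheme ("s3p"), e.g. for checkpoints:
> env.setStateBackend(new FsStateBackend("s3p://my-bucket/checkpoints/"));
> // Hadoop-backed scheme ("s3a") for the StreamingFileSink:
> final StreamingFileSink<GenericRecord> sink = StreamingFileSink
>         .forBulkFormat(new Path("s3a://my-bucket/output/"),
>                 ParquetAvroWriters.forGenericRecord(schema))
>         .build();
> ```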
>
> On Thu, Apr 9, 2020 at 1:30 PM Roshan Punnoose <roshanp@gmail.com> wrote:
> >
> > Lastly, could it be the way I built the Flink image for Kubernetes? I
> > added both the Presto and Hadoop plugins.
> >
> > On Thu, Apr 9, 2020, 7:29 AM Roshan Punnoose <roshanp@gmail.com> wrote:
> >>
> >> Sorry, I realized this came off the user list by mistake. Adding the
> >> thread back in.
> >>
> >> On Thu, Apr 9, 2020, 7:26 AM Roshan Punnoose <roshanp@gmail.com> wrote:
> >>>
> >>> Yes, sorry, no errors on the task manager. However, I am new to Flink,
> >>> so I don't know all the places to look for logs. I have been looking at
> >>> the task manager logs and don't see any exceptions there. I am not sure
> >>> where to look for S3 exceptions in particular.
> >>>
> >>> On Thu, Apr 9, 2020, 7:16 AM Kostas Kloudas <kkloudas@gmail.com>
> wrote:
> >>>>
> >>>> Yes, this is why I reached out for further information.
> >>>>
> >>>> Incrementing the part counter is the responsibility of the
> >>>> StreamingFileSink, whose code is FS-agnostic, so it should also fail
> >>>> in the local FS.
> >>>> Now if it is on the S3 side, it would help if you have any more info,
> >>>> for example any logs from S3, to see if anything went wrong on their
> >>>> end.
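> >>>>
> >>>> If it helps, one way to surface more detail on the Flink side is to
> >>>> raise the log level for the S3 filesystem classes. A sketch, assuming
> >>>> the default log4j.properties setup that ships with Flink 1.10:
> >>>>
> >>>> ```
> >>>> # Hypothetical additions to conf/log4j.properties:
> >>>> # the Flink S3 filesystem wrappers (the S3Committer in your logs lives here)
> >>>> log4j.logger.org.apache.flink.fs.s3=DEBUG
> >>>> # the underlying AWS SDK calls
> >>>> log4j.logger.com.amazonaws=DEBUG
> >>>> ```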
> >>>>
> >>>> So your logs refer to normal execution, i.e. no failures and no
> >>>> restarting, right?
> >>>>
> >>>> Cheers,
> >>>> Kostas
> >>>>
> >>>> On Thu, Apr 9, 2020 at 12:53 PM Roshan Punnoose <roshanp@gmail.com>
> wrote:
> >>>> >
> >>>> > Surprisingly, the same code running against the local filesystem
> >>>> > works perfectly. The part counter increments correctly.
> >>>> >
> >>>> > On Thu, Apr 9, 2020, 5:51 AM Kostas Kloudas <kkloudas@gmail.com>
> wrote:
> >>>> >>
> >>>> >> Hi Roshan,
> >>>> >>
> >>>> >> Your logs refer to a simple run without any failures or re-running
> >>>> >> from a savepoint, right?
> >>>> >>
> >>>> >> I am asking because I am trying to reproduce it by running a
> >>>> >> modified ParquetStreamingFileSinkITCase [1], and so far I cannot.
> >>>> >> The ITCase runs against the local filesystem, not S3, but I added
> >>>> >> the OutputFileConfig and the part counter increases as expected.
> >>>> >>
> >>>> >> Is there any other information that would help us reproduce the
> >>>> >> issue?
> >>>> >>
> >>>> >> Cheers,
> >>>> >> Kostas
> >>>> >>
> >>>> >> [1] https://github.com/apache/flink/blob/master/flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/avro/ParquetStreamingFileSinkITCase.java
> >>>> >>
> >>>> >> On Thu, Apr 9, 2020 at 3:37 AM Roshan Punnoose <roshanp@gmail.com>
> wrote:
> >>>> >> >
> >>>> >> > Hi,
> >>>> >> >
> >>>> >> > I am trying to get the Parquet writer to write to S3; however,
> >>>> >> > the files do not seem to be rolling over. The same file
> >>>> >> > "part-0-0.parquet" is being created each time, as if the
> >>>> >> > "partCounter" is not being updated. Maybe the Bucket is being
> >>>> >> > recreated each time? I don't really know... Here are some logs:
> >>>> >> >
> >>>> >> > 2020-04-09 01:28:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=2 (max part counter=2).
> >>>> >> > 2020-04-09 01:28:10,589 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=2.
> >>>> >> > 2020-04-09 01:28:10,589 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
> >>>> >> > 2020-04-09 01:29:10,350 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 checkpointing for checkpoint with id=3 (max part counter=3).
> >>>> >> > 2020-04-09 01:29:10,520 INFO org.apache.flink.streaming.api.functions.sink.filesystem.Buckets - Subtask 0 received completion notification for checkpoint with id=3.
> >>>> >> > 2020-04-09 01:29:10,521 INFO org.apache.flink.fs.s3.common.writer.S3Committer - Committing bro_conn/2020-04-09--01/part-0-0.parquet with MPU ID Y2DNQ7KQbuuS9valZK43L_8m6yCpHQehyXyrC.DxO26DsiMPi27m9yCa4IKHd3ad88QcQSkDRF6jOuAHQz3jnDdQutVD3bUNuVHJyO6I.o.h.wN_fXoBix6lwbotzZCI
> >>>> >> > And here is part of my code:
> >>>> >> >
> >>>> >> > ```
> >>>> >> > StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> >>>> >> >
> >>>> >> > //      env.setParallelism(2);
> >>>> >> >         env.enableCheckpointing(60000L);
> >>>> >> > ///PROPERTIES Added
> >>>> >> >         Schema schema = bro_conn.getClassSchema();
> >>>> >> >
> >>>> >> >         OutputFileConfig config = OutputFileConfig
> >>>> >> >                 .builder()
> >>>> >> >                 .withPartSuffix(".parquet")
> >>>> >> >                 .build();
> >>>> >> >
> >>>> >> >         final StreamingFileSink<GenericRecord> sink = StreamingFileSink
> >>>> >> >                 .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"), ParquetAvroWriters.forGenericRecord(schema))
> >>>> >> > //              .withRollingPolicy(OnCheckpointRollingPolicy.build())
> >>>> >> >                 .withOutputFileConfig(config)
> >>>> >> > //              .withBucketAssigner(new PartitioningBucketAssigner())
> >>>> >> >                 .build();
> >>>> >> >
> >>>> >> >         DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
> >>>> >> >                 "kinesis", new SimpleStringSchema(), consumerConfig));
> >>>> >> >
> >>>> >> >         kinesis.flatMap(new JsonAvroParser())
> >>>> >> >                 .addSink(sink);
> >>>> >> >
> >>>> >> >         env.execute("Bro Conn");
> >>>> >> > ```
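> >>>> >> >
> >>>> >> > For reference, a sketch of the same sink with the rolling policy
> >>>> >> > spelled out (for bulk formats, rolling on checkpoint is the only
> >>>> >> > supported behavior, so this only makes the default explicit; the
> >>>> >> > part prefix is the default, shown just for illustration):
> >>>> >> >
> >>>> >> > ```
> >>>> >> > OutputFileConfig config = OutputFileConfig
> >>>> >> >         .builder()
> >>>> >> >         .withPartPrefix("part")      // the default prefix, made explicit
> >>>> >> >         .withPartSuffix(".parquet")
> >>>> >> >         .build();
> >>>> >> >
> >>>> >> > final StreamingFileSink<GenericRecord> sink = StreamingFileSink
> >>>> >> >         .forBulkFormat(new Path("s3a://<bucket>/bro_conn/"),
> >>>> >> >                 ParquetAvroWriters.forGenericRecord(schema))
> >>>> >> >         .withRollingPolicy(OnCheckpointRollingPolicy.build()) // default for bulk formats
> >>>> >> >         .withOutputFileConfig(config)
> >>>> >> >         .build();
> >>>> >> > ```
> >>>> >> >
> >>>> >> > With checkpointing every 60s, each checkpoint should commit a new
> >>>> >> > part file per bucket: part-0-0.parquet, part-0-1.parquet, and so on.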
> >>>> >> >
> >>>> >> > I'm using Flink 1.10.0 and running in Kubernetes. I also
> >>>> >> > created a custom image to add the Presto and Hadoop plugins.
> >>>> >> >
> >>>> >> > Thanks again!
>
