flink-user mailing list archives

From David Magalhães <speeddra...@gmail.com>
Subject Re: NoClassDefFoundError when an exception is throw inside invoke in a Custom Sink
Date Tue, 18 Feb 2020 13:59:50 GMT
Thanks for the feedback, Arvid. Currently it isn't an issue, but I will look
back into it in the future.

On Tue, Feb 18, 2020 at 1:51 PM Arvid Heise <arvid@ververica.com> wrote:

> Hi David,
>
> sorry for replying late. I was caught up in other incidents.
>
> I double-checked all the information that you provided and concluded that
> you are completely bypassing our filesystems and plugins.
>
> What you are using is AvroParquetWriter, which pulls in the Hadoop
> dependencies, including raw Hadoop S3. This becomes obvious because the Path
> you are using does not come from the Flink namespace.
> The class issues that come from that are hard to debug: you are effectively
> bundling a second Hadoop, so if you also have a specific Hadoop version on
> your cluster (e.g. on EMR), there can be ambiguities, and the error you are
> seeing happens.
>
> What I'd recommend is a completely different approach. Assuming you just
> want exponential backoff for all S3 write accesses, you could wrap the
> S3AFileSystem and create your own s3 plugin; see the sketch below. That
> would also work with any given format in future cases.
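>
> A rough sketch of the wrapping idea, done at the hadoop layer rather than as
> a full Flink plugin (the class name, the retried exception, and the delays
> are all illustrative assumptions, not existing API):
>
> import java.net.SocketTimeoutException
> import org.apache.hadoop.fs.permission.FsPermission
> import org.apache.hadoop.fs.s3a.S3AFileSystem
> import org.apache.hadoop.fs.{FSDataOutputStream, Path}
> import org.apache.hadoop.util.Progressable
>
> // Retries file creation with exponential backoff instead of failing fast.
> class BackoffS3AFileSystem extends S3AFileSystem {
>   override def create(f: Path, permission: FsPermission, overwrite: Boolean,
>       bufferSize: Int, replication: Short, blockSize: Long,
>       progress: Progressable): FSDataOutputStream = {
>     var delayMs = 1000L
>     while (true) {
>       try {
>         return super.create(f, permission, overwrite, bufferSize,
>           replication, blockSize, progress)
>       } catch {
>         case _: SocketTimeoutException => // match the failure you actually see
>           Thread.sleep(delayMs)
>           delayMs = math.min(delayMs * 2, 60000L)
>       }
>     }
>     throw new IllegalStateException("unreachable")
>   }
> }
>
> You would then point fs.s3a.impl in core-site.xml at this class.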
>
> If you want to stick to your approach, you should use
> org.apache.flink.formats.parquet.ParquetWriterFactory, which uses the
> StreamOutputFile you mentioned; a sketch follows.
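>
> For illustration, a minimal sketch of that route (assuming the flink-parquet
> module of Flink 1.9; error handling omitted):
>
> import org.apache.avro.Schema
> import org.apache.avro.generic.GenericRecord
> import org.apache.flink.core.fs.{FileSystem, Path}
> import org.apache.flink.formats.parquet.avro.ParquetAvroWriters
>
> def writeViaFlinkFs(records: Seq[GenericRecord], schema: Schema,
>     uri: String): Unit = {
>   // a Flink Path, so s3:// URIs resolve through the Flink filesystem plugins
>   val path = new Path(uri)
>   val out = path.getFileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE)
>   // forGenericRecord builds a ParquetWriterFactory[GenericRecord] internally
>   val writer = ParquetAvroWriters.forGenericRecord(schema).create(out)
>   records.foreach(writer.addElement)
>   writer.finish() // writes the parquet footer; does not close the stream
>   out.close()
> }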
>
> Best,
>
> Arvid
>
> On Thu, Feb 13, 2020 at 12:04 AM David Magalhães <speeddragon@gmail.com>
> wrote:
>
>> Hi Arvid, I use a docker image. Here is the Dockerfile:
>>
>> FROM flink:1.9.1-scala_2.12
>>
>> RUN mkdir /opt/flink/plugins/flink-s3-fs-hadoop
>> RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.9.1.jar /opt/flink/plugins/flink-s3-fs-hadoop/
>>
>> Please let me know if you need more information.
>>
>> On Wed, Feb 12, 2020 at 9:15 PM Arvid Heise <arvid@ververica.com> wrote:
>>
>>> Hi David,
>>>
>>> can you double-check the folder structure of your plugin? It should
>>> reside in its own subfolder. Here is an example.
>>>
>>> flink-dist
>>> ├── conf
>>> ├── lib
>>> ...
>>> └── plugins
>>>     └── s3
>>>         └── flink-s3-fs-hadoop.jar
>>>
>>> I will investigate your error in depth in the next few days, but I'd like
>>> to have a final confirmation of the folder structure first.
>>>
>>>
>>> On Wed, Feb 12, 2020 at 8:56 PM David Magalhães <speeddragon@gmail.com>
>>> wrote:
>>>
>>>> Hi Robert, I couldn't find any earlier mention before the
>>>> NoClassDefFoundError.
>>>> Here is the full log [1] if you want to look for something more
>>>> specific.
>>>>
>>>> [1] https://www.dropbox.com/s/l8tba6vft08flke/joda.out?dl=0
>>>>
>>>> On Wed, Feb 12, 2020 at 12:45 PM Robert Metzger <rmetzger@apache.org>
>>>> wrote:
>>>>
>>>>> According to this answer [1], the first exception "mentioning"
>>>>> org/joda/time/format/DateTimeParserBucket should be a different one.
>>>>> Can you go through the logs to make sure it is really a
>>>>> ClassNotFoundException, and not an ExceptionInInitializerError or
>>>>> something else?
>>>>>
>>>>> [1] https://stackoverflow.com/a/5756989/568695
>>>>>
>>>>> On Wed, Feb 12, 2020 at 12:36 PM David Magalhães <
>>>>> speeddragon@gmail.com> wrote:
>>>>>
>>>>>> Hi Arvid,
>>>>>>
>>>>>> I'm using flink-s3-fs-hadoop-1.9.1.jar in the plugins folder. Like I
>>>>>> said previously, this works normally until an exception is thrown
>>>>>> inside the sink. It will try to recover again, but sometimes it doesn't
>>>>>> recover, giving this error.
>>>>>>
>>>>>> To write to S3 I use *AvroParquetWriter* with the following code:
>>>>>>
>>>>>> val writer = AvroParquetWriter
>>>>>>          .builder[GenericRecord](new Path(finalFilePath))
>>>>>>
>>>>>> *Path* is from *org.apache.hadoop.fs*; the other option is to use
>>>>>> *org.apache.flink.formats.parquet.StreamOutputFile*, which will use the
>>>>>> Flink S3 plugin, right? I'm not sure how I can convert from Path to
>>>>>> StreamOutputFile. I know that when I used StreamingFileSink, I used
>>>>>> StreamOutputFile.
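>>>>>>
>>>>>> Something like this is what I have in mind (just a sketch; I'm assuming
>>>>>> the OutputFile overload of the builder and the StreamOutputFile
>>>>>> constructor work this way):
>>>>>>
>>>>>> import org.apache.avro.generic.GenericRecord
>>>>>> import org.apache.flink.core.fs.{FileSystem, Path => FlinkPath}
>>>>>> import org.apache.flink.formats.parquet.StreamOutputFile
>>>>>> import org.apache.parquet.avro.AvroParquetWriter
>>>>>>
>>>>>> val flinkPath = new FlinkPath(finalFilePath) // same path string as above
>>>>>> val stream = flinkPath.getFileSystem
>>>>>>   .create(flinkPath, FileSystem.WriteMode.NO_OVERWRITE)
>>>>>> val writer = AvroParquetWriter
>>>>>>   .builder[GenericRecord](new StreamOutputFile(stream)) // OutputFile overload instead of hadoop Path
>>>>>>   .withSchema(schema) // schema as in the existing writer
>>>>>>   .build()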
>>>>>>
>>>>>> On Wed, Feb 12, 2020 at 10:03 AM Arvid Heise <arvid@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi David,
>>>>>>>
>>>>>>> upon closer review of your stacktrace, it seems like you are trying
>>>>>>> to access S3 without our S3 plugin. That's generally not recommended
>>>>>>> at all.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Arvid
>>>>>>>
>>>>>>> On Tue, Feb 11, 2020 at 11:06 AM Arvid Heise <arvid@ververica.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi David,
>>>>>>>>
>>>>>>>> this seems to be a bug in our s3 plugin. The joda dependency should
>>>>>>>> be bundled there.
>>>>>>>>
>>>>>>>> Are you using s3 as a plugin by any chance? Which Flink version are
>>>>>>>> you using?
>>>>>>>>
>>>>>>>> If you are using s3 as a plugin, you could put joda in your plugin
>>>>>>>> folder like this:
>>>>>>>>
>>>>>>>> flink-dist
>>>>>>>> ├── conf
>>>>>>>> ├── lib
>>>>>>>> ...
>>>>>>>> └── plugins
>>>>>>>>     └── s3
>>>>>>>>         ├── joda.jar
>>>>>>>>         └── flink-s3-fs-hadoop.jar
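>>>>>>>>
>>>>>>>> In a Docker image like the one shown earlier in this thread, that
>>>>>>>> could look roughly like this (the joda version is only an example;
>>>>>>>> use whichever one the bundled aws sdk expects):
>>>>>>>>
>>>>>>>> FROM flink:1.9.1-scala_2.12
>>>>>>>>
>>>>>>>> RUN mkdir -p /opt/flink/plugins/s3
>>>>>>>> RUN cp /opt/flink/opt/flink-s3-fs-hadoop-1.9.1.jar /opt/flink/plugins/s3/
>>>>>>>> # joda-time-2.5.jar assumed to be present in the docker build context
>>>>>>>> COPY joda-time-2.5.jar /opt/flink/plugins/s3/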
>>>>>>>>
>>>>>>>> If flink-s3-fs-hadoop.jar is in lib, you could try adding joda into
>>>>>>>> that.
>>>>>>>>
>>>>>>>> Adding joda to your user code will unfortunately not work.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Arvid
>>>>>>>>
>>>>>>>> On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <
>>>>>>>> speeddragon@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Andrey, thanks for your reply.
>>>>>>>>>
>>>>>>>>> The class is in the jar created with `*sbt assembly*` that is
>>>>>>>>> submitted to Flink to start a job.
>>>>>>>>>
>>>>>>>>> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep DateTimeParserBucket
>>>>>>>>>      1649  05-27-2016 10:24
>>>>>>>>> org/joda/time/format/DateTimeParserBucket$SavedField.class
>>>>>>>>>      1984  05-27-2016 10:24
>>>>>>>>> org/joda/time/format/DateTimeParserBucket$SavedState.class
>>>>>>>>>      8651  05-27-2016 10:24
>>>>>>>>> org/joda/time/format/DateTimeParserBucket.class
>>>>>>>>>
>>>>>>>>> Shouldn't this be enough?
>>>>>>>>>
>>>>>>>>> I think it picks it up when nothing happens, but as soon as it hits
>>>>>>>>> some exceptions, it looks like it "forgets" it.
>>>>>>>>>
>>>>>>>>> Like I said before, this is kind of intermittent.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> David
>>>>>>>>>
>>>>>>>>> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <
>>>>>>>>> azagrebin@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi David,
>>>>>>>>>>
>>>>>>>>>> This looks like a problem with the resolution of Maven dependencies
>>>>>>>>>> or something similar.
>>>>>>>>>> The custom WindowParquetGenericRecordListFileSink probably
>>>>>>>>>> transitively depends on org/joda/time/format/DateTimeParserBucket,
>>>>>>>>>> and that class is missing from the runtime classpath of Flink.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Andrey
>>>>>>>>>>
>>>>>>>>>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <
>>>>>>>>>> speeddragon@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm implementing exponential backoff inside a custom sink that
>>>>>>>>>>> uses an AvroParquetWriter to write to S3. I've changed the number
>>>>>>>>>>> of attempts to 0 inside core-site.xml, and I'm catching the
>>>>>>>>>>> timeout exception and doing a Thread.sleep for X seconds. This is
>>>>>>>>>>> working as intended: when S3 is offline, it waits until it is
>>>>>>>>>>> online again.
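>>>>>>>>>>>
>>>>>>>>>>> Roughly what I'm doing, simplified (the exact exception type
>>>>>>>>>>> depends on the client configuration):
>>>>>>>>>>>
>>>>>>>>>>> import java.net.SocketTimeoutException
>>>>>>>>>>>
>>>>>>>>>>> // retry op with exponential backoff until it succeeds
>>>>>>>>>>> def withBackoff[A](op: () => A, initialMs: Long = 1000L,
>>>>>>>>>>>     maxMs: Long = 60000L): A = {
>>>>>>>>>>>   var delayMs = initialMs
>>>>>>>>>>>   while (true) {
>>>>>>>>>>>     try {
>>>>>>>>>>>       return op()
>>>>>>>>>>>     } catch {
>>>>>>>>>>>       case _: SocketTimeoutException =>
>>>>>>>>>>>         Thread.sleep(delayMs)
>>>>>>>>>>>         delayMs = math.min(delayMs * 2, maxMs)
>>>>>>>>>>>     }
>>>>>>>>>>>   }
>>>>>>>>>>>   throw new IllegalStateException("unreachable")
>>>>>>>>>>> }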
>>>>>>>>>>>
>>>>>>>>>>> I also want to test that the back pressure and the checkpoints
>>>>>>>>>>> are working as intended. For the first one, I can see the back
>>>>>>>>>>> pressure in the Flink UI going up, and the job recovers as
>>>>>>>>>>> expected, without reading more data from Kafka in the meantime.
>>>>>>>>>>>
>>>>>>>>>>> For the checkpoints, I've added a random exception (1 in 100)
>>>>>>>>>>> inside the sink's invoke function, to simulate that a problem has
>>>>>>>>>>> happened and the job needs to recover from the last good
>>>>>>>>>>> checkpoint. But something strange happens: sometimes I can see the
>>>>>>>>>>> job being canceled and created again and then running fine; other
>>>>>>>>>>> times, after X rounds of being created and canceled, it gives a
>>>>>>>>>>> *NoClassDefFoundError*, and it will keep giving that forever.
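>>>>>>>>>>>
>>>>>>>>>>> The injected failure is just something along these lines:
>>>>>>>>>>>
>>>>>>>>>>> // inside invoke(): fail ~1% of the time to force a restart from
>>>>>>>>>>> // the last checkpoint
>>>>>>>>>>> if (scala.util.Random.nextInt(100) == 0)
>>>>>>>>>>>   throw new RuntimeException("simulated sink failure")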
>>>>>>>>>>>
>>>>>>>>>>> Do you guys have any thoughts?
>>>>>>>>>>>
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException:
>>>>>>>>>>> Caught exception while processing timer.
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>>>>>>>>>> at
>>>>>>>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>>>>>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>>>>>>>>> at
>>>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>>>>>>>>>> at
>>>>>>>>>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>>>>>>>>>> at
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>> at
>>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>>>>>>>>>> org/joda/time/format/DateTimeParserBucket}
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>>>>>>>>>> ... 7 more
>>>>>>>>>>> Caused by: java.lang.NoClassDefFoundError:
>>>>>>>>>>> org/joda/time/format/DateTimeParserBucket
>>>>>>>>>>> at
>>>>>>>>>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>>>>>>>>>> at
>>>>>>>>>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>>>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>>>>>>>>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>>>>>>>>>> at
>>>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>>>>>>>>>> at
>>>>>>>>>>> scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>>>>>>>>>> at scala.util.Try$.apply(Try.scala:209)
>>>>>>>>>>> at
>>>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>>>>>>>>>> at
>>>>>>>>>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>>>>>>>>>> at
>>>>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>>>>>>>>>> at
>>>>>>>>>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>>>>>>>>>> at
>>>>>>>>>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>>>>>>>>>> ... 7 more
>>>>>>>>>>>
