flink-user mailing list archives

From Arvid Heise <ar...@ververica.com>
Subject Re: NoClassDefFoundError when an exception is thrown inside invoke in a Custom Sink
Date Tue, 11 Feb 2020 10:06:58 GMT
Hi David,

This seems to be a bug in our S3 plugin. The joda dependency should be
bundled there.

Are you using S3 as a plugin by any chance? Which Flink version are you
using?

If you are using S3 as a plugin, you could put joda in your plugin folder
like this:

flink-dist
├── conf
├── lib
...
└── plugins
    └── s3
        ├── joda.jar
        └── flink-s3-fs-hadoop.jar

If flink-s3-fs-hadoop.jar is in lib instead, you could try adding joda there as well.

Adding joda to your user code will unfortunately not work.
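To see which classloader (if any) can resolve the joda class at runtime, a small check like the following can help. This is only an illustrative sketch, not Flink code; the class name `ClassCheck` is made up for the example.

```java
// Sketch: check whether a class is resolvable from a given classloader.
// Useful to confirm where (or whether) joda ends up visible at runtime.
public class ClassCheck {
    public static boolean classPresent(String name) {
        try {
            // initialize=false: we only care about visibility, not static init
            Class.forName(name, false, ClassCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

Calling `ClassCheck.classPresent("org.joda.time.format.DateTimeParserBucket")` from inside the sink would show whether the class is visible from the user-code classloader at that point.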

Best,

Arvid

On Thu, Feb 6, 2020 at 11:16 PM David Magalhães <speeddragon@gmail.com>
wrote:

> Hi Andrey, thanks for your reply.
>
> The class is in the jar created with *sbt assembly* that is
> submitted to Flink to start a job.
>
> unzip -l target/jar/myapp-0.0.1-SNAPSHOT.jar | grep DateTimeParserBucket
>      1649  05-27-2016 10:24
> org/joda/time/format/DateTimeParserBucket$SavedField.class
>      1984  05-27-2016 10:24
> org/joda/time/format/DateTimeParserBucket$SavedState.class
>      8651  05-27-2016 10:24
> org/joda/time/format/DateTimeParserBucket.class
>
> Shouldn't this be enough?
>
> I think it uses it when nothing goes wrong, but as soon as some
> exceptions occur, it looks like it "forgets" it.
>
> Like I said before, this is kind of intermittent.
>
> Thanks,
> David
>
> On Thu, Feb 6, 2020 at 5:46 PM Andrey Zagrebin <azagrebin@apache.org>
> wrote:
>
>> Hi David,
>>
>> This looks like a problem with the resolution of Maven dependencies or
>> something similar.
>> The custom WindowParquetGenericRecordListFileSink probably transitively
>> depends on org/joda/time/format/DateTimeParserBucket
>> and it is missing on the runtime classpath of Flink.
>>
>> Best,
>> Andrey
>>
>> On Wed, Feb 5, 2020 at 5:22 PM David Magalhães <speeddragon@gmail.com>
>> wrote:
>>
>>> I'm implementing an exponential backoff inside a custom sink that uses
>>> an AvroParquetWriter to write to S3. I've changed the number of attempts to
>>> 0 inside core-site.xml, and I'm capturing the timeout exception and doing
>>> a Thread.sleep for X seconds. This is working as intended: when S3 is
>>> offline, it waits until it is online again.
>>>
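The retry loop described above could be sketched roughly like this. The helper name, the doubling schedule, and the generic `Callable` wrapper are illustrative assumptions, not the poster's actual code:

```java
import java.util.concurrent.Callable;

// Sketch of catch-then-sleep retry with exponential backoff.
public class Backoff {
    public static <T> T retryWithBackoff(int maxAttempts, long baseDelayMs, Callable<T> op)
            throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return op.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts) throw e;        // give up after the last attempt
                Thread.sleep(baseDelayMs << (attempt - 1)); // wait 1x, 2x, 4x, ... base delay
            }
        }
    }
}
```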
>>> I also want to test that the back pressure and the checkpoints are
>>> working as intended. For the first one, I can see the back pressure in the
>>> Flink UI going up, and the job recovers as expected, not reading more data
>>> from Kafka.
>>>
>>> For the checkpoints, I've added a random exception inside the sink's
>>> invoke function (1 in 100, to simulate that a problem has happened and the
>>> job needs to recover from the last good checkpoint), but something strange
>>> happens. Sometimes I can see the job being canceled and created again, then
>>> running fine; other times, after X cycles of being created and canceled,
>>> it gives a *NoClassDefFoundError*, and it will keep giving that
>>> forever.
>>>
>>> Do you guys have any thoughts?
>>>
>>> org.apache.flink.streaming.runtime.tasks.AsynchronousException: Caught
>>> exception while processing timer.
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask$StreamTaskAsyncExceptionHandler.handleAsyncException(StreamTask.java:958)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.StreamTask.handleAsyncException(StreamTask.java:932)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:285)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
>>> at
>>> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>> at java.lang.Thread.run(Thread.java:748)
>>> Caused by: TimerException{java.lang.NoClassDefFoundError:
>>> org/joda/time/format/DateTimeParserBucket}
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:284)
>>> ... 7 more
>>> Caused by: java.lang.NoClassDefFoundError:
>>> org/joda/time/format/DateTimeParserBucket
>>> at
>>> org.joda.time.format.DateTimeFormatter.parseMillis(DateTimeFormatter.java:825)
>>> at com.amazonaws.util.DateUtils.parseRFC822Date(DateUtils.java:193)
>>> at
>>> com.amazonaws.services.s3.internal.ServiceUtils.parseRfc822Date(ServiceUtils.java:78)
>>> at
>>> com.amazonaws.services.s3.internal.AbstractS3ResponseHandler.populateObjectMetadata(AbstractS3ResponseHandler.java:115)
>>> at
>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:32)
>>> at
>>> com.amazonaws.services.s3.internal.S3MetadataResponseHandler.handle(S3MetadataResponseHandler.java:25)
>>> at
>>> com.amazonaws.http.AmazonHttpClient.handleResponse(AmazonHttpClient.java:1072)
>>> at
>>> com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:746)
>>> at
>>> com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:489)
>>> at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:310)
>>> at
>>> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3785)
>>> at
>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1050)
>>> at
>>> com.amazonaws.services.s3.AmazonS3Client.getObjectMetadata(AmazonS3Client.java:1027)
>>> at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:904)
>>> at
>>> org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1553)
>>> at org.apache.hadoop.fs.s3a.S3AFileSystem.create(S3AFileSystem.java:555)
>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:929)
>>> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:910)
>>> at
>>> org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
>>> at
>>> org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:246)
>>> at org.apache.parquet.hadoop.ParquetWriter.<init>(ParquetWriter.java:280)
>>> at
>>> org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:535)
>>> at
>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.$anonfun$invoke$1(WindowParquetGenericRecordListFileSink.scala:59)
>>> at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
>>> at scala.util.Try$.apply(Try.scala:209)
>>> at
>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:54)
>>> at
>>> com.myapp.kratos.writer.WindowParquetGenericRecordListFileSink.invoke(WindowParquetGenericRecordListFileSink.scala:18)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at
>>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>> at
>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:45)
>>> at
>>> com.myapp.kratos.window.GenericRecordAggregatorWindowFunction.apply(GenericRecordAggregatorWindowFunction.scala:20)
>>> at
>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:44)
>>> at
>>> org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableWindowFunction.process(InternalIterableWindowFunction.java:32)
>>> at
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
>>> at
>>> org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onProcessingTime(WindowOperator.java:503)
>>> at
>>> org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.onProcessingTime(InternalTimerServiceImpl.java:260)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$TriggerTask.run(SystemProcessingTimeService.java:281)
>>> ... 7 more
>>>
>>>
>>>
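The fault injection described in the quoted message (a roughly 1-in-100 chance of throwing inside invoke, to force a restore from the last good checkpoint) can be sketched like this. The class and method names are illustrative, not the poster's actual sink:

```java
import java.util.Random;

// Sketch: a sink-like class where a configurable fraction of invoke()
// calls throw, simulating a transient failure to exercise checkpoint recovery.
public class FaultInjectingSink {
    private final double failureRate; // e.g. 0.01 for the 1-in-100 case
    private final Random rng;

    public FaultInjectingSink(double failureRate, Random rng) {
        this.failureRate = failureRate;
        this.rng = rng;
    }

    public void invoke(String value) {
        // nextDouble() is in [0, 1), so rate 0.0 never throws and 1.0 always does
        if (rng.nextDouble() < failureRate) {
            throw new RuntimeException("injected failure: testing checkpoint recovery");
        }
        // ... the real write path (e.g. the Parquet writer) would run here ...
    }
}
```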
