flink-user mailing list archives

From Rohan Thimmappa <rohan.thimma...@gmail.com>
Subject Re: Class loader premature closure - NoClassDefFoundError: org/apache/kafka/clients/NetworkClient
Date Tue, 07 May 2019 13:57:51 GMT
This is a blocker for exactly-once support in the Flink Kafka producer.

This issue was reported and closed, but it is still reproducible:
https://issues.apache.org/jira/browse/FLINK-10455

On Mon, May 6, 2019 at 10:20 AM Slotterback, Chris <Chris_Slotterback@comcast.com> wrote:

> Hey Flink users,
>
>
>
> We are currently running Flink 1.7.2 with a job that uses FlinkKafkaProducer with
> its write semantic set to Semantic.EXACTLY_ONCE. When there is a job failure
> and restart (in our case from a checkpoint timeout), the job enters a failure loop
> that requires a cancellation and resubmission to fix. The expected and
> desired outcome is that the job recovers from the failure and restarts
> successfully. Some digging revealed an issue where the class loader closes
> before the connection to Kafka is fully terminated, resulting in a
> NoClassDefFoundError. What is happening has already been described here:
> https://heap.io/blog/engineering/missing-scala-class-noclassdeffounderror,
> though we are experiencing this with Kafka, not Redis:
>
>
>
> 2019-05-03 15:14:18,780 ERROR org.apache.kafka.common.utils.KafkaThread - Uncaught exception in thread 'kafka-producer-network-thread | producer-80':
> java.lang.NoClassDefFoundError: org/apache/kafka/clients/NetworkClient$1
>     at org.apache.kafka.clients.NetworkClient.processDisconnection(NetworkClient.java:658)
>     at org.apache.kafka.clients.NetworkClient.handleDisconnections(NetworkClient.java:805)
>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:520)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:226)
>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
>     at java.lang.Thread.run(Thread.java:748)
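
To make the mechanism concrete, here is a minimal sketch, not taken from the report or from Flink code, of the failure mode the linked post describes: an anonymous inner class such as NetworkClient$1 is resolved lazily, so the first code path that needs it after its defining class loader has been closed fails with NoClassDefFoundError. The jar path and class names below are hypothetical placeholders.

import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;

public class ClosedLoaderSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical jar containing com.example.ExampleClient, a Runnable whose
    // run() method creates an anonymous inner class (compiled as ExampleClient$1).
    URL jarUrl = Paths.get("/tmp/example-client.jar").toUri().toURL();
    URLClassLoader loader = new URLClassLoader(new URL[] {jarUrl}, null);

    // Loading and instantiating ExampleClient does not load ExampleClient$1 yet;
    // the JVM resolves nested/anonymous classes lazily, on first use.
    Class<?> clientClass = loader.loadClass("com.example.ExampleClient");
    Runnable client = (Runnable) clientClass.getDeclaredConstructor().newInstance();

    // Close the loader while the object is still live -- analogous to the user-code
    // class loader being torn down while the Kafka network thread is still running.
    loader.close();

    // The first use of ExampleClient$1 now has to be loaded through the closed
    // loader, and the failed lookup surfaces as java.lang.NoClassDefFoundError.
    client.run();
  }
}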
>
>
> Interestingly, this only happens when we extend FlinkKafkaProducer for
> the purpose of setting the write semantic to EXACTLY_ONCE. When running
> with the default FlinkKafkaProducer (using Semantic.AT_LEAST_ONCE), the
> class loader has no issue disconnecting the Kafka client on job failure,
> and the job recovers just fine. As far as I can tell, we are not doing
> anything particularly strange in our extended producer:
>
>
>
> public class CustomFlinkKafkaProducer<IN> extends FlinkKafkaProducer<IN> {
>
>   public CustomFlinkKafkaProducer(Properties properties, String topicId,
>       AvroKeyedSerializer<IN> serializationSchema) {
>     super(
>         topicId,
>         serializationSchema,
>         properties,
>         Optional.of(new FlinkFixedPartitioner<>()),
>         Semantic.EXACTLY_ONCE,
>         DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
>   }
>
>   public static Properties getPropertiesFromBrokerList(String brokerList) {
>     […]
>   }
> }
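
Not a fix, but possibly a useful data point for separating the subclass from the semantic: the subclass above only forwards constructor arguments, so the same EXACTLY_ONCE producer can be built by calling the public constructor that the super(...) call targets, leaving no user-defined class extending the connector. A sketch, reusing the poster's AvroKeyedSerializer (whose package is not shown in the thread) and the same constants:

import java.util.Optional;
import java.util.Properties;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

public class DirectExactlyOnceProducer {

  // Builds the producer through the same public constructor and arguments used by
  // the super(...) call in CustomFlinkKafkaProducer, so nothing in the user jar
  // subclasses the connector. AvroKeyedSerializer is the poster's own serialization
  // schema; its import is omitted here because its package is unknown.
  public static <IN> FlinkKafkaProducer<IN> create(
      String topicId, Properties properties, AvroKeyedSerializer<IN> serializationSchema) {
    return new FlinkKafkaProducer<IN>(
        topicId,
        serializationSchema,
        properties,
        Optional.of(new FlinkFixedPartitioner<IN>()),
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
        FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
  }
}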


-- 
Thanks
Rohan
