flink-user mailing list archives

From Arvid Heise <ar...@ververica.com>
Subject Re: java.lang.AbstractMethodError when implementing KafkaSerializationSchema
Date Wed, 25 Mar 2020 07:39:33 GMT
Hi Steve,

I just noticed some inconsistency: Your class correctly contains the bridge
method (last method in javap).
Your stack trace, however, mentions
*org/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord*
instead of org.apache.kafka.clients.producer.ProducerRecord.
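For reference, the bridge method in question is something javac generates automatically when a class implements a generic interface with a concrete type argument. A minimal, self-contained sketch (the `Schema`/`RowSchema` names are hypothetical stand-ins, not Flink classes):

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for a generic interface like KafkaSerializationSchema<T>.
interface Schema<T> {
    Object serialize(T element);
}

// Implementing the interface with a concrete type argument makes javac emit a
// synthetic bridge method serialize(Object) that delegates to serialize(String).
class RowSchema implements Schema<String> {
    @Override
    public Object serialize(String element) {
        return element.getBytes();
    }
}

public class BridgeDemo {
    public static void main(String[] args) {
        // Both variants appear in the compiled class, mirroring the javap
        // output in this thread; the erased one is flagged as a bridge method.
        for (Method m : RowSchema.class.getDeclaredMethods()) {
            System.out.println(m + (m.isBridge() ? "  [bridge]" : ""));
        }
    }
}
```

An AbstractMethodError like the one in this thread means the JVM resolved the erased signature but found no matching implementation in the loaded class, which is why a shaded vs. unshaded ProducerRecord in that signature matters.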

Did you perform any relocations yourself (quite unlikely)? If so, please
add an exclusion for org.apache.kafka.
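For anyone who is relocating with the maven-shade-plugin, such an exclusion would look roughly like this (the org.apache relocation pattern is only illustrative; adapt it to whatever your build actually relocates):

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <relocation>
        <!-- Illustrative pattern; use whatever you actually relocate. -->
        <pattern>org.apache</pattern>
        <shadedPattern>shaded.org.apache</shadedPattern>
        <excludes>
          <!-- Leave the Kafka client classes at their original package so they
               match the (unshaded) ProducerRecord in KafkaSerializationSchema. -->
          <exclude>org.apache.kafka.**</exclude>
        </excludes>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```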

If not, then we may be looking at a version mismatch between Flink on the
cluster and in your build. Could you double-check all Flink versions, ideally
down to the patch version, especially on EMR (dashboard) but also in your poms
(mvn dependency:tree)?
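One way to spot such a mismatch, sketched here against made-up dependency:tree output (in a real build you would capture the actual Maven output instead):

```shell
# Made-up excerpt of `mvn dependency:tree -Dincludes=org.apache.flink` output;
# in a real build, redirect the actual Maven output into deps.txt.
cat > deps.txt <<'EOF'
[INFO] +- org.apache.flink:flink-streaming-java_2.11:jar:1.9.0:provided
[INFO] +- org.apache.flink:flink-connector-kafka_2.11:jar:1.9.1:compile
EOF

# Extract the distinct Flink versions in the tree; more than one line of
# output means the build mixes Flink versions.
grep -o 'org\.apache\.flink:[^:]*:jar:[^:]*' deps.txt | cut -d: -f4 | sort -u
```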

On Wed, Mar 25, 2020 at 2:35 AM Steve Whelan <swhelan@jwplayer.com> wrote:

> Hi Arvid,
>
> Interestingly, my job runs successfully in a Docker container (image
> *flink:1.9.0-scala_2.11*) but is failing with
> *java.lang.AbstractMethodError* on AWS EMR (non-Docker). I am compiling
> with Java version OpenJDK 1.8.0_242, which is the same version my EMR
> cluster is running. Since it runs successfully locally in a Docker
> container, that would point to an issue in our AWS environment setup. Oddly,
> we have been running Flink on EMR for 2+ years and have never come across
> this until now.
>
> Results of javap are:
>
> public class
> com.jwplayer.flink.serialization.json.JsonRowKeyedSerializationSchema
> implements
> org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema<org.apache.flink.types.Row>
> {
>   public static
> org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema<org.apache.flink.types.Row>
> create(com.jwplayer.flink.config.serde.SerDeConfig);
>   public byte[] serializeKey(org.apache.flink.types.Row);
>   public byte[] serializeValue(org.apache.flink.types.Row);
>   public org.apache.kafka.clients.producer.ProducerRecord<byte[], byte[]>
> serialize(org.apache.flink.types.Row, java.lang.Long);
>   public org.apache.kafka.clients.producer.ProducerRecord
> serialize(java.lang.Object, java.lang.Long);
> }
>
>
> On Mon, Mar 23, 2020 at 9:55 AM Arvid Heise <arvid@ververica.com> wrote:
>
>> Hi Steve,
>>
>> for some reason, it seems as if the Java compiler is not generating the
>> bridge method [1].
>>
>> Could you double-check that the Java version of your build process and
>> your cluster match?
>>
>> Could you run javap on your generated class file and report back?
>>
>> [1]
>> https://docs.oracle.com/javase/tutorial/java/generics/bridgeMethods.html
>>
>> On Thu, Mar 19, 2020 at 5:13 PM Steve Whelan <swhelan@jwplayer.com>
>> wrote:
>>
>>> Hi,
>>>
>>> I am attempting to create a Key/Value serializer for the Kafka table
>>> connector. I forked `KafkaTableSourceSinkFactoryBase`[1] and other relevant
>>> classes, updating the serializer.
>>>
>>> First, I created `JsonRowKeyedSerializationSchema` which implements
>>> `KeyedSerializationSchema`[2], which is deprecated. The way it works is you
>>> provide a list of indices in your Row output that are the Key. This works
>>> successfully.
>>>
>>> When I tried migrating my `JsonRowKeyedSerializationSchema` to implement
>>> `KafkaSerializationSchema`[3], I get a `java.lang.AbstractMethodError`
>>> exception. Normally, this would mean I'm using an old interface; however,
>>> all my Flink dependencies are version 1.9. I cannot find this abstract
>>> `serialize()` method in the Flink codebase. Has anyone come across this
>>> before?
>>>
>>> When I print the methods of my `JsonRowKeyedSerializationSchema` class, I
>>> do see the method below, which seems to be getting called by the
>>> FlinkKafkaProducer but which I do not see in `KafkaSerializationSchema`:
>>>
>>> public abstract
>>> org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord
>>> org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema.serialize(java.lang.Object,java.lang.Long)
>>> java.lang.Object
>>> java.lang.Long
>>>
>>>
>>> *`JsonRowKeyedSerializationSchema` class*
>>>
>>> import
>>> org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
>>> import org.apache.flink.types.Row;
>>> import org.apache.kafka.clients.producer.ProducerRecord;
>>>
>>> import javax.annotation.Nullable;
>>>
>>> public class JsonRowKeyedSerializationSchema implements
>>> KafkaSerializationSchema<Row> {
>>>
>>>   // constructors and helpers
>>>
>>>   @Override
>>>   public ProducerRecord<byte[], byte[]> serialize(Row row, @Nullable
>>> Long aLong) {
>>>     return new ProducerRecord<>("some_topic", serializeKey(row),
>>> serializeValue(row));
>>>   }
>>> }
>>>
>>>
>>> *Stacktrace:*
>>>
>>> Caused by: java.lang.AbstractMethodError: Method
>>> com/mypackage/flink/serialization/json/JsonRowKeyedSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord;
>>> is abstract
>>> at
>>> com.mypackage.flink.serialization.json.JsonRowKeyedSerializationSchema.serialize(JsonRowKeyedSerializationSchema.java)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:816)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:98)
>>> at
>>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>> at
>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
>>> at
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>> at
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>> at
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
>>>
>>>
>>> [1]
>>> https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
>>> [2]
>>> https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
>>> [3]
>>> https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
>>>
>>>
>>
