From: Steve Whelan <swhelan@jwplayer.com>
Date: Thu, 19 Mar 2020 12:13:37 -0400
Subject: java.lang.AbstractMethodError when implementing KafkaSerializationSchema
To: user@flink.apache.org

Hi,

I am attempting to create a Key/Value serializer for the Kafka table connector.
I forked `KafkaTableSourceSinkFactoryBase`[1] and other relevant classes, updating the serializer.

First, I created `JsonRowKeyedSerializationSchema`, which implements the (now deprecated) `KeyedSerializationSchema`[2]. The way it works is that you provide a list of indices into your output Row identifying the key fields. This works successfully.

When I tried migrating `JsonRowKeyedSerializationSchema` to implement `KafkaSerializationSchema`[3] instead, I get a `java.lang.AbstractMethodError`. Normally this would mean I'm using an old interface; however, all my Flink dependencies are version 1.9. I cannot find this abstract `serialize()` method anywhere in the Flink codebase. Has anyone come across this before?

When I print the methods of my `JsonRowKeyedSerializationSchema` class, I do see the one below, which seems to be what the FlinkKafkaProducer is calling, but I do not see it in `KafkaSerializationSchema`:

public abstract org.apache.flink.kafka.shaded.org.apache.kafka.clients.producer.ProducerRecord org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema.serialize(java.lang.Object,java.lang.Long)
java.lang.Object
java.lang.Long

*`JsonRowKeyedSerializationSchema` class:*

import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.producer.ProducerRecord;

import javax.annotation.Nullable;

public class JsonRowKeyedSerializationSchema implements KafkaSerializationSchema<Row> {

  // constructors and helpers

  @Override
  public ProducerRecord<byte[], byte[]> serialize(Row row, @Nullable Long aLong) {
    return new ProducerRecord<>("some_topic", serializeKey(row), serializeValue(row));
  }
}

*Stacktrace:*

Caused by: java.lang.AbstractMethodError: Method com/mypackage/flink/serialization/json/JsonRowKeyedSerializationSchema.serialize(Ljava/lang/Object;Ljava/lang/Long;)Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/producer/ProducerRecord; is abstract
    at com.mypackage.flink.serialization.json.JsonRowKeyedSerializationSchema.serialize(JsonRowKeyedSerializationSchema.java)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:816)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:98)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:546)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:523)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:483)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:150)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)

[1] https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
[2] https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchema.java
[3] https://github.com/apache/flink/blob/release-1.9.0/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaSerializationSchema.java
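For readers unfamiliar with the index-based key scheme described above, here is a framework-free sketch of the idea: pick the key fields out of the row by position and render them as a small JSON object. All names here are hypothetical (a plain Object[] stands in for Flink's Row, and the hand-rolled JSON is only illustrative), not the actual `JsonRowKeyedSerializationSchema` code.

```java
import java.nio.charset.StandardCharsets;

// Sketch of index-based key extraction: given a row, a list of key-field
// indices, and the field names, build a JSON key like {"id":"user-1"}.
public class KeyedJsonSketch {

    // Builds a JSON object containing only the fields at the given indices.
    static byte[] serializeKey(Object[] row, int[] keyIndices, String[] fieldNames) {
        StringBuilder sb = new StringBuilder("{");
        for (int i = 0; i < keyIndices.length; i++) {
            int idx = keyIndices[i];
            if (i > 0) sb.append(',');
            sb.append('"').append(fieldNames[idx]).append("\":\"").append(row[idx]).append('"');
        }
        return sb.append('}').toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Object[] row = {"user-1", "click", 42};
        String[] names = {"id", "event", "count"};
        // Field 0 is the key field.
        byte[] key = serializeKey(row, new int[]{0}, names);
        System.out.println(new String(key, StandardCharsets.UTF_8)); // prints {"id":"user-1"}
    }
}
```

A real implementation would reuse the schema's JSON value serializer for the key fields rather than concatenating strings, but the index-driven selection is the part the deprecated interface leaves to the user.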
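On the erased `serialize(java.lang.Object, java.lang.Long)` signature printed above: for a generic interface, that erased form is exactly what reflection shows, and javac normally emits a synthetic "bridge" method with that signature in the implementing class. `AbstractMethodError` at the call site is what the JVM throws when no method matching the erased signature exists, which can happen when the class was compiled against a different copy of a type than the one on the runtime classpath (note the `org.apache.flink.kafka.shaded.*` `ProducerRecord` in the stack trace versus the unshaded import in the class). The following self-contained demo uses hypothetical `DemoSchema`/`DemoImpl` types, not the Flink classes, to show the bridge method:

```java
import java.lang.reflect.Method;

// A stand-in for a generic serialization interface.
interface DemoSchema<T> {
    byte[] serialize(T element, Long timestamp);
}

// Correctly parameterized implementation: javac also emits a synthetic
// bridge method serialize(Object, Long) that delegates to this one.
class DemoImpl implements DemoSchema<String> {
    @Override
    public byte[] serialize(String element, Long timestamp) {
        return element.getBytes();
    }
}

public class BridgeDemo {
    public static void main(String[] args) throws Exception {
        // The erased signature exists on DemoImpl and is a compiler-generated bridge.
        Method erased = DemoImpl.class.getMethod("serialize", Object.class, Long.class);
        System.out.println(erased.isBridge()); // prints true

        // Calling through the raw/erased view still dispatches to the real method.
        DemoSchema raw = new DemoImpl();
        byte[] out = raw.serialize("hi", 0L);
        System.out.println(out.length); // prints 2
    }
}
```

If the caller and the implementation were compiled against *different* classes for a parameter or return type (as with a shaded versus unshaded `ProducerRecord`), the bridge the runtime looks for would not exist, and this same call would fail with `AbstractMethodError`.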