flink-user mailing list archives

From Wouter Zorgdrager <zorgdrag...@gmail.com>
Subject Re: KafkaProducer with generic (Avro) serialization schema
Date Tue, 01 May 2018 14:30:56 GMT
So, I'm still struggling with this issue. I dug a bit deeper into the
problem, and I'm pretty sure the cause is that I have to (implicitly)
pass the SchemaFor and ToRecord instances to my serialization schema
(otherwise I can't make it generic). However, those classes aren't
serializable, and because they are passed in as implicit parameters I
can't annotate them as transient or turn them into a lazy val, which
leaves me with the current issue.

I hope someone has some leads for me.

Thanks!
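
The capture problem described above can be reproduced without Flink or
avro4s. What follows is a minimal sketch, not a confirmed fix: Encoder is
a hypothetical stand-in for a non-serializable type class such as
SchemaFor or ToRecord, and CapturingSchema, LazySchema and
SerializationCheck are illustrative names that do not come from either
library. It shows why a context bound ends up in the serialized object,
and one possible way around it: pass a serializable factory function and
build the instance lazily behind a @transient lazy val.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a non-serializable type class (assumption).
trait Encoder[T] {
  def encode(value: T): Array[Byte]
}

object Encoder {
  // Deliberately not Serializable, like the instances discussed above.
  implicit val stringEncoder: Encoder[String] = new Encoder[String] {
    def encode(value: String): Array[Byte] = value.getBytes("UTF-8")
  }
}

// The context bound is a hidden constructor parameter. Because serialize
// uses it, the compiler keeps it as a field that cannot be made @transient.
class CapturingSchema[T: Encoder] extends Serializable {
  def serialize(value: T): Array[Byte] = implicitly[Encoder[T]].encode(value)
}

// Alternative: keep only a serializable factory and build the encoder
// lazily, so the encoder is never part of the serialized object graph.
class LazySchema[T](factory: () => Encoder[T]) extends Serializable {
  @transient private lazy val encoder: Encoder[T] = factory()
  def serialize(value: T): Array[Byte] = encoder.encode(value)
}

object SerializationCheck {
  // Returns true if Java serialization accepts the object.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    // The captured Encoder field makes serialization fail.
    println(canSerialize(new CapturingSchema[String])) // false
    // Only the factory function is serialized here.
    println(canSerialize(new LazySchema[String](() => Encoder.stringEncoder))) // true
  }
}
```

The factory must not close over anything non-serializable itself; here it
only refers to a member of a top-level object. Whether the same approach
carries over to avro4s's derived instances is an assumption that would
need to be checked against the library.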

On Thu, 26 Apr 2018 at 23:03, Wouter Zorgdrager <zorgdragerw@gmail.com>
wrote:

> Hi Bill,
>
> Thanks for your answer. However, this proposal isn't going to solve my
> issue. The problem is that the context bounds I need in order to
> serialize to Avro (SchemaFor, ToRecord and FromRecord) aren't
> serializable classes. As a result, Flink cannot serialize the
> KafkaProducer, and the whole job fails.
>
> Thanks,
> Wouter
>
> On Thu, 26 Apr 2018 at 00:42, Nortman, Bill
> <William.Nortman@pimco.com> wrote:
>
>> The first thing I would try is to make sure that your classes Person
>> and Address have getters and setters and a no-argument constructor.
>>
>>
>>
>> *From:* Wouter Zorgdrager [mailto:zorgdragerw@gmail.com]
>> *Sent:* Wednesday, April 25, 2018 7:17 AM
>> *To:* user@flink.apache.org
>> *Subject:* KafkaProducer with generic (Avro) serialization schema
>>
>>
>>
>> Dear reader,
>>
>>
>>
>> I'm currently working on writing a KafkaProducer which is able to
>> serialize a generic type using avro4s.
>>
>> However this serialization schema is not serializable itself. Here is my
>> code for this:
>>
>>
>>
>> The serialization schema:
>>
>> class AvroSerializationSchema[IN : SchemaFor : FromRecord : ToRecord]
>>     extends SerializationSchema[IN] {
>>
>>   override def serialize(element: IN): Array[Byte] = {
>>     val byteArray = new ByteArrayOutputStream()
>>     val avroSer = AvroOutputStream.binary[IN](byteArray)
>>     avroSer.write(element)
>>     avroSer.flush()
>>     avroSer.close()
>>
>>     byteArray.toByteArray
>>   }
>> }
>>
>>
>>
>> The job code:
>>
>> case class Person(name : String, age : Int, address : Address)
>> case class Address(city : String, street : String)
>>
>> class SimpleJob {
>>
>>   @transient
>>   private lazy val serSchema : AvroSerializationSchema[Person] =
>>     new AvroSerializationSchema[Person]()
>>
>>   def start() = {
>>     val testPerson = Person("Test", 100, Address("Test", "Test"))
>>
>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>
>>     env.
>>       fromCollection(Seq(testPerson)).
>>       addSink(createKafkaSink())
>>
>>     env.execute("Flink sample job")
>>   }
>>
>>   def createKafkaSink() : RichSinkFunction[Person] = {
>>     //set some properties
>>     val properties = new Properties()
>>     properties.put("bootstrap.servers", "127.0.0.01:9092")
>>     properties.put("zookeeper.connect", "127.0.0.1:2181")
>>
>>     new FlinkKafkaProducer011[Person]("persons", serSchema, properties)
>>   }
>>
>> }
>>
>>
>>
>> The code compiles, but at runtime it fails with the following error:
>> InvalidProgramException: Object
>> org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper@639c2c1d
>> is not serializable.
>>
>>
>>
>> I assume this means that my custom SerializationSchema is not
>> serializable because it uses SchemaFor, FromRecord and ToRecord.
>>
>> Does anyone know a solution or workaround?
>>
>>
>>
>> Thanks in advance!
>>
>> Wouter
>>
>
