flink-user mailing list archives

From Bart van Deenen <bartvandee...@fastmail.fm>
Subject flink doesn't seem to serialize generated Avro pojo
Date Thu, 22 Jun 2017 09:09:42 GMT
Hi All

I have a simple Avro schema file:

{"namespace": "com.kpn.datalab.schemas.omnicrm.contact_history.v1",
 "type": "record",
 "name": "contactHistory",
 "fields": [
     {"name": "events", "type": {"type": "array", "items": "bytes"}},
     {"name": "krn",    "type": "string"}
 ]
}

I generate a Java POJO (gist: https://goo.gl/FtM7T6) from this schema with
Avro tools:

java -jar avro_tools.jar
Version 1.7.7 of Apache Avro

java -jar bin/avro_tools.jar compile schema <avrofile> generated/src/main/java

Flink doesn't seem to handle this POJO correctly when I pack its
compiled class file into my fat-jar job file.

When I pass a certain byte array into the Avro deserializer, it works
fine in a regular Scala application, but when I do the same in a Flink
job it fails with a ClassCastException.

Scala code:

      // payload() calls the Avro deserializer (gist: https://goo.gl/18UqJy)
      val payload: AnyRef = kafkaEvent.payload()

      println("canonical name: " + payload.getClass.getCanonicalName)
      val chAvro =
        try {
          payload.asInstanceOf[contactHistory]
        } catch {
          case c: ClassCastException =>
            println(c.getMessage)
        }
 
I get the following amazing error:

canonical name: com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory
com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory cannot be cast to com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory

I've verified that the exact same code with the exact same byte-array
input (checked by printing a hexdump from the Flink job) works outside
Flink, so the code itself is not the problem.

If I put a jar containing the POJO's class file in the Flink lib
directory, my Flink job works fine!
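
The symptoms above (a class that cannot be cast to a class with the same
name, and the problem going away when the jar sits in the Flink lib
directory) would be consistent with the class being loaded twice by two
different class loaders. On the JVM a class is identified by its name
together with its defining class loader. This is only a hypothesis, not
something confirmed in this message. A minimal diagnostic sketch under
that assumption; ClassOrigin is a hypothetical helper name, not part of
Flink or Avro:

```scala
// Hypothetical diagnostic helper (not part of Flink or Avro).
object ClassOrigin {

  /** Describes a class by name, defining class loader, and code source (jar or directory). */
  def describe(c: Class[_]): String = {
    // getClassLoader returns null for classes loaded by the bootstrap loader
    val loader = Option(c.getClassLoader).map(_.toString).getOrElse("bootstrap")
    // Any of these can be null, so each step is wrapped in Option
    val source = for {
      pd  <- Option(c.getProtectionDomain)
      cs  <- Option(pd.getCodeSource)
      loc <- Option(cs.getLocation)
    } yield loc.toString
    s"${c.getName} loader=$loader source=${source.getOrElse("unknown")}"
  }

  /** True only if obj's runtime class is the very same Class object as expected. */
  def sameClass(obj: AnyRef, expected: Class[_]): Boolean =
    obj.getClass == expected
}
```

In the job, printing ClassOrigin.describe(payload.getClass) next to
ClassOrigin.describe(classOf[contactHistory]) would show whether the two
classes come from different loaders or different jars.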

Any ideas how I can keep my POJO in the fat jar? I don't want to have to
restart Flink every time I add a new Avro schema.

Thanks

Bart
