flink-user mailing list archives

From Stefan Richter <s.rich...@data-artisans.com>
Subject Re: flink doesn't seem to serialize generated Avro pojo
Date Thu, 22 Jun 2017 09:20:31 GMT
Hi,

this "amazing error" message typically means that the class of the object instance you
created was loaded by a different classloader than the one that loaded the class in the code
that tries to cast it. A class in Java is
fully identified by its canonical class name AND the classloader that loaded it. This makes
it possible for two classes with the same name and bytecode not to be instances of each other.
Unfortunately, I have no concrete idea why that happens in your case,
but maybe this info helps to track down the problem.
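For illustration, here is a minimal, self-contained Java sketch of that effect. The class name Foo and the IsolatingLoader are made up for the example; the loader defines Foo from its bytecode instead of delegating to its parent, producing a second, distinct Foo class:

```java
import java.io.InputStream;

// Hypothetical pojo standing in for the generated Avro class.
class Foo {}

public class ClassLoaderIdentity {
    // A classloader that defines Foo itself instead of delegating to the
    // parent, so its Foo is a different class than the application's Foo.
    static class IsolatingLoader extends ClassLoader {
        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (name.equals("Foo")) {
                try (InputStream in = getResourceAsStream("Foo.class")) {
                    byte[] bytes = in.readAllBytes();
                    return defineClass(name, bytes, 0, bytes.length);
                } catch (Exception e) {
                    throw new ClassNotFoundException(name, e);
                }
            }
            return super.loadClass(name, resolve);
        }
    }

    public static void main(String[] args) throws Exception {
        Class<?> appFoo = Foo.class;                                // application classloader
        Class<?> otherFoo = new IsolatingLoader().loadClass("Foo"); // isolating classloader

        System.out.println(appFoo.getName().equals(otherFoo.getName())); // true: same name
        System.out.println(appFoo == otherFoo);                          // false: different classes

        Object foo = new Foo();
        System.out.println(appFoo.isInstance(foo));   // true
        System.out.println(otherFoo.isInstance(foo)); // false: casting foo to otherFoo's
                                                      // Foo would throw ClassCastException
    }
}
```

This is exactly the situation behind a "X cannot be cast to X" message: the instance and the cast target have the same name but come from different classloaders.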

Best,
Stefan

> On 22.06.2017, at 11:09, Bart van Deenen <bartvandeenen@fastmail.fm> wrote:
> 
> Hi All
> 
> I have a simple avro file 
> 
> {"namespace": "com.kpn.datalab.schemas.omnicrm.contact_history.v1",
>  "type": "record",
>  "name": "contactHistory",
>  "fields": [
>      {"name": "events", "type": {"type": "array", "items": "bytes"}},
>      {"name": "krn",    "type": "string"}
>  ]
> }
> 
> I generate a Java pojo (gist: https://goo.gl/FtM7T6) from this file via
> 
>     java -jar bin/avro_tools.jar compile schema <avrofile> generated/src/main/java
> 
> (avro_tools.jar is version 1.7.7 of Apache Avro.)
> 
> This pojo doesn't seem to get serialized by Flink when I pack the
> compiled class file into the fat jar of the job.
> 
> When I pass a certain byte array into the avro deserializer, this works
> fine in a regular Scala application, but when I do the same in a Flink
> job it bombs with a typecast exception.
> 
> Scala code:
> 
>      // this calls the Avro deserializer (gist: https://goo.gl/18UqJy)
>      val payload: AnyRef = kafkaEvent.payload()
> 
>      println("canonical name: " + payload.getClass.getCanonicalName)
>      val chAvro =
>        try {
>          payload.asInstanceOf[contactHistory]
>        } catch {
>          case c: ClassCastException =>
>            println(c.getMessage)
>        }
> 
> I get the following amazing error:
> 
> canonical name: com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory
> com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory cannot
> be cast to
> com.kpn.datalab.schemas.omnicrm.contact_history.v1.contactHistory
> 
> I've verified that the exact same code with the exact same byte-array
> input (checked by printing a hexdump from the Flink job) works outside
> Flink, so the code itself is not the problem.
> 
> If I put a jar containing the class file from the Pojo in the Flink lib
> directory, my Flink job works fine!
> 
> Any ideas how I can keep my pojo in the fat jar? I don't want to
> restart Flink every time I add a new Avro schema.
> 
> Thanks
> 
> Bart

