flink-user mailing list archives

From Jack Huang <jackhu...@mz.com>
Subject Parsing source JSON String as Scala Case Class
Date Thu, 04 Aug 2016 01:56:58 GMT
Hi all,

I want to read a source of JSON strings as a Scala case class, without
having to write a serde for every case class I have. The idea is:

val events = env.addSource(
    new FlinkKafkaConsumer09[Event]("events", new JsonSerde(classOf[Event]), kafkaProp))

I tried implementing my own JsonSerde with Jackson and with Gson, but in
both cases I get the error

Task not serializable
    org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
    org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
    org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:568)
    org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:498)
    com.fractionalmedia.stream.PricerEvent$.main(PricerEvent.scala:100)

It seems that both Jackson and Gson have classes that are not serializable.
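
To make the failure concrete, here is a minimal sketch that uses only
java.io, with no Flink, Jackson, or Gson involved. Parser, EagerSerde,
LazySerde, and SerdeCheck are hypothetical names made up for illustration;
Parser stands in for any parser class that is not java.io.Serializable. The
sketch assumes the error comes from the serde holding such an object in a
plain field, and it shows that marking the field @transient lazy lets the
same object pass Java serialization.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for a JSON parser class that is NOT java.io.Serializable.
class Parser {
  def parse(json: String): String = json.trim
}

// Keeps the parser in a plain field, so serializing the serde also tries
// to serialize the parser and fails.
class EagerSerde extends Serializable {
  val parser = new Parser
}

// Marks the parser @transient and lazy: the field is skipped during
// serialization and the parser is created again on first use.
class LazySerde extends Serializable {
  @transient lazy val parser = new Parser
}

object SerdeCheck {
  // Returns true if obj can be written with plain Java serialization.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(isSerializable(new EagerSerde)) // false
    println(isSerializable(new LazySerde))  // true
  }
}
```

Whether this is the cause in the job above is an assumption; the stack trace
only says that the object passed to addSource could not be serialized.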

I couldn't find any other way to do this JSON-to-case-class parsing, yet
it seems like a very basic need. What am I missing?


Thanks,
Jack
