flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jack Huang <jackhu...@mz.com>
Subject Handle deserialization error
Date Sat, 27 Aug 2016 00:37:04 GMT
Hi all,

I have a custom deserializer which I pass to a Kafka source to transform
JSON string to Scala case class.

val events = env.addSource(new FlinkKafkaConsumer09[Event]("events",
new JsonSerde(classOf[Event], new Event), kafkaProp))

‚Äč

There are time when the JSON message is malformed, in which case I want to
catch the exception, log some error message, and go on to the next message
without producing an event to the downstream. It doesn't seem like the
DeserializationSchema
interface allows such behavior. How could I achieve this?

Thanks,
Jack

Mime
View raw message