flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Yassine Marzougui <yassmar...@gmail.com>
Subject Re: Handle deserialization error
Date Sat, 27 Aug 2016 01:51:29 GMT
Hi Jack,

As Robert Metzger mentioned in a previous thread, there's an ongoing
discussion about the issue in this JIRA:
https://issues.apache.org/jira/browse/FLINK-3679.

A possible workaround is to use a SimpleStringSchema in the Kafka source,
and chain it with a flatMap operator where you can use your custom
deserializer and handle deserialization errors.

Best,
Yassine

On Aug 27, 2016 02:37, "Jack Huang" <jackhuang@mz.com> wrote:

> Hi all,
>
> I have a custom deserializer which I pass to a Kafka source to transform
> JSON string to Scala case class.
>
> val events = env.addSource(new FlinkKafkaConsumer09[Event]("events", new JsonSerde(classOf[Event],
new Event), kafkaProp))
>
> ‚Äč
>
> There are time when the JSON message is malformed, in which case I want to
> catch the exception, log some error message, and go on to the next message
> without producing an event to the downstream. It doesn't seem like the DeserializationSchema
> interface allows such behavior. How could I achieve this?
>
> Thanks,
> Jack
>

Mime
View raw message